Commit 07c072bf authored by legoc's avatar legoc
Browse files

Removed all the delete calls, using unique_ptr instead (Corrects the bug of a...

Removed all the delete calls, using unique_ptr instead (Corrects the bug of a forgotten delete in RequesterImpl)
parent 3b5be68e
......@@ -36,7 +36,7 @@ EventStreamSocket::~EventStreamSocket() {
std::unique_ptr<Event> EventStreamSocket::receive(bool blocking) {
zmq::message_t * message = m_impl->receive(blocking);
unique_ptr<zmq::message_t> message(m_impl->receive(blocking));
// In case of non-blocking call, the message can be null.
if (message == nullptr) {
......@@ -44,7 +44,6 @@ std::unique_ptr<Event> EventStreamSocket::receive(bool blocking) {
}
string response(static_cast<char*>(message->data()), message->size());
delete message;
if (response == "STATUS") {
......@@ -52,7 +51,6 @@ std::unique_ptr<Event> EventStreamSocket::receive(bool blocking) {
proto::StatusEvent protoStatus;
protoStatus.ParseFromArray(message->data(), message->size());
delete message;
return unique_ptr<Event>(new StatusEvent(protoStatus.id(), protoStatus.name(), protoStatus.applicationstate(), protoStatus.pastapplicationstates()));
......@@ -62,18 +60,15 @@ std::unique_ptr<Event> EventStreamSocket::receive(bool blocking) {
proto::ResultEvent protoResult;
protoResult.ParseFromArray(message->data(), message->size());
delete message;
return unique_ptr<Event>(new ResultEvent(protoResult.id(), protoResult.name(), protoResult.data()));
} else if (response == "PUBLISHER") {
message = m_impl->receive();
proto::PublisherEvent protoPublisher;
protoPublisher.ParseFromArray(message->data(), message->size());
delete message;
return unique_ptr<Event>(new PublisherEvent(protoPublisher.id(), protoPublisher.name(), protoPublisher.publishername()));
......@@ -83,14 +78,12 @@ std::unique_ptr<Event> EventStreamSocket::receive(bool blocking) {
proto::PortEvent protoPort;
protoPort.ParseFromArray(message->data(), message->size());
delete message;
return unique_ptr<Event>(new PortEvent(protoPort.id(), protoPort.name(), protoPort.portname()));
} else if (response == "CANCEL") {
message = m_impl->receive();
delete message;
// Exit with a null event.
return unique_ptr<Event>(nullptr);
......
......@@ -85,36 +85,35 @@ bool PublisherImpl::waitForSubscribers() {
while (counter < m_numberOfSubscribers) {
zmq::message_t * message = new zmq::message_t;
synchronizer.recv(message, 0);
unique_ptr<zmq::message_t> message(new zmq::message_t);
synchronizer.recv(message.get(), 0);
// multi-part message, first part is the type
proto::MessageType messageType;
messageType.ParseFromArray((*message).data(), (*message).size());
if (message->more()) {
delete message;
message = new zmq::message_t;
synchronizer.recv(message, 0);
message.reset(new zmq::message_t);
synchronizer.recv(message.get(), 0);
} else {
cerr << "unexpected number of frames, should be 2" << endl;
continue;
}
zmq::message_t * reply = nullptr;
unique_ptr<zmq::message_t> reply;
if (messageType.type() == proto::MessageType_Type_INIT) {
reply = processInitCommand();
reply.reset(processInitCommand());
} else if (messageType.type() == proto::MessageType_Type_SUBSCRIBEPUBLISHER) {
counter++;
reply = processSubscribePublisherCommand();
reply.reset(processSubscribePublisherCommand());
} else if (messageType.type() == proto::MessageType_Type_CANCEL) {
canceled = true;
counter = m_numberOfSubscribers;
reply = processCancelPublisherSyncCommand();
reply.reset(processCancelPublisherSyncCommand());
} else {
cerr << "unknown message type " << messageType.type() << endl;
......@@ -125,9 +124,6 @@ bool PublisherImpl::waitForSubscribers() {
if (reply != nullptr) {
synchronizer.send(*reply);
}
delete reply;
delete message;
}
return !canceled;
......
......@@ -122,8 +122,8 @@ void RequesterImpl::sendTwoBinaryParts(const std::string& request1, const std::s
bool RequesterImpl::receiveBinary(std::string& response) {
zmq::message_t * message = new zmq::message_t;
m_requester->recv(message, 0);
unique_ptr<zmq::message_t> message(new zmq::message_t);
m_requester->recv(message.get(), 0);
// multi-part message, first part is the type
proto::MessageType messageType;
......@@ -132,9 +132,8 @@ bool RequesterImpl::receiveBinary(std::string& response) {
bool canceled = false;
if (message->more()) {
delete message;
message = new zmq::message_t;
m_requester->recv(message, 0);
message.reset(new zmq::message_t);
m_requester->recv(message.get(), 0);
} else {
cerr << "unexpected number of frames, should be 2" << endl;
......@@ -151,13 +150,11 @@ bool RequesterImpl::receiveBinary(std::string& response) {
// Create the reply
string data = "OK";
size_t size = data.length();
zmq::message_t * reply = new zmq::message_t(size);
unique_ptr<zmq::message_t> reply(new zmq::message_t(size));
memcpy((void *) reply->data(), data.c_str(), size);
m_requester->send(*reply);
delete reply;
return !canceled;
}
......
......@@ -65,17 +65,16 @@ WaitingImpl * ResponderImpl::waiting() {
std::unique_ptr<RequestImpl> ResponderImpl::receive() {
zmq::message_t * message = new zmq::message_t;
m_responder->recv(message, 0);
unique_ptr<zmq::message_t> message(new zmq::message_t);
m_responder->recv(message.get(), 0);
// multi-part message, first part is the type
proto::MessageType messageType;
messageType.ParseFromArray((*message).data(), (*message).size());
if (message->more()) {
delete message;
message = new zmq::message_t;
m_responder->recv(message, 0);
message.reset(new zmq::message_t);
m_responder->recv(message.get(), 0);
} else {
cerr << "unexpected number of frames, should be 2" << endl;
......@@ -87,7 +86,7 @@ std::unique_ptr<RequestImpl> ResponderImpl::receive() {
// Create the reply
string data = "OK";
size_t size = data.length();
zmq::message_t * reply = new zmq::message_t(size);
unique_ptr<zmq::message_t> reply(new zmq::message_t(size));
memcpy((void *) reply->data(), data.c_str(), size);
unique_ptr<RequestImpl> result;
......@@ -125,9 +124,6 @@ std::unique_ptr<RequestImpl> ResponderImpl::receive() {
m_responder->send(*reply);
}
delete reply;
delete message;
return result;
}
......
......@@ -39,19 +39,16 @@ void SocketImpl::send(const std::string& data) {
m_socket->send(messageData);
}
zmq::message_t * SocketImpl::receive(bool blocking) {
std::unique_ptr<zmq::message_t> SocketImpl::receive(bool blocking) {
// Use the message interface.
zmq::message_t * message = new zmq::message_t();
if (m_socket->recv(message, (blocking ? 0 : ZMQ_DONTWAIT))) {
unique_ptr<zmq::message_t> message(new zmq::message_t());
if (m_socket->recv(message.get(), (blocking ? 0 : ZMQ_DONTWAIT))) {
// The message exists.
return message;
}
// No message.
delete message;
return nullptr;
return unique_ptr<zmq::message_t>(nullptr);
}
void SocketImpl::cancel() {
......
......@@ -30,7 +30,7 @@ public:
virtual ~SocketImpl();
void send(const std::string& data);
zmq::message_t * receive(bool blocking = true);
std::unique_ptr<zmq::message_t> receive(bool blocking = true);
void cancel();
void close();
......
......@@ -119,17 +119,15 @@ bool SubscriberImpl::hasEnded() const {
bool SubscriberImpl::receiveBinary(std::string& data) {
while (true) {
zmq::message_t * message = new zmq::message_t();
m_subscriber->recv(message);
unique_ptr<zmq::message_t> message(new zmq::message_t());
m_subscriber->recv(message.get());
string response(static_cast<char*>(message->data()), message->size());
delete message;
if (response == STREAM) {
message = new zmq::message_t();
m_subscriber->recv(message);
message.reset(new zmq::message_t());
m_subscriber->recv(message.get());
data = string(static_cast<char*>(message->data()), message->size());
delete message;
return true;
......@@ -141,12 +139,11 @@ bool SubscriberImpl::receiveBinary(std::string& data) {
return false;
} else if (response == STATUS) {
message = new zmq::message_t();
m_subscriber->recv(message);
message.reset(new zmq::message_t());
m_subscriber->recv(message.get());
proto::StatusEvent protoStatus;
protoStatus.ParseFromArray(message->data(), message->size());
delete message;
if (protoStatus.id() == m_instanceId) {
application::State state = protoStatus.applicationstate();
......@@ -239,22 +236,19 @@ bool SubscriberImpl::receive(std::vector<double>& data) {
bool SubscriberImpl::receiveTwoBinaryParts(std::string& data1, std::string& data2) {
while (true) {
zmq::message_t * message = new zmq::message_t();
m_subscriber->recv(message);
unique_ptr<zmq::message_t> message(new zmq::message_t());
m_subscriber->recv(message.get());
string response(static_cast<char*>(message->data()), message->size());
delete message;
if (response == STREAM) {
message = new zmq::message_t();
m_subscriber->recv(message);
message.reset(new zmq::message_t());
m_subscriber->recv(message.get());
data1 = string(static_cast<char*>(message->data()), message->size());
delete message;
message = new zmq::message_t();
m_subscriber->recv(message);
message.reset(new zmq::message_t());
m_subscriber->recv(message.get());
data2 = string(static_cast<char*>(message->data()), message->size());
delete message;
return true;
......@@ -266,12 +260,11 @@ bool SubscriberImpl::receiveTwoBinaryParts(std::string& data1, std::string& data
return false;
} else if (response == STATUS) {
message = new zmq::message_t();
m_subscriber->recv(message);
message.reset(new zmq::message_t());
m_subscriber->recv(message.get());
proto::StatusEvent protoStatus;
protoStatus.ParseFromArray(message->data(), message->size());
delete message;
if (protoStatus.id() == m_instanceId) {
application::State state = protoStatus.applicationstate();
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment