Commit b9df560c authored by legoc's avatar legoc
Browse files

(split) All string message constants are now grouped in message string definitions

parent 34474216
......@@ -46,7 +46,7 @@ std::unique_ptr<Event> EventStreamSocket::receive(bool blocking) {
string response(static_cast<char*>(message->data()), message->size());
if (response == "STATUS") {
if (response == message::Event::STATUS) {
message = m_impl->receive();
......@@ -61,7 +61,7 @@ std::unique_ptr<Event> EventStreamSocket::receive(bool blocking) {
return unique_ptr<Event>(new StatusEvent(id, name, state, pastStates));
}
else if (response == "RESULT") {
else if (response == message::Event::RESULT) {
message = m_impl->receive();
......@@ -78,7 +78,7 @@ std::unique_ptr<Event> EventStreamSocket::receive(bool blocking) {
return unique_ptr<Event>(new ResultEvent(id, name, data));
}
else if (response == "PUBLISHER") {
else if (response == message::Event::PUBLISHER) {
message = m_impl->receive();
......@@ -92,7 +92,7 @@ std::unique_ptr<Event> EventStreamSocket::receive(bool blocking) {
return unique_ptr<Event>(new PublisherEvent(id, name, publisherName));
}
else if (response == "PORT") {
else if (response == message::Event::PORT) {
message = m_impl->receive();
......@@ -106,7 +106,7 @@ std::unique_ptr<Event> EventStreamSocket::receive(bool blocking) {
return unique_ptr<Event>(new PortEvent(id, name, portName));
}
else if (response == "CANCEL") {
else if (response == message::Event::CANCEL) {
message = m_impl->receive();
......@@ -124,7 +124,7 @@ void EventStreamSocket::cancel() {
WaitingImpl * EventStreamSocket::waiting() {
// We transfer the ownership of cancel socket to WaitingImpl
return new SocketWaitingImpl(m_impl->m_cancelSocket.get(), "CANCEL");
return new SocketWaitingImpl(m_impl->m_cancelSocket.get(), message::Event::CANCEL);
}
}
......@@ -58,13 +58,13 @@ bool OutputStreamSocket::receive(Output& output) {
string response(message->data<char>(), message->size());
if (response == ServicesImpl::STREAM) {
if (response == message::Event::STREAM) {
}
else if (response == ServicesImpl::ENDSTREAM) {
else if (response == message::Event::ENDSTREAM) {
m_ended = true;
return false;
}
else if (response == ServicesImpl::CANCEL) {
else if (response == message::Event::CANCEL) {
m_canceled = true;
return false;
}
......@@ -100,7 +100,7 @@ bool OutputStreamSocket::isCanceled() const {
WaitingImpl * OutputStreamSocket::waiting() {
// We transfer the ownership of cancel socket to WaitingImpl
return new SocketWaitingImpl(m_impl->m_cancelSocket.get(), "CANCEL");
return new SocketWaitingImpl(m_impl->m_cancelSocket.get(), message::Event::CANCEL);
}
}
......@@ -27,10 +27,6 @@ using namespace std;
namespace cameo {
const std::string PublisherImpl::SYNC = "SYNC";
const std::string PublisherImpl::STREAM = "STREAM";
const std::string PublisherImpl::ENDSTREAM = "ENDSTREAM";
PublisherImpl::PublisherImpl(application::This * application, int publisherPort, int synchronizerPort, const std::string& name, int numberOfSubscribers) :
m_application(application),
m_publisherPort(publisherPort),
......@@ -147,7 +143,7 @@ WaitingImpl * PublisherImpl::waiting() {
void PublisherImpl::sendBinary(const std::string& data) {
// send a STREAM message by the publisher socket
publish(STREAM, data.c_str(), data.length());
publish(message::Event::STREAM, data.c_str(), data.length());
}
void PublisherImpl::send(const std::string& data) {
......@@ -157,21 +153,21 @@ void PublisherImpl::send(const std::string& data) {
serialize(data, result);
// send a STREAM message by the publisher socket
publish(STREAM, result.c_str(), result.length());
publish(message::Event::STREAM, result.c_str(), result.length());
}
void PublisherImpl::sendTwoBinaryParts(const std::string& data1, const std::string& data2) {
// send a STREAM message by the publisher socket
publishTwoParts(STREAM, data1.c_str(), data1.length(), data2.c_str(), data2.length());
publishTwoParts(message::Event::STREAM, data1.c_str(), data1.length(), data2.c_str(), data2.length());
}
void PublisherImpl::setEnd() {
if (!m_ended && m_publisher.get() != nullptr) {
// send a dummy ENDSTREAM message by the publisher socket
string data = "endstream";
publish(ENDSTREAM, data.c_str(), data.length());
string data(message::Event::ENDSTREAM);
publish(message::Event::ENDSTREAM, data.c_str(), data.length());
m_ended = true;
}
......@@ -197,10 +193,10 @@ void PublisherImpl::terminate() {
void PublisherImpl::publish(const std::string& header, const char* data, std::size_t size) {
zmq::message_t requestType(header.length());
memcpy((void *) requestType.data(), header.c_str(), header.length());
memcpy(requestType.data(), header.c_str(), header.length());
zmq::message_t requestData(size);
memcpy((void *) requestData.data(), data, size);
memcpy(requestData.data(), data, size);
m_publisher->send(requestType, ZMQ_SNDMORE);
m_publisher->send(requestData);
......@@ -209,13 +205,13 @@ void PublisherImpl::publish(const std::string& header, const char* data, std::si
void PublisherImpl::publishTwoParts(const std::string& header, const char* data1, std::size_t size1, const char* data2, std::size_t size2) {
zmq::message_t requestType(header.length());
memcpy((void *) requestType.data(), header.c_str(), header.length());
memcpy(requestType.data(), header.c_str(), header.length());
zmq::message_t requestData1(size1);
memcpy((void *) requestData1.data(), data1, size1);
memcpy(requestData1.data(), data1, size1);
zmq::message_t requestData2(size2);
memcpy((void *) requestData2.data(), data2, size2);
memcpy(requestData2.data(), data2, size2);
m_publisher->send(requestType, ZMQ_SNDMORE);
m_publisher->send(requestData1, ZMQ_SNDMORE);
......@@ -225,8 +221,8 @@ void PublisherImpl::publishTwoParts(const std::string& header, const char* data1
zmq::message_t * PublisherImpl::processInitCommand() {
// send a dummy SYNC message by the publisher socket
string data = "sync";
publish(SYNC, data.c_str(), data.length());
string data(message::Event::SYNC);
publish(message::Event::SYNC, data.c_str(), data.length());
data = "Connection OK";
size_t size = data.length();
......@@ -241,7 +237,7 @@ zmq::message_t * PublisherImpl::processSubscribePublisherCommand() {
string result = m_application->m_impl->createRequestResponse(0, "OK");
zmq::message_t * reply = new zmq::message_t(result.length());
memcpy((void *) reply->data(), result.c_str(), result.length());
memcpy(reply->data(), result.c_str(), result.length());
return reply;
}
......@@ -251,7 +247,7 @@ zmq::message_t * PublisherImpl::processCancelPublisherSyncCommand() {
string result = m_application->m_impl->createRequestResponse(0, "OK");
zmq::message_t * reply = new zmq::message_t(result.length());
memcpy((void *) reply->data(), result.c_str(), result.length());
memcpy(reply->data(), result.c_str(), result.length());
return reply;
}
......
......@@ -64,10 +64,6 @@ public:
int m_numberOfSubscribers;
std::unique_ptr<zmq::socket_t> m_publisher;
bool m_ended;
static const std::string SYNC;
static const std::string STREAM;
static const std::string ENDSTREAM;
};
}
......
......@@ -38,14 +38,6 @@ using namespace std;
namespace cameo {
const std::string ServicesImpl::STATUS = "STATUS";
const std::string ServicesImpl::RESULT = "RESULT";
const std::string ServicesImpl::PUBLISHER = "PUBLISHER";
const std::string ServicesImpl::PORT = "PORT";
const std::string ServicesImpl::CANCEL = "CANCEL";
const std::string ServicesImpl::STREAM = "STREAM";
const std::string ServicesImpl::ENDSTREAM = "ENDSTREAM";
ServicesImpl::ServicesImpl() :
m_context(1), m_timeout(0) {
}
......@@ -397,11 +389,11 @@ zmq::socket_t * ServicesImpl::createEventSubscriber(const std::string& endpoint,
zmq::socket_t * subscriber = new zmq::socket_t(m_context, ZMQ_SUB);
vector<string> streamList;
streamList.push_back(STATUS);
streamList.push_back(RESULT);
streamList.push_back(PUBLISHER);
streamList.push_back(PORT);
streamList.push_back(CANCEL);
streamList.push_back(message::Event::STATUS);
streamList.push_back(message::Event::RESULT);
streamList.push_back(message::Event::PUBLISHER);
streamList.push_back(message::Event::PORT);
streamList.push_back(message::Event::CANCEL);
for (vector<string>::const_iterator s = streamList.begin(); s != streamList.end(); ++s) {
subscriber->setsockopt(ZMQ_SUBSCRIBE, s->c_str(), s->length());
......@@ -418,9 +410,9 @@ zmq::socket_t * ServicesImpl::createOutputStreamSubscriber(const std::string& en
zmq::socket_t * subscriber = new zmq::socket_t(m_context, ZMQ_SUB);
vector<string> streamList;
streamList.push_back(STREAM);
streamList.push_back(ENDSTREAM);
streamList.push_back(CANCEL);
streamList.push_back(message::Event::STREAM);
streamList.push_back(message::Event::ENDSTREAM);
streamList.push_back(message::Event::CANCEL);
for (vector<string>::const_iterator s = streamList.begin(); s != streamList.end(); ++s) {
subscriber->setsockopt(ZMQ_SUBSCRIBE, s->c_str(), s->length());
......
......@@ -71,14 +71,6 @@ public:
zmq::context_t m_context;
int m_timeout;
static const std::string STATUS;
static const std::string RESULT;
static const std::string PUBLISHER;
static const std::string PORT;
static const std::string CANCEL;
static const std::string STREAM;
static const std::string ENDSTREAM;
};
}
......
......@@ -15,10 +15,10 @@
*/
#include "SocketWaitingImpl.h"
#include <iostream>
#include "../Application.h"
#include "../message/Message.h"
#include "WaitingImplSet.h"
#include <iostream>
using namespace std;
......@@ -46,10 +46,10 @@ SocketWaitingImpl::~SocketWaitingImpl() {
void SocketWaitingImpl::cancel() {
zmq::message_t requestType(m_message.length());
string data("CANCEL");
string data(message::Event::CANCEL);
zmq::message_t requestData(data.length());
memcpy((void *) requestType.data(), m_message.c_str(), m_message.length());
memcpy((void *) requestData.data(), data.c_str(), data.length());
memcpy(requestType.data(), m_message.c_str(), m_message.length());
memcpy(requestData.data(), data.c_str(), data.length());
m_socket->send(requestType, ZMQ_SNDMORE);
m_socket->send(requestData);
}
......
......@@ -15,15 +15,13 @@
*/
#include "StreamSocketImpl.h"
#include "../message/Message.h"
#include <iostream>
using namespace std;
namespace cameo {
const std::string StreamSocketImpl::CANCEL = "CANCEL";
StreamSocketImpl::StreamSocketImpl(zmq::socket_t * socket, zmq::socket_t * cancelSocket) :
m_socket(socket), m_cancelSocket(cancelSocket) {
}
......@@ -52,12 +50,13 @@ std::unique_ptr<zmq::message_t> StreamSocketImpl::receive(bool blocking) {
}
void StreamSocketImpl::cancel() {
if (m_cancelSocket.get() != nullptr) {
zmq::message_t requestType(CANCEL.length());
string data("cancel");
string data(message::Event::CANCEL);
zmq::message_t requestType(data.length());
zmq::message_t requestData(data.length());
memcpy((void *) requestType.data(), CANCEL.c_str(), CANCEL.length());
memcpy((void *) requestData.data(), data.c_str(), data.length());
memcpy(requestType.data(), message::Event::CANCEL, data.length());
memcpy(requestData.data(), data.c_str(), data.length());
m_cancelSocket->send(requestType, ZMQ_SNDMORE);
m_cancelSocket->send(requestData);
}
......
......@@ -36,8 +36,6 @@ public:
std::unique_ptr<zmq::socket_t> m_socket;
std::unique_ptr<zmq::socket_t> m_cancelSocket;
static const std::string CANCEL;
};
}
......
......@@ -28,12 +28,6 @@ using namespace std;
namespace cameo {
const std::string SubscriberImpl::SYNC = "SYNC";
const std::string SubscriberImpl::STREAM = "STREAM";
const std::string SubscriberImpl::ENDSTREAM = "ENDSTREAM";
const std::string SubscriberImpl::CANCEL = "CANCEL";
const std::string SubscriberImpl::STATUS = "STATUS";
SubscriberImpl::SubscriberImpl(Server * server, const std::string & url, int publisherPort, int synchronizerPort, const std::string& publisherName, int numberOfSubscribers, const std::string& instanceName, int instanceId, const std::string& instanceEndpoint, const std::string& statusEndpoint) :
m_server(server),
m_url(url),
......@@ -56,11 +50,11 @@ void SubscriberImpl::init() {
// Create a socket for publishing.
m_subscriber.reset(new zmq::socket_t(m_server->m_impl->m_context, ZMQ_SUB));
m_subscriber->setsockopt(ZMQ_SUBSCRIBE, SYNC.c_str(), SYNC.length());
m_subscriber->setsockopt(ZMQ_SUBSCRIBE, STREAM.c_str(), STREAM.length());
m_subscriber->setsockopt(ZMQ_SUBSCRIBE, ENDSTREAM.c_str(), ENDSTREAM.length());
m_subscriber->setsockopt(ZMQ_SUBSCRIBE, CANCEL.c_str(), CANCEL.length());
m_subscriber->setsockopt(ZMQ_SUBSCRIBE, STATUS.c_str(), STATUS.length());
m_subscriber->setsockopt(ZMQ_SUBSCRIBE, message::Event::SYNC, string(message::Event::SYNC).length());
m_subscriber->setsockopt(ZMQ_SUBSCRIBE, message::Event::STREAM, string(message::Event::STREAM).length());
m_subscriber->setsockopt(ZMQ_SUBSCRIBE, message::Event::ENDSTREAM, string(message::Event::ENDSTREAM).length());
m_subscriber->setsockopt(ZMQ_SUBSCRIBE, message::Event::CANCEL, string(message::Event::CANCEL).length());
m_subscriber->setsockopt(ZMQ_SUBSCRIBE, message::Event::STATUS, string(message::Event::STATUS).length());
stringstream pubEndpoint;
pubEndpoint << m_url << ":" << m_publisherPort;
......@@ -128,22 +122,22 @@ bool SubscriberImpl::receiveBinary(std::string& data) {
string response(static_cast<char*>(message->data()), message->size());
if (response == STREAM) {
if (response == message::Event::STREAM) {
message.reset(new zmq::message_t());
m_subscriber->recv(message.get());
data = string(static_cast<char*>(message->data()), message->size());
return true;
} else if (response == ENDSTREAM) {
} else if (response == message::Event::ENDSTREAM) {
m_ended = true;
return false;
} else if (response == CANCEL) {
} else if (response == message::Event::CANCEL) {
m_canceled = true;
return false;
} else if (response == STATUS) {
} else if (response == message::Event::STATUS) {
message.reset(new zmq::message_t());
m_subscriber->recv(message.get());
......@@ -193,7 +187,7 @@ bool SubscriberImpl::receiveTwoBinaryParts(std::string& data1, std::string& data
string response(static_cast<char*>(message->data()), message->size());
if (response == STREAM) {
if (response == message::Event::STREAM) {
message.reset(new zmq::message_t());
m_subscriber->recv(message.get());
data1 = string(static_cast<char*>(message->data()), message->size());
......@@ -204,14 +198,14 @@ bool SubscriberImpl::receiveTwoBinaryParts(std::string& data1, std::string& data
return true;
} else if (response == ENDSTREAM) {
} else if (response == message::Event::ENDSTREAM) {
m_ended = true;
return false;
} else if (response == CANCEL) {
} else if (response == message::Event::CANCEL) {
return false;
} else if (response == STATUS) {
} else if (response == message::Event::STATUS) {
message.reset(new zmq::message_t());
m_subscriber->recv(message.get());
......@@ -242,7 +236,7 @@ bool SubscriberImpl::receiveTwoBinaryParts(std::string& data1, std::string& data
WaitingImpl * SubscriberImpl::waiting() {
// Waiting gets the cancel publisher.
return new SocketWaitingImpl(m_cancelPublisher.get(), CANCEL);
return new SocketWaitingImpl(m_cancelPublisher.get(), message::Event::CANCEL);
}
}
......@@ -58,12 +58,6 @@ public:
std::unique_ptr<zmq::socket_t> m_cancelPublisher;
bool m_ended;
bool m_canceled;
static const std::string SYNC;
static const std::string STREAM;
static const std::string ENDSTREAM;
static const std::string CANCEL;
static const std::string STATUS;
};
}
......
......@@ -54,6 +54,17 @@ namespace message {
const int STARTED_UNMANAGED = 27;
const int TERMINATED_UNMANAGED = 28;
namespace Event {
constexpr const char* CANCEL = "CANCEL";
constexpr const char* STREAM = "STREAM";
constexpr const char* ENDSTREAM = "ENDSTREAM";
constexpr const char* SYNC = "SYNC";
constexpr const char* STATUS = "STATUS";
constexpr const char* RESULT = "RESULT";
constexpr const char* PORT = "PORT";
constexpr const char* PUBLISHER = "PUBLISHER";
}
namespace StartRequest {
constexpr const char* NAME = "name"; // required string name = 1;
constexpr const char* ARGS = "args"; // repeated string args = 2;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment