Commit 116b3cfc authored by legoc's avatar legoc
Browse files

Use of requestJSON()

parent 35574d52
......@@ -135,8 +135,6 @@ public:
std::unique_ptr<RequestSocketImpl> createRequestSocket(const std::string& endpoint) const;
std::unique_ptr<RequestSocketImpl> createRequestSocket(const std::string& endpoint, int timeout) const;
std::string createJSONResponse(int value, const std::string& message) const;
void removePort(const std::string& name) const;
private:
......@@ -254,8 +252,6 @@ public:
std::string getKeyValue(const std::string& key) const;
json::Object requestJSON(const std::string& request, int overrideTimeout = -1) const;
std::string createJSONResponse(int value, const std::string& message) const;
private:
Com(Server* server);
......
......@@ -100,19 +100,6 @@ std::unique_ptr<RequestSocketImpl> This::Com::createRequestSocket(const std::str
return m_server->createRequestSocket(endpoint, timeout);
}
std::string This::Com::createJSONResponse(int value, const std::string& message) const {
json::StringObject request;
request.pushKey(message::RequestResponse::VALUE);
request.pushInt64(value);
request.pushKey(message::RequestResponse::MESSAGE);
request.pushString(message);
return request.toString();
}
void This::Com::removePort(const std::string& name) const {
json::Object response = m_server->requestJSON(createRemovePortV0Request(m_applicationId, name));
......@@ -458,19 +445,6 @@ json::Object Instance::Com::requestJSON(const std::string& request, int override
return m_server->requestJSON(request, overrideTimeout);
}
std::string Instance::Com::createJSONResponse(int value, const std::string& message) const {
json::StringObject request;
request.pushKey(message::RequestResponse::VALUE);
request.pushInt64(value);
request.pushKey(message::RequestResponse::MESSAGE);
request.pushString(message);
return request.toString();
}
Instance::Instance(Server * server) :
m_server(server),
m_id(-1),
......
......@@ -108,8 +108,7 @@ bool PublisherImpl::waitForSubscribers() {
reply.reset(processCancelPublisherSyncCommand());
}
else {
std::cerr << "Unknown message type " << type << std::endl;
synchronizer.send(*message);
reply.reset(processUnknownCommand());
}
// send to the client
......@@ -129,7 +128,7 @@ void PublisherImpl::cancelWaitForSubscribers() {
// Create a request socket only for the request.
std::unique_ptr<RequestSocketImpl> requestSocket = application::This::getCom().createRequestSocket(application::This::getEndpoint().withPort(m_synchronizerPort).toString());
requestSocket->request(request.toString());
requestSocket->requestJSON(request.toString());
}
WaitingImpl * PublisherImpl::waiting() {
......@@ -225,10 +224,11 @@ zmq::message_t * PublisherImpl::processInitCommand() {
std::string data(message::Event::SYNC);
publish(message::Event::SYNC, data.c_str(), data.length());
data = "Connection OK";
size_t size = data.length();
std::string result = createRequestResponse(0, "OK");
size_t size = result.length();
zmq::message_t * reply = new zmq::message_t(size);
memcpy((void *) reply->data(), data.c_str(), size);
memcpy((void *) reply->data(), result.c_str(), size);
return reply;
}
......@@ -253,6 +253,16 @@ zmq::message_t * PublisherImpl::processCancelPublisherSyncCommand() {
return reply;
}
zmq::message_t * PublisherImpl::processUnknownCommand() {
std::string result = createRequestResponse(-1, "Unknown command");
zmq::message_t * reply = new zmq::message_t(result.length());
memcpy(reply->data(), result.c_str(), result.length());
return reply;
}
std::string PublisherImpl::createTerminatePublisherRequest(int id, const std::string& name) const {
json::StringObject request;
......
......@@ -59,6 +59,7 @@ public:
zmq::message_t * processInitCommand();
zmq::message_t * processSubscribePublisherCommand();
zmq::message_t * processCancelPublisherSyncCommand();
zmq::message_t * processUnknownCommand();
std::string createTerminatePublisherRequest(int id, const std::string& name) const;
......
......@@ -160,7 +160,7 @@ std::optional<std::string> RequesterImpl::receiveBinary() {
}
// Create the reply.
std::string data = "OK";
std::string data = createRequestResponse(0, "OK");
size_t size = data.length();
std::unique_ptr<zmq::message_t> reply(new zmq::message_t(size));
memcpy(reply->data(), data.c_str(), size);
......@@ -182,7 +182,7 @@ void RequesterImpl::cancel() {
// Create a request socket only for the request.
std::unique_ptr<RequestSocketImpl> requestSocket = application::This::getCom().createRequestSocket(application::This::getEndpoint().withPort(m_requesterPort).toString());
requestSocket->request(request.toString());
requestSocket->requestJSON(request.toString());
}
void RequesterImpl::terminate() {
......
......@@ -56,7 +56,7 @@ void ResponderImpl::cancel() {
// Create a request socket.
std::unique_ptr<RequestSocketImpl> requestSocket = application::This::getCom().createRequestSocket(application::This::getEndpoint().withPort(m_responderPort).toString());
requestSocket->request(request.toString());
requestSocket->requestJSON(request.toString());
}
WaitingImpl * ResponderImpl::waiting() {
......@@ -75,11 +75,7 @@ std::unique_ptr<RequestImpl> ResponderImpl::receive() {
int type = request[message::TYPE].GetInt();
// Create the reply
std::string data = "OK";
size_t size = data.length();
std::unique_ptr<zmq::message_t> reply(new zmq::message_t(size));
memcpy(reply->data(), data.c_str(), size);
std::unique_ptr<zmq::message_t> reply;
std::unique_ptr<RequestImpl> result;
if (type == message::REQUEST) {
......@@ -109,13 +105,14 @@ std::unique_ptr<RequestImpl> ResponderImpl::receive() {
m_responder->recv(message.get(), 0);
result->m_message2 = std::string(message->data<char>(), message->size());
}
reply.reset(processRequest());
}
else if (type == message::CANCEL) {
m_canceled = true;
reply.reset(processCancelResponder());
}
else {
std::cerr << "Unknown message type " << type << std::endl;
m_responder->send(*message);
reply.reset(processUnknownRequest());
}
// send to the client
......@@ -126,6 +123,36 @@ std::unique_ptr<RequestImpl> ResponderImpl::receive() {
return result;
}
zmq::message_t * ResponderImpl::processRequest() {
std::string result = createRequestResponse(0, "OK");
zmq::message_t * reply = new zmq::message_t(result.length());
memcpy(reply->data(), result.c_str(), result.length());
return reply;
}
zmq::message_t * ResponderImpl::processCancelResponder() {
std::string result = createRequestResponse(0, "OK");
zmq::message_t * reply = new zmq::message_t(result.length());
memcpy(reply->data(), result.c_str(), result.length());
return reply;
}
zmq::message_t * ResponderImpl::processUnknownRequest() {
std::string result = createRequestResponse(-1, "Unknown request");
zmq::message_t * reply = new zmq::message_t(result.length());
memcpy(reply->data(), result.c_str(), result.length());
return reply;
}
void ResponderImpl::terminate() {
if (m_responder.get() != nullptr) {
......
......@@ -39,6 +39,11 @@ public:
WaitingImpl * waiting();
std::unique_ptr<RequestImpl> receive();
zmq::message_t * processRequest();
zmq::message_t * processCancelResponder();
zmq::message_t * processUnknownRequest();
void terminate();
int m_responderPort;
......
......@@ -93,7 +93,7 @@ void SubscriberImpl::init() {
}
}
requestSocket->request(createSubscribePublisherRequest());
requestSocket->requestJSON(createSubscribePublisherRequest());
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment