Commit e6c49dcd authored by legoc's avatar legoc
Browse files

Responder reviewed

parent e31a46e7
......@@ -22,7 +22,6 @@ namespace cameo {
namespace coms {
class Responder;
class RequestImpl;
class ResponderImpl;
class RequesterImpl;
......@@ -100,6 +99,7 @@ public:
private:
Responder(int responderPort, const std::string &name);
std::string m_name;
std::unique_ptr<ResponderImpl> m_impl;
std::unique_ptr<Waiting> m_waiting;
};
......
......@@ -22,7 +22,7 @@
#include "../base/Messages.h"
#include "../base/RequestSocket.h"
#include "impl/RequesterImpl.h"
#include "impl/ResponderImpl.h"
#include "impl/zmq/ResponderZmq.h"
namespace cameo {
namespace coms {
......@@ -140,13 +140,18 @@ std::unique_ptr<Server> Request::getServer() {
// Responder
Responder::Responder(int responderPort, const std::string& name) :
m_impl(new ResponderImpl(responderPort, name)) {
m_name(name) {
//TODO Replace with a factory.
m_impl = std::unique_ptr<ResponderImpl>(new ResponderZmq());
m_impl->init(responderPort);
// Create the waiting here.
m_waiting.reset(m_impl->waiting());
m_waiting.reset(new Waiting(std::bind(&Responder::cancel, this)));
}
Responder::~Responder() {
application::This::getCom().removePort(ResponderImpl::RESPONDER_PREFIX + m_name);
}
std::unique_ptr<Responder> Responder::create(const std::string& name) {
......@@ -163,7 +168,7 @@ std::unique_ptr<Responder> Responder::create(const std::string& name) {
}
const std::string& Responder::getName() const {
return m_impl->m_name;
return m_name;
}
void Responder::cancel() {
......@@ -175,7 +180,7 @@ std::unique_ptr<Request> Responder::receive() {
}
bool Responder::isCanceled() const {
return m_impl->m_canceled;
return m_impl->isCanceled();
}
///////////////////////////////////////////////////////////////////////////
......@@ -194,7 +199,7 @@ Requester::~Requester() {
std::unique_ptr<Requester> Requester::create(application::Instance & instance, const std::string& name) {
int responderId = instance.getId();
std::string responderPortName = ResponderImpl::RESPONDER_PREFIX + name;
std::string responderPortName = ResponderZmq::RESPONDER_PREFIX + name;
int requesterId = RequesterImpl::newRequesterId();
std::string requesterPortName = RequesterImpl::getRequesterPortName(name, responderId, requesterId);
......
......@@ -16,155 +16,11 @@
#include "ResponderImpl.h"
#include "RequesterResponder.h"
#include "Application.h"
#include "Serializer.h"
#include "JSON.h"
#include "../../base/impl/zmq/ContextZmq.h"
#include "../../base/Messages.h"
#include "../../base/RequestSocket.h"
#include <sstream>
namespace cameo {
namespace coms {
const std::string ResponderImpl::RESPONDER_PREFIX = "rep.";
ResponderImpl::ResponderImpl(int responderPort, const std::string& name) :
m_responderPort(responderPort),
m_name(name),
m_canceled(false) {
// create a socket REP
ContextZmq* contextImpl = dynamic_cast<ContextZmq *>(application::This::getCom().getContext());
m_responder.reset(new zmq::socket_t(contextImpl->getContext(), ZMQ_REP));
std::stringstream repEndpoint;
repEndpoint << "tcp://*:" << m_responderPort;
m_responder->bind(repEndpoint.str().c_str());
}
ResponderImpl::~ResponderImpl() {
terminate();
}
void ResponderImpl::cancel() {
json::StringObject request;
request.pushKey(message::TYPE);
request.pushInt(message::CANCEL);
// Create a request socket.
std::unique_ptr<RequestSocket> requestSocket = application::This::getCom().createRequestSocket(application::This::getEndpoint().withPort(m_responderPort).toString());
requestSocket->requestJSON(request.toString());
}
Waiting * ResponderImpl::waiting() {
return new Waiting(std::bind(&ResponderImpl::cancel, this));
}
std::unique_ptr<Request> ResponderImpl::receive() {
std::unique_ptr<zmq::message_t> message(new zmq::message_t);
m_responder->recv(message.get(), 0);
// Get the JSON request.
json::Object request;
json::parse(request, message.get());
int type = request[message::TYPE].GetInt();
// Create the reply
std::unique_ptr<zmq::message_t> reply;
std::unique_ptr<Request> result;
if (type == message::REQUEST) {
std::string name = request[message::Request::APPLICATION_NAME].GetString();
int id = request[message::Request::APPLICATION_ID].GetInt();
std::string serverUrl = request[message::Request::SERVER_URL].GetString();
int serverPort = request[message::Request::SERVER_PORT].GetInt();
int requesterPort = request[message::Request::REQUESTER_PORT].GetInt();
// Get the second part for the message.
message.reset(new zmq::message_t);
m_responder->recv(message.get(), 0);
std::string message1(message->data<char>(), message->size());
std::string message2;
// Set message 2 if it exists.
if (message->more()) {
message.reset(new zmq::message_t);
m_responder->recv(message.get(), 0);
message2 = std::string(message->data<char>(), message->size());
}
// Create the request.
result = std::unique_ptr<Request>(new Request(name,
id,
serverUrl,
serverPort,
requesterPort,
message1,
message2));
reply.reset(responseToRequest());
}
else if (type == message::CANCEL) {
m_canceled = true;
reply.reset(responseToCancelResponder());
}
else {
reply.reset(responseToUnknownRequest());
}
// send to the client
if (reply != nullptr) {
m_responder->send(*reply);
}
return result;
}
zmq::message_t * ResponderImpl::responseToRequest() {
std::string result = createRequestResponse(0, "OK");
zmq::message_t * reply = new zmq::message_t(result.length());
memcpy(reply->data(), result.c_str(), result.length());
return reply;
}
zmq::message_t * ResponderImpl::responseToCancelResponder() {
std::string result = createRequestResponse(0, "OK");
zmq::message_t * reply = new zmq::message_t(result.length());
memcpy(reply->data(), result.c_str(), result.length());
return reply;
}
zmq::message_t * ResponderImpl::responseToUnknownRequest() {
std::string result = createRequestResponse(-1, "Unknown request");
zmq::message_t * reply = new zmq::message_t(result.length());
memcpy(reply->data(), result.c_str(), result.length());
return reply;
}
void ResponderImpl::terminate() {
if (m_responder.get() != nullptr) {
m_responder.reset(nullptr);
application::This::getCom().removePort(RESPONDER_PREFIX + m_name);
}
}
}
}
......@@ -17,10 +17,7 @@
#ifndef CAMEO_RESPONDERIMPL_H_
#define CAMEO_RESPONDERIMPL_H_
#include "../../base/Waiting.h"
#include "zmq.hpp"
#include <string>
#include <vector>
#include <memory>
namespace cameo {
......@@ -31,24 +28,13 @@ class Request;
class ResponderImpl {
public:
ResponderImpl(int responderPort, const std::string& name);
~ResponderImpl();
virtual ~ResponderImpl() {}
void cancel();
Waiting * waiting();
virtual void init(int responderPort) = 0;
virtual void cancel() = 0;
virtual bool isCanceled() = 0;
std::unique_ptr<Request> receive();
zmq::message_t * responseToRequest();
zmq::message_t * responseToCancelResponder();
zmq::message_t * responseToUnknownRequest();
void terminate();
int m_responderPort;
std::string m_name;
std::unique_ptr<zmq::socket_t> m_responder;
bool m_canceled;
virtual std::unique_ptr<Request> receive() = 0;
static const std::string RESPONDER_PREFIX;
};
......
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
#include "ResponderZmq.h"
#include "RequesterResponder.h"
#include "Application.h"
#include "Serializer.h"
#include "JSON.h"
#include "../../../base/impl/zmq/ContextZmq.h"
#include "../../../base/Messages.h"
#include "../../../base/RequestSocket.h"
#include <sstream>
namespace cameo {
namespace coms {
ResponderZmq::ResponderZmq() :
m_responderPort(0),
m_canceled(false) {
}
void ResponderZmq::init(int responderPort) {
m_responderPort = responderPort;
// create a socket REP
ContextZmq* contextImpl = dynamic_cast<ContextZmq *>(application::This::getCom().getContext());
m_responder.reset(new zmq::socket_t(contextImpl->getContext(), ZMQ_REP));
std::stringstream repEndpoint;
repEndpoint << "tcp://*:" << m_responderPort;
m_responder->bind(repEndpoint.str().c_str());
}
ResponderZmq::~ResponderZmq() {
terminate();
}
void ResponderZmq::cancel() {
json::StringObject request;
request.pushKey(message::TYPE);
request.pushInt(message::CANCEL);
// Create a request socket.
std::unique_ptr<RequestSocket> requestSocket = application::This::getCom().createRequestSocket(application::This::getEndpoint().withPort(m_responderPort).toString());
requestSocket->requestJSON(request.toString());
}
bool ResponderZmq::isCanceled() {
return m_canceled;
}
std::unique_ptr<Request> ResponderZmq::receive() {
std::unique_ptr<zmq::message_t> message(new zmq::message_t);
m_responder->recv(message.get(), 0);
// Get the JSON request.
json::Object request;
json::parse(request, message.get());
int type = request[message::TYPE].GetInt();
// Create the reply
std::unique_ptr<zmq::message_t> reply;
std::unique_ptr<Request> result;
if (type == message::REQUEST) {
std::string name = request[message::Request::APPLICATION_NAME].GetString();
int id = request[message::Request::APPLICATION_ID].GetInt();
std::string serverUrl = request[message::Request::SERVER_URL].GetString();
int serverPort = request[message::Request::SERVER_PORT].GetInt();
int requesterPort = request[message::Request::REQUESTER_PORT].GetInt();
// Get the second part for the message.
message.reset(new zmq::message_t);
m_responder->recv(message.get(), 0);
std::string message1(message->data<char>(), message->size());
std::string message2;
// Set message 2 if it exists.
if (message->more()) {
message.reset(new zmq::message_t);
m_responder->recv(message.get(), 0);
message2 = std::string(message->data<char>(), message->size());
}
// Create the request.
result = std::unique_ptr<Request>(new Request(name,
id,
serverUrl,
serverPort,
requesterPort,
message1,
message2));
reply.reset(responseToRequest());
}
else if (type == message::CANCEL) {
m_canceled = true;
reply.reset(responseToCancelResponder());
}
else {
reply.reset(responseToUnknownRequest());
}
// send to the client
if (reply != nullptr) {
m_responder->send(*reply);
}
return result;
}
zmq::message_t * ResponderZmq::responseToRequest() {
std::string result = createRequestResponse(0, "OK");
zmq::message_t * reply = new zmq::message_t(result.length());
memcpy(reply->data(), result.c_str(), result.length());
return reply;
}
zmq::message_t * ResponderZmq::responseToCancelResponder() {
std::string result = createRequestResponse(0, "OK");
zmq::message_t * reply = new zmq::message_t(result.length());
memcpy(reply->data(), result.c_str(), result.length());
return reply;
}
zmq::message_t * ResponderZmq::responseToUnknownRequest() {
std::string result = createRequestResponse(-1, "Unknown request");
zmq::message_t * reply = new zmq::message_t(result.length());
memcpy(reply->data(), result.c_str(), result.length());
return reply;
}
void ResponderZmq::terminate() {
if (m_responder.get() != nullptr) {
m_responder.reset(nullptr);
}
}
}
}
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
#ifndef CAMEO_RESPONDERZMQ_H_
#define CAMEO_RESPONDERZMQ_H_
#include "../ResponderImpl.h"
#include <zmq.hpp>
namespace cameo {
namespace coms {
class Request;
class ResponderZmq : public ResponderImpl {
public:
ResponderZmq();
~ResponderZmq();
void init(int responderPort);
void cancel();
bool isCanceled();
std::unique_ptr<Request> receive();
private:
zmq::message_t * responseToRequest();
zmq::message_t * responseToCancelResponder();
zmq::message_t * responseToUnknownRequest();
void terminate();
int m_responderPort;
std::unique_ptr<zmq::socket_t> m_responder;
bool m_canceled;
};
}
}
#endif
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment