Commit 706316fa authored by legoc's avatar legoc
Browse files

Reviewed Requester

parent cddf96d5
......@@ -142,10 +142,20 @@ public:
bool isCanceled() const;
private:
Requester(const Endpoint &endpoint, int requesterPort, int responderPort, const std::string &name, int responderId, int requesterId);
void init(application::Instance &instance, const std::string &name);
Requester(const std::string &name);
static int newRequesterId();
static std::string getRequesterPortName(const std::string& name, int responderId, int requesterId);
std::string m_name;
int m_requesterId;
int m_responderId;
std::unique_ptr<RequesterImpl> m_impl;
std::unique_ptr<Waiting> m_waiting;
static std::mutex m_mutex;
static int m_requesterCounter;
};
std::ostream& operator<<(std::ostream&, const cameo::coms::Request&);
......
......@@ -21,7 +21,7 @@
#include "../base/impl/zmq/ContextZmq.h"
#include "../base/Messages.h"
#include "../base/RequestSocket.h"
#include "impl/RequesterImpl.h"
#include "impl/zmq/RequesterZmq.h"
#include "impl/zmq/ResponderZmq.h"
namespace cameo {
......@@ -193,24 +193,50 @@ bool Responder::isCanceled() const {
///////////////////////////////////////////////////////////////////////////
// Requester
Requester::Requester(const Endpoint &endpoint, int requesterPort, int responderPort, const std::string& name, int responderId, int requesterId) :
m_impl(new RequesterImpl(endpoint, requesterPort, responderPort, name, responderId, requesterId)) {
std::mutex Requester::m_mutex;
int Requester::m_requesterCounter = 0;
Requester::Requester(const std::string &name) :
m_name(name),
m_requesterId(0),
m_responderId(0) {
//TODO Replace with factory.
m_impl = std::unique_ptr<RequesterImpl>(new RequesterZmq());
// Create the waiting here.
m_waiting.reset(m_impl->waiting());
m_waiting.reset(new Waiting(std::bind(&Requester::cancel, this)));
}
Requester::~Requester() {
application::This::getCom().removePort(getRequesterPortName(m_name, m_responderId, m_requesterId));
}
std::unique_ptr<Requester> Requester::create(application::Instance & instance, const std::string& name) {
int Requester::newRequesterId() {
std::lock_guard<std::mutex> lock(m_mutex);
m_requesterCounter++;
int responderId = instance.getId();
return m_requesterCounter;
}
std::string Requester::getRequesterPortName(const std::string& name, int responderId, int requesterId) {
std::stringstream requesterPortName;
requesterPortName << RequesterImpl::REQUESTER_PREFIX << name << "." << responderId << "." << requesterId;
return requesterPortName.str();
}
void Requester::init(application::Instance &instance, const std::string &name) {
m_responderId = instance.getId();
std::string responderPortName = ResponderZmq::RESPONDER_PREFIX + name;
int requesterId = RequesterImpl::newRequesterId();
std::string requesterPortName = RequesterImpl::getRequesterPortName(name, responderId, requesterId);
m_requesterId = newRequesterId();
std::string requesterPortName = getRequesterPortName(name, m_responderId, m_requesterId);
std::string request = createConnectPortV0Request(responderId, responderPortName);
std::string request = createConnectPortV0Request(m_responderId, responderPortName);
json::Object response = instance.getCom().requestJSON(request);
......@@ -237,11 +263,19 @@ std::unique_ptr<Requester> Requester::create(application::Instance & instance, c
}
// TODO simplify the use of some variables: responderUrl.
return std::unique_ptr<Requester>(new Requester(instance.getEndpoint(), requesterPort, responderPort, name, responderId, requesterId));
m_impl->init(instance.getEndpoint(), requesterPort, responderPort, name);
}
std::unique_ptr<Requester> Requester::create(application::Instance & instance, const std::string& name) {
std::unique_ptr<Requester> requester = std::unique_ptr<Requester>(new Requester(name));
requester->init(instance, name);
return requester;
}
const std::string& Requester::getName() const {
return m_impl->m_name;
return m_name;
}
void Requester::sendBinary(const std::string& request) {
......@@ -269,7 +303,7 @@ void Requester::cancel() {
}
bool Requester::isCanceled() const {
return m_impl->m_canceled;
return m_impl->isCanceled();
}
std::ostream& operator<<(std::ostream& os, const Request& request) {
......@@ -293,7 +327,7 @@ std::ostream& operator<<(std::ostream& os, const Responder& responder) {
std::ostream& operator<<(std::ostream& os, const Requester& requester) {
os << "req." << requester.getName()
<< "." << requester.m_impl->m_requesterId
<< "." << requester.m_requesterId
<< ":" << application::This::getName()
<< "." << application::This::getId()
<< "@" << application::This::getEndpoint();
......
......@@ -16,186 +16,10 @@
#include "RequesterImpl.h"
#include "Application.h"
#include "Serializer.h"
#include "JSON.h"
#include "../../base/impl/zmq/ContextZmq.h"
#include "../../base/Messages.h"
#include "../../base/RequestSocket.h"
#include <zmq.hpp>
#include <sstream>
namespace cameo {
namespace coms {
const std::string RequesterImpl::REQUESTER_PREFIX = "req.";
std::mutex RequesterImpl::m_mutex;
int RequesterImpl::m_requesterCounter = 0;
RequesterImpl::RequesterImpl(const Endpoint& endpoint, int requesterPort, int responderPort, const std::string& name, int responderId, int requesterId) :
m_requesterPort(requesterPort),
m_name(name),
m_responderId(responderId),
m_requesterId(requesterId),
m_canceled(false) {
// Create the request socket.
m_requestSocket = application::This::getCom().createRequestSocket(endpoint.withPort(responderPort).toString());
// Create a socket REP.
ContextZmq* contextImpl = dynamic_cast<ContextZmq *>(application::This::getCom().getContext());
m_repSocket.reset(new zmq::socket_t(contextImpl->getContext(), ZMQ_REP));
std::stringstream reqEndpoint;
reqEndpoint << "tcp://*:" << m_requesterPort;
m_repSocket->bind(reqEndpoint.str().c_str());
}
RequesterImpl::~RequesterImpl() {
terminate();
}
int RequesterImpl::newRequesterId() {
std::lock_guard<std::mutex> lock(m_mutex);
m_requesterCounter++;
return m_requesterCounter;
}
std::string RequesterImpl::getRequesterPortName(const std::string& name, int responderId, int requesterId) {
std::stringstream requesterPortName;
requesterPortName << REQUESTER_PREFIX << name << "." << responderId << "." << requesterId;
return requesterPortName.str();
}
Waiting * RequesterImpl::waiting() {
return new Waiting(std::bind(&RequesterImpl::cancel, this));
}
void RequesterImpl::sendBinary(const std::string& requestData) {
json::StringObject request;
request.pushKey(message::TYPE);
request.pushInt(message::REQUEST);
request.pushKey(message::Request::APPLICATION_NAME);
request.pushString(application::This::getName());
request.pushKey(message::Request::APPLICATION_ID);
request.pushInt(application::This::getId());
request.pushKey(message::Request::SERVER_URL);
request.pushString(application::This::getEndpoint().getProtocol() + "://" + application::This::getEndpoint().getAddress());
request.pushKey(message::Request::SERVER_PORT);
request.pushInt(application::This::getEndpoint().getPort());
request.pushKey(message::Request::REQUESTER_PORT);
request.pushInt(m_requesterPort);
m_requestSocket->request(request.toString(), requestData);
}
void RequesterImpl::send(const std::string& requestData) {
// encode the data
std::string result;
serialize(requestData, result);
sendBinary(result);
}
void RequesterImpl::sendTwoBinaryParts(const std::string& requestData1, const std::string& requestData2) {
json::StringObject request;
request.pushKey(message::TYPE);
request.pushInt(message::REQUEST);
request.pushKey(message::Request::APPLICATION_NAME);
request.pushString(application::This::getName());
request.pushKey(message::Request::APPLICATION_ID);
request.pushInt(application::This::getId());
request.pushKey(message::Request::SERVER_URL);
request.pushString(application::This::getEndpoint().getProtocol() + "://" + application::This::getEndpoint().getAddress());
request.pushKey(message::Request::SERVER_PORT);
request.pushInt(application::This::getEndpoint().getPort());
request.pushKey(message::Request::REQUESTER_PORT);
request.pushInt(m_requesterPort);
m_requestSocket->request(request.toString(), requestData1, requestData2);
}
std::optional<std::string> RequesterImpl::receiveBinary() {
if (m_canceled) {
return {};
}
std::unique_ptr<zmq::message_t> message(new zmq::message_t);
m_repSocket->recv(message.get(), 0);
// Get the JSON request.
json::Object request;
json::parse(request, message.get());
int type = request[message::TYPE].GetInt();
if (type == message::CANCEL) {
m_canceled = true;
return {};
}
std::optional<std::string> result;
if (type == message::RESPONSE) {
// Get the second part for the message.
message.reset(new zmq::message_t);
m_repSocket->recv(message.get(), 0);
result = std::string(message->data<char>(), message->size());
}
// Create the reply.
std::string data = createRequestResponse(0, "OK");
size_t size = data.length();
std::unique_ptr<zmq::message_t> reply(new zmq::message_t(size));
memcpy(reply->data(), data.c_str(), size);
m_repSocket->send(*reply);
return result;
}
std::optional<std::string> RequesterImpl::receive() {
return receiveBinary();
}
void RequesterImpl::cancel() {
json::StringObject request;
request.pushKey(message::TYPE);
request.pushInt(message::CANCEL);
// Create a request socket only for the request.
std::unique_ptr<RequestSocket> requestSocket = application::This::getCom().createRequestSocket(application::This::getEndpoint().withPort(m_requesterPort).toString());
requestSocket->requestJSON(request.toString());
}
void RequesterImpl::terminate() {
if (m_repSocket.get() != nullptr) {
m_repSocket.reset(nullptr);
application::This::getCom().removePort(getRequesterPortName(m_name, m_responderId, m_requesterId));
}
m_requestSocket.reset();
}
}
}
......
......@@ -19,52 +19,29 @@
#include "../../base/Waiting.h"
#include "Strings.h"
#include "zmq.hpp"
#include <string>
#include <vector>
#include <thread>
#include <mutex>
#include <optional>
namespace cameo {
class RequestSocket;
namespace coms {
class RequesterImpl {
public:
RequesterImpl(const Endpoint& endpoint, int requesterPort, int responderPort, const std::string& name, int responderId, int requesterId);
~RequesterImpl();
static int newRequesterId();
static std::string getRequesterPortName(const std::string& name, int responderId, int requesterId);
Waiting * waiting();
void sendBinary(const std::string& requestData);
void send(const std::string& requestData);
void sendTwoBinaryParts(const std::string& requestData1, const std::string& requestData2);
std::optional<std::string> receiveBinary();
std::optional<std::string> receive();
static const std::string REQUESTER_PREFIX;
void cancel();
void terminate();
virtual ~RequesterImpl() {}
int m_requesterPort;
std::string m_name;
int m_responderId;
int m_requesterId;
std::unique_ptr<RequestSocket> m_requestSocket;
std::unique_ptr<zmq::socket_t> m_repSocket;
bool m_canceled;
virtual void init(const Endpoint& endpoint, int requesterPort, int responderPort, const std::string& name) = 0;
virtual void sendBinary(const std::string& requestData) = 0;
virtual void send(const std::string& requestData) = 0;
virtual void sendTwoBinaryParts(const std::string& requestData1, const std::string& requestData2) = 0;
static const std::string REQUESTER_PREFIX;
virtual std::optional<std::string> receiveBinary() = 0;
virtual std::optional<std::string> receive() = 0;
static std::mutex m_mutex;
static int m_requesterCounter;
virtual void cancel() = 0;
virtual bool isCanceled() = 0;
virtual void terminate() = 0;
};
}
......
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
#include "RequesterZmq.h"
#include "Application.h"
#include "Serializer.h"
#include "JSON.h"
#include "../../../base/impl/zmq/ContextZmq.h"
#include "../../../base/Messages.h"
#include "../../../base/RequestSocket.h"
#include <zmq.hpp>
#include <sstream>
namespace cameo {
namespace coms {
const std::string RequesterZmq::REQUESTER_PREFIX = "req.";
void RequesterZmq::init(const Endpoint& endpoint, int requesterPort, int responderPort, const std::string& name) {
m_requesterPort = requesterPort;
m_name = name;
m_canceled = false;
// Create the request socket.
m_requestSocket = application::This::getCom().createRequestSocket(endpoint.withPort(responderPort).toString());
// Create a socket REP.
ContextZmq* contextImpl = dynamic_cast<ContextZmq *>(application::This::getCom().getContext());
m_repSocket.reset(new zmq::socket_t(contextImpl->getContext(), ZMQ_REP));
std::stringstream reqEndpoint;
reqEndpoint << "tcp://*:" << m_requesterPort;
m_repSocket->bind(reqEndpoint.str().c_str());
}
RequesterZmq::~RequesterZmq() {
terminate();
}
void RequesterZmq::sendBinary(const std::string& requestData) {
json::StringObject request;
request.pushKey(message::TYPE);
request.pushInt(message::REQUEST);
request.pushKey(message::Request::APPLICATION_NAME);
request.pushString(application::This::getName());
request.pushKey(message::Request::APPLICATION_ID);
request.pushInt(application::This::getId());
request.pushKey(message::Request::SERVER_URL);
request.pushString(application::This::getEndpoint().getProtocol() + "://" + application::This::getEndpoint().getAddress());
request.pushKey(message::Request::SERVER_PORT);
request.pushInt(application::This::getEndpoint().getPort());
request.pushKey(message::Request::REQUESTER_PORT);
request.pushInt(m_requesterPort);
m_requestSocket->request(request.toString(), requestData);
}
void RequesterZmq::send(const std::string& requestData) {
// encode the data
std::string result;
serialize(requestData, result);
sendBinary(result);
}
void RequesterZmq::sendTwoBinaryParts(const std::string& requestData1, const std::string& requestData2) {
json::StringObject request;
request.pushKey(message::TYPE);
request.pushInt(message::REQUEST);
request.pushKey(message::Request::APPLICATION_NAME);
request.pushString(application::This::getName());
request.pushKey(message::Request::APPLICATION_ID);
request.pushInt(application::This::getId());
request.pushKey(message::Request::SERVER_URL);
request.pushString(application::This::getEndpoint().getProtocol() + "://" + application::This::getEndpoint().getAddress());
request.pushKey(message::Request::SERVER_PORT);
request.pushInt(application::This::getEndpoint().getPort());
request.pushKey(message::Request::REQUESTER_PORT);
request.pushInt(m_requesterPort);
m_requestSocket->request(request.toString(), requestData1, requestData2);
}
std::optional<std::string> RequesterZmq::receiveBinary() {
if (m_canceled) {
return {};
}
std::unique_ptr<zmq::message_t> message(new zmq::message_t);
m_repSocket->recv(message.get(), 0);
// Get the JSON request.
json::Object request;
json::parse(request, message.get());
int type = request[message::TYPE].GetInt();
if (type == message::CANCEL) {
m_canceled = true;
return {};
}
std::optional<std::string> result;
if (type == message::RESPONSE) {
// Get the second part for the message.
message.reset(new zmq::message_t);
m_repSocket->recv(message.get(), 0);
result = std::string(message->data<char>(), message->size());
}
// Create the reply.
std::string data = createRequestResponse(0, "OK");
size_t size = data.length();
std::unique_ptr<zmq::message_t> reply(new zmq::message_t(size));
memcpy(reply->data(), data.c_str(), size);
m_repSocket->send(*reply);
return result;
}
std::optional<std::string> RequesterZmq::receive() {
return receiveBinary();
}
void RequesterZmq::cancel() {
json::StringObject request;
request.pushKey(message::TYPE);
request.pushInt(message::CANCEL);
// Create a request socket only for the request.
std::unique_ptr<RequestSocket> requestSocket = application::This::getCom().createRequestSocket(application::This::getEndpoint().withPort(m_requesterPort).toString());
requestSocket->requestJSON(request.toString());
}
bool RequesterZmq::isCanceled() {
return m_canceled;
}
void RequesterZmq::terminate() {
if (m_repSocket.get() != nullptr) {
m_repSocket.reset(nullptr);
}
m_requestSocket.reset();
}
}
}
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
#ifndef CAMEO_REQUESTERZMQ_H_
#define CAMEO_REQUESTERZMQ_H_
#include "../RequesterImpl.h"
#include "../../../base/Waiting.h"
#include "Strings.h"
#include <string>
#include <optional>
#include <zmq.hpp>
namespace cameo {
class RequestSocket;
namespace coms {
class RequesterZmq : public RequesterImpl {