Commit e31a46e7 authored by legoc's avatar legoc
Browse files

Reviewed Request

parent 0e3e0ab9
......@@ -35,6 +35,7 @@ class Request {
friend std::ostream& operator<<(std::ostream&, const Request&);
public:
Request(const std::string & requesterApplicationName, int requesterApplicationId, const std::string& serverUrl, int serverPort, int requesterPort, const std::string& messagePart1, const std::string& messagePart2);
~Request();
std::string getObjectId() const;
......@@ -57,9 +58,14 @@ public:
std::unique_ptr<Server> getServer();
private:
Request(std::unique_ptr<RequestImpl> &impl);
std::string m_requesterEndpoint;
std::string m_messagePart1;
std::string m_messagePart2;
std::string m_requesterApplicationName;
int m_requesterApplicationId;
std::string m_requesterServerEndpoint;
int m_timeout;
std::unique_ptr<RequestImpl> m_impl;
std::unique_ptr<Server> m_requesterServer;
};
......
......@@ -22,7 +22,6 @@
#include "../base/Messages.h"
#include "../base/RequestSocket.h"
#include "impl/RequesterImpl.h"
#include "impl/RequestImpl.h"
#include "impl/ResponderImpl.h"
namespace cameo {
......@@ -31,68 +30,100 @@ namespace coms {
///////////////////////////////////////////////////////////////////////////
// Request
Request::Request(std::unique_ptr<RequestImpl> & impl) :
m_impl(std::move(impl)) {
Request::~Request() {
}
Request::~Request() {
void Request::setTimeout(int value) {
m_timeout = value;
}
std::string Request::getObjectId() const {
// Local id is missing.
return "request:"
+ m_impl->m_requesterApplicationName
+ m_requesterApplicationName
+ "."
+ std::to_string(m_impl->m_requesterApplicationId)
+ std::to_string(m_requesterApplicationId)
+ "@"
+ m_impl->m_requesterServerEndpoint;
+ m_requesterServerEndpoint;
}
std::string Request::getRequesterEndpoint() const {
return m_impl->m_requesterServerEndpoint;
}
void Request::setTimeout(int value) {
m_impl->setTimeout(value);
return m_requesterServerEndpoint;
}
const std::string& Request::getBinary() const {
return m_impl->m_message;
return m_messagePart1;
}
std::string Request::get() const {
std::string data;
parse(m_impl->m_message, data);
parse(m_messagePart1, data);
return data;
}
const std::string& Request::getSecondBinaryPart() const {
return m_impl->m_message2;
return m_messagePart2;
}
Request::Request(const std::string & requesterApplicationName, int requesterApplicationId, const std::string& serverUrl, int serverPort, int requesterPort, const std::string& messagePart1, const std::string& messagePart2) :
m_messagePart1(messagePart1),
m_messagePart2(messagePart2),
m_requesterApplicationName(requesterApplicationName),
m_requesterApplicationId(requesterApplicationId),
m_timeout(0) {
std::stringstream requesterEndpoint;
requesterEndpoint << serverUrl << ":" << requesterPort;
m_requesterEndpoint = requesterEndpoint.str();
std::stringstream requesterServerEndpoint;
requesterServerEndpoint << serverUrl << ":" << serverPort;
m_requesterServerEndpoint = requesterServerEndpoint.str();
}
bool Request::replyBinary(const std::string& response) {
return m_impl->replyBinary(response);
json::StringObject request;
request.pushKey(message::TYPE);
request.pushInt(message::RESPONSE);
// Create a request socket. It is created for each request that could be optimized.
std::unique_ptr<RequestSocket> requestSocket = application::This::getCom().createRequestSocket(m_requesterEndpoint);
try {
requestSocket->request(request.toString(), response);
}
catch (const ConnectionTimeout&) {
return false;
}
return true;
}
bool Request::reply(const std::string& response) {
return m_impl->reply(response);
// Encode the data.
std::string result;
serialize(response, result);
return replyBinary(result);
}
std::unique_ptr<application::Instance> Request::connectToRequester() {
// Instantiate the requester server if it does not exist.
if (m_requesterServer.get() == nullptr) {
m_requesterServer.reset(new Server(m_impl->m_requesterServerEndpoint, m_impl->m_timeout));
m_requesterServer.reset(new Server(m_requesterServerEndpoint, m_timeout));
}
// Connect and find the instance.
application::InstanceArray instances = m_requesterServer->connectAll(m_impl->m_requesterApplicationName);
application::InstanceArray instances = m_requesterServer->connectAll(m_requesterApplicationName);
for (int i = 0; i < instances.size(); i++) {
if (instances[i]->getId() == m_impl->m_requesterApplicationId) {
if (instances[i]->getId() == m_requesterApplicationId) {
return std::unique_ptr<application::Instance>(std::move(instances[i]));
}
}
......@@ -140,12 +171,7 @@ void Responder::cancel() {
}
std::unique_ptr<Request> Responder::receive() {
std::unique_ptr<RequestImpl> requestImpl = m_impl->receive();
if (requestImpl.get() == nullptr) {
return std::unique_ptr<Request>(nullptr);
}
return std::unique_ptr<Request>(new Request(requestImpl));
return m_impl->receive();
}
bool Responder::isCanceled() const {
......@@ -236,8 +262,8 @@ bool Requester::isCanceled() const {
std::ostream& operator<<(std::ostream& os, const Request& request) {
os << "[endpoint=" << request.m_impl->m_requesterEndpoint
<< ", id=" << request.m_impl->m_requesterApplicationId << "]";
os << "[endpoint=" << request.m_requesterEndpoint
<< ", id=" << request.m_requesterApplicationId << "]";
return os;
}
......
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
#include "RequestImpl.h"
#include "Application.h"
#include "Serializer.h"
#include "JSON.h"
#include "../../base/impl/zmq/ContextZmq.h"
#include "../../base/Messages.h"
#include "../../base/RequestSocket.h"
#include <sstream>
namespace cameo {
namespace coms {
RequestImpl::RequestImpl(const std::string & requesterApplicationName, int requesterApplicationId, const std::string& message, const std::string& serverUrl, int serverPort, int requesterPort) :
m_message(message),
m_requesterApplicationName(requesterApplicationName),
m_requesterApplicationId(requesterApplicationId),
m_timeout(0) {
std::stringstream requesterEndpoint;
requesterEndpoint << serverUrl << ":" << requesterPort;
m_requesterEndpoint = requesterEndpoint.str();
std::stringstream requesterServerEndpoint;
requesterServerEndpoint << serverUrl << ":" << serverPort;
m_requesterServerEndpoint = requesterServerEndpoint.str();
}
RequestImpl::~RequestImpl() {
}
void RequestImpl::setTimeout(int value) {
m_timeout = value;
}
bool RequestImpl::replyBinary(const std::string& response) {
json::StringObject request;
request.pushKey(message::TYPE);
request.pushInt(message::RESPONSE);
// Create a request socket. It is created for each request that could be optimized.
std::unique_ptr<RequestSocket> requestSocket = application::This::getCom().createRequestSocket(m_requesterEndpoint);
try {
requestSocket->request(request.toString(), response);
}
catch (const ConnectionTimeout&) {
return false;
}
return true;
}
bool RequestImpl::reply(const std::string& response) {
// Encode the data.
std::string result;
serialize(response, result);
return replyBinary(result);
}
}
}
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
#ifndef CAMEO_REQUESTIMPL_H_
#define CAMEO_REQUESTIMPL_H_
#include <string>
#include <vector>
#include "zmq.hpp"
namespace cameo {
namespace coms {
class RequestImpl {
public:
RequestImpl(const std::string & requesterApplicationName, int requesterApplicationId, const std::string& message, const std::string& serverUrl, int serverPort, int requesterPort);
~RequestImpl();
void setTimeout(int value);
bool replyBinary(const std::string& response);
bool reply(const std::string& response);
std::string m_requesterEndpoint;
std::string m_message;
std::string m_message2;
std::string m_requesterApplicationName;
int m_requesterApplicationId;
std::string m_requesterServerEndpoint;
int m_timeout;
};
}
}
#endif
......@@ -16,7 +16,7 @@
#include "ResponderImpl.h"
#include "RequestImpl.h"
#include "RequesterResponder.h"
#include "Application.h"
#include "Serializer.h"
#include "JSON.h"
......@@ -63,7 +63,7 @@ Waiting * ResponderImpl::waiting() {
return new Waiting(std::bind(&ResponderImpl::cancel, this));
}
std::unique_ptr<RequestImpl> ResponderImpl::receive() {
std::unique_ptr<Request> ResponderImpl::receive() {
std::unique_ptr<zmq::message_t> message(new zmq::message_t);
m_responder->recv(message.get(), 0);
......@@ -76,7 +76,7 @@ std::unique_ptr<RequestImpl> ResponderImpl::receive() {
// Create the reply
std::unique_ptr<zmq::message_t> reply;
std::unique_ptr<RequestImpl> result;
std::unique_ptr<Request> result;
if (type == message::REQUEST) {
......@@ -90,21 +90,24 @@ std::unique_ptr<RequestImpl> ResponderImpl::receive() {
message.reset(new zmq::message_t);
m_responder->recv(message.get(), 0);
std::string message1(message->data<char>(), message->size());
// Create the request.
result = std::unique_ptr<RequestImpl>(new RequestImpl(name,
id,
message1,
serverUrl,
serverPort,
requesterPort));
std::string message2;
// Set message 2 if it exists.
if (message->more()) {
message.reset(new zmq::message_t);
m_responder->recv(message.get(), 0);
result->m_message2 = std::string(message->data<char>(), message->size());
message2 = std::string(message->data<char>(), message->size());
}
// Create the request.
result = std::unique_ptr<Request>(new Request(name,
id,
serverUrl,
serverPort,
requesterPort,
message1,
message2));
reply.reset(responseToRequest());
}
else if (type == message::CANCEL) {
......
......@@ -26,7 +26,7 @@
namespace cameo {
namespace coms {
class RequestImpl;
class Request;
class ResponderImpl {
......@@ -37,7 +37,7 @@ public:
void cancel();
Waiting * waiting();
std::unique_ptr<RequestImpl> receive();
std::unique_ptr<Request> receive();
zmq::message_t * responseToRequest();
zmq::message_t * responseToCancelResponder();
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment