Commit 0b09034f authored by legoc's avatar legoc
Browse files

Implementation of C++ responder timeout

parent ec8c969d
......@@ -968,6 +968,10 @@ Request::Request(std::unique_ptr<RequestImpl> & impl) :
Request::~Request() {
}
void Request::setTimeout(int value) {
m_impl->setTimeout(value);
}
const std::string& Request::getBinary() const {
return m_impl->m_message;
}
......
......@@ -396,6 +396,8 @@ public:
std::string get() const;
const std::string& getSecondBinaryPart() const;
void setTimeout(int value);
void replyBinary(const std::string& response);
void reply(const std::string& response);
......
......@@ -171,4 +171,8 @@ std::unique_ptr<RequestSocketImpl> Services::createRequestSocket(const std::stri
return unique_ptr<RequestSocketImpl>(new RequestSocketImpl(m_impl->createRequestSocket(endpoint), m_impl->m_timeout));
}
std::unique_ptr<RequestSocketImpl> Services::createRequestSocket(const std::string& endpoint, int timeout) {
return unique_ptr<RequestSocketImpl>(new RequestSocketImpl(m_impl->createRequestSocket(endpoint), timeout));
}
}
......@@ -51,6 +51,7 @@ public:
std::unique_ptr<EventStreamSocket> openEventStream();
std::unique_ptr<OutputStreamSocket> createOutputStreamSocket(int port);
std::unique_ptr<RequestSocketImpl> createRequestSocket(const std::string& endpoint);
std::unique_ptr<RequestSocketImpl> createRequestSocket(const std::string& endpoint, int timeout);
std::string m_serverEndpoint;
std::string m_url;
......
......@@ -30,7 +30,8 @@ RequestImpl::RequestImpl(application::This * application, const std::string & re
m_application(application),
m_message(message),
m_requesterApplicationName(requesterApplicationName),
m_requesterApplicationId(requesterApplicationId) {
m_requesterApplicationId(requesterApplicationId),
m_timeout(0) {
stringstream requesterEndpoint;
requesterEndpoint << serverUrl << ":" << requesterPort;
......@@ -44,11 +45,22 @@ RequestImpl::RequestImpl(application::This * application, const std::string & re
RequestImpl::~RequestImpl() {
}
void RequestImpl::setTimeout(int value) {
m_timeout = value;
}
void RequestImpl::replyBinary(const std::string& response) {
// Create a request socket. It is created for each request that could be optimized.
unique_ptr<RequestSocketImpl> requestSocket = m_application->createRequestSocket(m_requesterEndpoint);
requestSocket->request(m_application->m_impl->createRequestType(PROTO_RESPONSE), response);
unique_ptr<RequestSocketImpl> requestSocket = m_application->createRequestSocket(m_requesterEndpoint, m_timeout);
//requestSocket->requestAsync(m_application->m_impl->createRequestType(PROTO_RESPONSE), response);
try {
requestSocket->request(m_application->m_impl->createRequestType(PROTO_RESPONSE), response);
}
catch (const ConnectionTimeout&) {
cout << "timeout while replying" << endl;
}
}
void RequestImpl::reply(const std::string& response) {
......
......@@ -33,6 +33,8 @@ public:
RequestImpl(application::This * application, const std::string & requesterApplicationName, int requesterApplicationId, const std::string& message, const std::string& serverUrl, int serverPort, int requesterPort);
~RequestImpl();
void setTimeout(int value);
void replyBinary(const std::string& response);
void reply(const std::string& response);
......@@ -43,6 +45,7 @@ public:
std::string m_requesterApplicationName;
int m_requesterApplicationId;
std::string m_requesterServerEndpoint;
int m_timeout;
};
}
......
......@@ -18,6 +18,9 @@
#include "../ConnectionTimeout.h"
#include <iostream>
#include <chrono>
#include <thread>
using namespace std;
namespace cameo {
......@@ -59,7 +62,6 @@ std::unique_ptr<zmq::message_t> RequestSocketImpl::request(const std::string& re
int rc = zmq::poll(items, 1, timeout);
if (rc == 0) {
// Timeout occurred.
m_socket->close();
throw ConnectionTimeout();
}
}
......@@ -71,4 +73,24 @@ std::unique_ptr<zmq::message_t> RequestSocketImpl::request(const std::string& re
return reply;
}
void RequestSocketImpl::requestAsync(const std::string& requestTypePart, const std::string& requestDataPart) {
// Prepare the request parts.
int requestTypeSize = requestTypePart.length();
int requestDataSize = requestDataPart.length();
zmq::message_t requestType(requestTypeSize);
zmq::message_t requestData(requestDataSize);
memcpy(static_cast<void *>(requestType.data()), requestTypePart.c_str(), requestTypeSize);
memcpy(static_cast<void *>(requestData.data()), requestDataPart.c_str(), requestDataSize);
// Send the request in two parts.
m_socket->send(requestType, ZMQ_SNDMORE);
m_socket->send(requestData);
// ...
// Close the socket as we do not need to wait for the reply.
m_socket->close();
}
}
......@@ -30,6 +30,7 @@ public:
virtual ~RequestSocketImpl();
std::unique_ptr<zmq::message_t> request(const std::string& requestTypePart, const std::string& requestDataPart, int overrideTimeout = -1);
void requestAsync(const std::string& requestTypePart, const std::string& requestDataPart);
std::unique_ptr<zmq::socket_t> m_socket;
int m_timeout;
......
......@@ -257,8 +257,8 @@ zmq::socket_t * ServicesImpl::createRequestSocket(const std::string& endpoint) {
try {
// Set the linger value to 0 to ensure that pending requests are destroyed in case of timeout.
int value = 0;
socket->setsockopt(ZMQ_LINGER, &value, sizeof(int));
// int value = 0;
// socket->setsockopt(ZMQ_LINGER, &value, sizeof(int));
// Connect to the endpoint.
socket->connect(endpoint.c_str());
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment