Commit ea2992a0 authored by legoc's avatar legoc
Browse files

Prepare RequestSocketImpl to minimize socket creation

parent 02325052
...@@ -26,6 +26,7 @@ libcameo_la_SOURCES = \ ...@@ -26,6 +26,7 @@ libcameo_la_SOURCES = \
cameo/ResponderCreationException.cpp \ cameo/ResponderCreationException.cpp \
cameo/RequesterCreationException.cpp \ cameo/RequesterCreationException.cpp \
cameo/impl/StreamSocketImpl.cpp \ cameo/impl/StreamSocketImpl.cpp \
cameo/impl/RequestSocketImpl.cpp \
cameo/impl/SocketWaitingImpl.cpp \ cameo/impl/SocketWaitingImpl.cpp \
cameo/impl/GenericWaitingImpl.cpp \ cameo/impl/GenericWaitingImpl.cpp \
cameo/impl/WaitingImplSet.cpp \ cameo/impl/WaitingImplSet.cpp \
...@@ -46,6 +47,7 @@ libcameo_la_SOURCES = \ ...@@ -46,6 +47,7 @@ libcameo_la_SOURCES = \
cameo/impl/HandlerImpl.cpp \ cameo/impl/HandlerImpl.cpp \
cameo/Application.cpp \ cameo/Application.cpp \
cameo/impl/StreamSocketImpl.h \ cameo/impl/StreamSocketImpl.h \
cameo/impl/RequestSocketImpl.h \
cameo/impl/WaitingImpl.h \ cameo/impl/WaitingImpl.h \
cameo/impl/SocketWaitingImpl.h \ cameo/impl/SocketWaitingImpl.h \
cameo/impl/GenericWaitingImpl.h \ cameo/impl/GenericWaitingImpl.h \
......
...@@ -157,4 +157,8 @@ std::unique_ptr<OutputStreamSocket> Services::createOutputStreamSocket(int port) ...@@ -157,4 +157,8 @@ std::unique_ptr<OutputStreamSocket> Services::createOutputStreamSocket(int port)
return unique_ptr<OutputStreamSocket>(new OutputStreamSocket(new StreamSocketImpl(subscriber, cancelPublisher))); return unique_ptr<OutputStreamSocket>(new OutputStreamSocket(new StreamSocketImpl(subscriber, cancelPublisher)));
} }
std::unique_ptr<RequestSocketImpl> Services::createRequestSocket(const std::string& endpoint) {
return unique_ptr<RequestSocketImpl>(new RequestSocketImpl(m_impl->createRequestSocket(endpoint), m_impl->m_timeout));
}
} }
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#include <vector> #include <vector>
#include "EventStreamSocket.h" #include "EventStreamSocket.h"
#include "OutputStreamSocket.h" #include "OutputStreamSocket.h"
#include "impl/RequestSocketImpl.h"
namespace cameo { namespace cameo {
...@@ -48,6 +49,7 @@ public: ...@@ -48,6 +49,7 @@ public:
void initStatus(); void initStatus();
std::unique_ptr<EventStreamSocket> openEventStream(); std::unique_ptr<EventStreamSocket> openEventStream();
std::unique_ptr<OutputStreamSocket> createOutputStreamSocket(int port); std::unique_ptr<OutputStreamSocket> createOutputStreamSocket(int port);
std::unique_ptr<RequestSocketImpl> createRequestSocket(const std::string& endpoint);
std::string m_serverEndpoint; std::string m_serverEndpoint;
std::string m_url; std::string m_url;
......
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
#include "RequestSocketImpl.h"
#include "../ConnectionTimeout.h"
#include <iostream>
using namespace std;
namespace cameo {
RequestSocketImpl::RequestSocketImpl(zmq::socket_t * socket, int timeout) :
m_socket(socket), m_timeout(timeout) {
}
RequestSocketImpl::~RequestSocketImpl() {
}
std::unique_ptr<zmq::message_t> RequestSocketImpl::request(const std::string& requestTypePart, const std::string& requestDataPart, int overrideTimeout) {
// Prepare the request parts.
int requestTypeSize = requestTypePart.length();
int requestDataSize = requestDataPart.length();
zmq::message_t requestType(requestTypeSize);
zmq::message_t requestData(requestDataSize);
memcpy(static_cast<void *>(requestType.data()), requestTypePart.c_str(), requestTypeSize);
memcpy(static_cast<void *>(requestData.data()), requestDataPart.c_str(), requestDataSize);
// Send the request in two parts.
m_socket->send(requestType, ZMQ_SNDMORE);
m_socket->send(requestData);
int timeout = m_timeout;
if (overrideTimeout > -1) {
timeout = overrideTimeout;
}
if (timeout > 0) {
// Polling.
zmq_pollitem_t items[1];
items[0].socket = static_cast<void *>(m_socket.get());
items[0].fd = 0;
items[0].events = ZMQ_POLLIN;
items[0].revents = 0;
int rc = zmq::poll(items, 1, timeout);
if (rc == 0) {
// Timeout occurred.
m_socket->close();
throw ConnectionTimeout();
}
}
// Receive the response.
unique_ptr<zmq::message_t> reply(new zmq::message_t());
m_socket->recv(reply.get(), 0);
return reply;
}
}
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
#ifndef CAMEO_REQUESTSOCKETIMPL_H_
#define CAMEO_REQUESTSOCKETIMPL_H_
#include <string>
#include <memory>
#include "zmq.hpp"
namespace cameo {
class RequestSocketImpl {
public:
RequestSocketImpl(zmq::socket_t * socket, int timeout = 0);
virtual ~RequestSocketImpl();
std::unique_ptr<zmq::message_t> request(const std::string& requestTypePart, const std::string& requestDataPart, int overrideTimeout = -1);
std::unique_ptr<zmq::socket_t> m_socket;
int m_timeout;
};
}
#endif
...@@ -236,6 +236,25 @@ zmq::socket_t * ServicesImpl::createCancelPublisher(const std::string& endpoint) ...@@ -236,6 +236,25 @@ zmq::socket_t * ServicesImpl::createCancelPublisher(const std::string& endpoint)
return publisher; return publisher;
} }
zmq::socket_t * ServicesImpl::createRequestSocket(const std::string& endpoint) {
zmq::socket_t* socket = new zmq::socket_t(m_context, ZMQ_REQ);
try {
// Set the linger value to 0 to ensure that pending requests are destroyed in case of timeout.
int value = 0;
socket->setsockopt(ZMQ_LINGER, &value, sizeof(int));
// Connect to the endpoint.
socket->connect(endpoint.c_str());
}
catch (exception const & e) {
throw SocketException(e.what());
}
return socket;
}
std::string ServicesImpl::createShowStreamRequest(int id) const { std::string ServicesImpl::createShowStreamRequest(int id) const {
proto::ShowStreamCommand showStreamCommand; proto::ShowStreamCommand showStreamCommand;
showStreamCommand.set_id(id); showStreamCommand.set_id(id);
......
...@@ -65,6 +65,7 @@ public: ...@@ -65,6 +65,7 @@ public:
zmq::socket_t * createEventSubscriber(const std::string& endpoint, const std::string& cancelEndpoint); zmq::socket_t * createEventSubscriber(const std::string& endpoint, const std::string& cancelEndpoint);
zmq::socket_t * createOutputStreamSubscriber(const std::string& endpoint, const std::string& cancelEndpoint); zmq::socket_t * createOutputStreamSubscriber(const std::string& endpoint, const std::string& cancelEndpoint);
zmq::socket_t * createCancelPublisher(const std::string& endpoint); zmq::socket_t * createCancelPublisher(const std::string& endpoint);
zmq::socket_t * createRequestSocket(const std::string& endpoint);
std::unique_ptr<zmq::message_t> tryRequestWithOnePartReply(const std::string& strRequestType, const std::string& strRequestData, const std::string& endpoint, int overrideTimeout = -1); std::unique_ptr<zmq::message_t> tryRequestWithOnePartReply(const std::string& strRequestType, const std::string& strRequestData, const std::string& endpoint, int overrideTimeout = -1);
std::string createShowStreamRequest(int id) const; std::string createShowStreamRequest(int id) const;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment