Commit 07954c04 authored by legoc's avatar legoc
Browse files

Prepare RequestSocketImpl to minimize socket creation

parent 58411ed2
......@@ -26,6 +26,7 @@ libcameo_la_SOURCES = \
cameo/ResponderCreationException.cpp \
cameo/RequesterCreationException.cpp \
cameo/impl/StreamSocketImpl.cpp \
cameo/impl/RequestSocketImpl.cpp \
cameo/impl/SocketWaitingImpl.cpp \
cameo/impl/GenericWaitingImpl.cpp \
cameo/impl/WaitingImplSet.cpp \
......@@ -46,6 +47,7 @@ libcameo_la_SOURCES = \
cameo/impl/HandlerImpl.cpp \
cameo/Application.cpp \
cameo/impl/StreamSocketImpl.h \
cameo/impl/RequestSocketImpl.h \
cameo/impl/WaitingImpl.h \
cameo/impl/SocketWaitingImpl.h \
cameo/impl/GenericWaitingImpl.h \
......
......@@ -157,4 +157,8 @@ std::unique_ptr<OutputStreamSocket> Services::createOutputStreamSocket(int port)
return unique_ptr<OutputStreamSocket>(new OutputStreamSocket(new StreamSocketImpl(subscriber, cancelPublisher)));
}
std::unique_ptr<RequestSocketImpl> Services::createRequestSocket(const std::string& endpoint) {
return unique_ptr<RequestSocketImpl>(new RequestSocketImpl(m_impl->createRequestSocket(endpoint), m_impl->m_timeout));
}
}
......@@ -21,6 +21,7 @@
#include <vector>
#include "EventStreamSocket.h"
#include "OutputStreamSocket.h"
#include "impl/RequestSocketImpl.h"
namespace cameo {
......@@ -48,6 +49,7 @@ public:
void initStatus();
std::unique_ptr<EventStreamSocket> openEventStream();
std::unique_ptr<OutputStreamSocket> createOutputStreamSocket(int port);
std::unique_ptr<RequestSocketImpl> createRequestSocket(const std::string& endpoint);
std::string m_serverEndpoint;
std::string m_url;
......
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
#include "RequestSocketImpl.h"
#include "../ConnectionTimeout.h"
#include <iostream>
using namespace std;
namespace cameo {
RequestSocketImpl::RequestSocketImpl(zmq::socket_t * socket, int timeout) :
m_socket(socket), m_timeout(timeout) {
}
RequestSocketImpl::~RequestSocketImpl() {
}
std::unique_ptr<zmq::message_t> RequestSocketImpl::request(const std::string& requestTypePart, const std::string& requestDataPart, int overrideTimeout) {
// Prepare the request parts.
int requestTypeSize = requestTypePart.length();
int requestDataSize = requestDataPart.length();
zmq::message_t requestType(requestTypeSize);
zmq::message_t requestData(requestDataSize);
memcpy(static_cast<void *>(requestType.data()), requestTypePart.c_str(), requestTypeSize);
memcpy(static_cast<void *>(requestData.data()), requestDataPart.c_str(), requestDataSize);
// Send the request in two parts.
m_socket->send(requestType, ZMQ_SNDMORE);
m_socket->send(requestData);
int timeout = m_timeout;
if (overrideTimeout > -1) {
timeout = overrideTimeout;
}
if (timeout > 0) {
// Polling.
zmq_pollitem_t items[1];
items[0].socket = static_cast<void *>(m_socket.get());
items[0].fd = 0;
items[0].events = ZMQ_POLLIN;
items[0].revents = 0;
int rc = zmq::poll(items, 1, timeout);
if (rc == 0) {
// Timeout occurred.
m_socket->close();
throw ConnectionTimeout();
}
}
// Receive the response.
unique_ptr<zmq::message_t> reply(new zmq::message_t());
m_socket->recv(reply.get(), 0);
return reply;
}
}
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
#ifndef CAMEO_REQUESTSOCKETIMPL_H_
#define CAMEO_REQUESTSOCKETIMPL_H_
#include <string>
#include <memory>
#include "zmq.hpp"
namespace cameo {
class RequestSocketImpl {
public:
RequestSocketImpl(zmq::socket_t * socket, int timeout = 0);
virtual ~RequestSocketImpl();
std::unique_ptr<zmq::message_t> request(const std::string& requestTypePart, const std::string& requestDataPart, int overrideTimeout = -1);
std::unique_ptr<zmq::socket_t> m_socket;
int m_timeout;
};
}
#endif
......@@ -236,6 +236,25 @@ zmq::socket_t * ServicesImpl::createCancelPublisher(const std::string& endpoint)
return publisher;
}
zmq::socket_t * ServicesImpl::createRequestSocket(const std::string& endpoint) {
zmq::socket_t* socket = new zmq::socket_t(m_context, ZMQ_REQ);
try {
// Set the linger value to 0 to ensure that pending requests are destroyed in case of timeout.
int value = 0;
socket->setsockopt(ZMQ_LINGER, &value, sizeof(int));
// Connect to the endpoint.
socket->connect(endpoint.c_str());
}
catch (exception const & e) {
throw SocketException(e.what());
}
return socket;
}
std::string ServicesImpl::createShowStreamRequest(int id) const {
proto::ShowStreamCommand showStreamCommand;
showStreamCommand.set_id(id);
......
......@@ -65,6 +65,7 @@ public:
zmq::socket_t * createEventSubscriber(const std::string& endpoint, const std::string& cancelEndpoint);
zmq::socket_t * createOutputStreamSubscriber(const std::string& endpoint, const std::string& cancelEndpoint);
zmq::socket_t * createCancelPublisher(const std::string& endpoint);
zmq::socket_t * createRequestSocket(const std::string& endpoint);
std::unique_ptr<zmq::message_t> tryRequestWithOnePartReply(const std::string& strRequestType, const std::string& strRequestData, const std::string& endpoint, int overrideTimeout = -1);
std::string createShowStreamRequest(int id) const;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment