Commit ca02a5f1 authored by legoc's avatar legoc
Browse files

Finished replacement with request socket

parent d7aee1c2
......@@ -846,7 +846,7 @@ void Publisher::sendEnd() const {
///////////////////////////////////////////////////////////////////////////
// Subscriber
Subscriber::Subscriber(const Server * server, const std::string & url, int publisherPort, int synchronizerPort, const std::string & publisherName, int numberOfSubscribers, const std::string& instanceName, int instanceId, const std::string& instanceEndpoint, const std::string& statusEndpoint) :
Subscriber::Subscriber(Server * server, const std::string & url, int publisherPort, int synchronizerPort, const std::string & publisherName, int numberOfSubscribers, const std::string& instanceName, int instanceId, const std::string& instanceEndpoint, const std::string& statusEndpoint) :
m_impl(new SubscriberImpl(server, url, publisherPort, synchronizerPort, publisherName, numberOfSubscribers, instanceName, instanceId, instanceEndpoint, statusEndpoint)) {
}
......
......@@ -372,7 +372,7 @@ public:
void cancel();
private:
Subscriber(const Server * server, const std::string& url, int publisherPort, int synchronizerPort, const std::string& publisherName, int numberOfSubscribers, const std::string& instanceName, int instanceId, const std::string& instanceEndpoint, const std::string& statusEndpoint);
Subscriber(Server * server, const std::string& url, int publisherPort, int synchronizerPort, const std::string& publisherName, int numberOfSubscribers, const std::string& instanceName, int instanceId, const std::string& instanceEndpoint, const std::string& statusEndpoint);
void init();
std::unique_ptr<SubscriberImpl> m_impl;
......
......@@ -340,7 +340,7 @@ std::unique_ptr<EventStreamSocket> Server::openEventStream() {
return Services::openEventStream();
}
std::unique_ptr<application::Subscriber> Server::createSubscriber(int id, const std::string& publisherName, const std::string& instanceName) const {
std::unique_ptr<application::Subscriber> Server::createSubscriber(int id, const std::string& publisherName, const std::string& instanceName) {
unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRequestType(PROTO_CONNECTPUBLISHER), m_impl->createConnectPublisherRequest(id, publisherName));
......
......@@ -118,7 +118,7 @@ private:
std::unique_ptr<application::Instance> makeInstance();
bool isAlive(int id) const;
Response stopApplicationAsynchronously(int id, bool immediately) const;
std::unique_ptr<application::Subscriber> createSubscriber(int id, const std::string& publisherName, const std::string& instanceName) const;
std::unique_ptr<application::Subscriber> createSubscriber(int id, const std::string& publisherName, const std::string& instanceName);
int getAvailableTimeout() const;
int getStreamPort(const std::string& name);
......
......@@ -161,57 +161,6 @@ std::string ServicesImpl::createStartRequest(const std::string& name, const std:
return strRequestStart;
}
std::unique_ptr<zmq::message_t> ServicesImpl::tryRequestWithOnePartReply(const std::string& strRequestType, const std::string& strRequestData, const std::string& endpoint, int overrideTimeout) {
zmq::socket_t socket(m_context, ZMQ_REQ);
try {
// Set the linger value to 0 to ensure that pending requests are destroyed in case of timeout.
int value = 0;
socket.setsockopt(ZMQ_LINGER, &value, sizeof(int));
// Connect to the endpoint.
socket.connect(endpoint.c_str());
}
catch (exception const & e) {
throw SocketException(e.what());
}
int requestTypeSize = strRequestType.length();
int requestDataSize = strRequestData.length();
zmq::message_t requestType(requestTypeSize);
zmq::message_t requestData(requestDataSize);
memcpy((void *) requestType.data(), strRequestType.c_str(), requestTypeSize);
memcpy((void *) requestData.data(), strRequestData.c_str(), requestDataSize);
socket.send(requestType, ZMQ_SNDMORE);
socket.send(requestData);
int timeout = m_timeout;
if (overrideTimeout > -1) {
timeout = overrideTimeout;
}
if (timeout > 0) {
// polling
zmq_pollitem_t items[1];
items[0].socket = static_cast<void *>(socket);
items[0].fd = 0;
items[0].events = ZMQ_POLLIN;
items[0].revents = 0;
int rc = zmq::poll(items, 1, timeout);
if (rc == 0) {
// timeout
socket.close();
throw ConnectionTimeout();
}
}
unique_ptr<zmq::message_t> reply(new zmq::message_t());
socket.recv(reply.get(), 0);
return reply;
}
std::string ServicesImpl::createStopRequest(int id) const {
proto::StopCommand stopCommand;
stopCommand.set_id(id);
......@@ -459,32 +408,6 @@ std::string ServicesImpl::createOutputRequest(const std::string& name) const {
return result;
}
bool ServicesImpl::isAvailable(const std::string& strRequestType, const std::string& strRequestData, const std::string& endpoint, int timeout) {
try {
unique_ptr<zmq::message_t> reply = tryRequestWithOnePartReply(strRequestType, strRequestData, endpoint.c_str(), timeout);
if (reply.get() != nullptr) {
return true;
}
} catch (const ConnectionTimeout&) {
// do nothing, timeout
}
return false;
}
void ServicesImpl::subscribeToPublisher(const std::string& endpoint) {
string strRequestType = createRequestType(PROTO_SUBSCRIBEPUBLISHER);
string strRequestData = createSubscribePublisherRequest();
unique_ptr<zmq::message_t> reply = tryRequestWithOnePartReply(strRequestType, strRequestData, endpoint);
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
}
bool ServicesImpl::isAvailable(RequestSocketImpl * socket, int timeout) {
string requestTypePart = createRequestType(PROTO_INIT);
......
......@@ -67,12 +67,8 @@ public:
zmq::socket_t * createCancelPublisher(const std::string& endpoint);
zmq::socket_t * createRequestSocket(const std::string& endpoint);
std::unique_ptr<zmq::message_t> tryRequestWithOnePartReply(const std::string& strRequestType, const std::string& strRequestData, const std::string& endpoint, int overrideTimeout = -1);
std::string createShowStreamRequest(int id) const;
bool isAvailable(const std::string& strRequestType, const std::string& strRequestData, const std::string& endpoint, int timeout);
void subscribeToPublisher(const std::string& endpoint);
bool isAvailable(RequestSocketImpl * socket, int timeout);
void waitForSubscriber(zmq::socket_t * subscriber, RequestSocketImpl * socket);
......
......@@ -21,6 +21,7 @@
#include "../Serializer.h"
#include "CancelIdGenerator.h"
#include "ServicesImpl.h"
#include "RequestSocketImpl.h"
#include "../Server.h"
using namespace std;
......@@ -33,10 +34,10 @@ const std::string SubscriberImpl::ENDSTREAM = "ENDSTREAM";
const std::string SubscriberImpl::CANCEL = "CANCEL";
const std::string SubscriberImpl::STATUS = "STATUS";
SubscriberImpl::SubscriberImpl(const Server * server, const std::string & url, int publisherPort, int synchronizerPort, const std::string& publisherName, int numberOfSubscribers, const std::string& instanceName, int instanceId, const std::string& instanceEndpoint, const std::string& statusEndpoint) :
SubscriberImpl::SubscriberImpl(Server * server, const std::string & url, int publisherPort, int synchronizerPort, const std::string& publisherName, int numberOfSubscribers, const std::string& instanceName, int instanceId, const std::string& instanceEndpoint, const std::string& statusEndpoint) :
m_server(server),
m_publisherName(publisherName),
m_url(url),
m_publisherName(publisherName),
m_publisherPort(publisherPort),
m_synchronizerPort(synchronizerPort),
m_numberOfSubscribers(numberOfSubscribers),
......@@ -53,7 +54,7 @@ SubscriberImpl::~SubscriberImpl() {
void SubscriberImpl::init() {
// create a socket for publishing
// Create a socket for publishing.
m_subscriber.reset(new zmq::socket_t(m_server->m_impl->m_context, ZMQ_SUB));
m_subscriber->setsockopt(ZMQ_SUBSCRIBE, SYNC.c_str(), SYNC.length());
m_subscriber->setsockopt(ZMQ_SUBSCRIBE, STREAM.c_str(), STREAM.length());
......@@ -69,7 +70,7 @@ void SubscriberImpl::init() {
// We must first bind the cancel publisher before connecting the subscriber.
stringstream cancelEndpoint;
// We define a unique name
// We define a unique name.
cancelEndpoint << "inproc://cancel." << CancelIdGenerator::newId();
m_cancelEndpoint = cancelEndpoint.str();
......@@ -79,7 +80,7 @@ void SubscriberImpl::init() {
m_subscriber->connect(m_cancelEndpoint.c_str());
m_subscriber->connect(m_statusEndpoint.c_str());
// synchronize the subscriber only if the number of subscribers > 0
// Synchronize the subscriber only if the number of subscribers > 0.
if (m_numberOfSubscribers > 0) {
stringstream syncEndpoint;
......@@ -87,29 +88,29 @@ void SubscriberImpl::init() {
string endpoint = syncEndpoint.str();
// polling subscriber
// Create a request socket.
unique_ptr<RequestSocketImpl> requestSocket = m_server->createRequestSocket(endpoint);
// Poll subscriber.
zmq_pollitem_t items[1];
items[0].socket = static_cast<void *>(*m_subscriber);
items[0].fd = 0;
items[0].events = ZMQ_POLLIN;
items[0].revents = 0;
bool ready = false;
while (!ready) {
string strRequestType = m_server->m_impl->createRequestType(PROTO_INIT);
string strRequestData = m_server->m_impl->createInitRequest();
m_server->m_impl->isAvailable(strRequestType, strRequestData, endpoint, 100);
while (true) {
m_server->m_impl->isAvailable(requestSocket.get(), 100);
// wait for 100ms
// Wait for 100ms.
int rc = zmq::poll(items, 1, 100);
if (rc != 0) {
ready = true;
break;
}
}
m_server->m_impl->subscribeToPublisher(endpoint);
unique_ptr<zmq::message_t> reply = requestSocket->request(m_server->m_impl->createRequestType(PROTO_SUBSCRIBEPUBLISHER), m_server->m_impl->createSubscribePublisherRequest());
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
}
}
......
......@@ -29,7 +29,7 @@ class Server;
class SubscriberImpl {
public:
SubscriberImpl(const Server * server, const std::string & url, int publisherPort, int synchronizerPort, const std::string& publisherName, int numberOfSubscribers, const std::string& instanceName, int instanceId, const std::string& instanceEndpoint, const std::string& statusEndpoint);
SubscriberImpl(Server * server, const std::string & url, int publisherPort, int synchronizerPort, const std::string& publisherName, int numberOfSubscribers, const std::string& instanceName, int instanceId, const std::string& instanceEndpoint, const std::string& statusEndpoint);
~SubscriberImpl();
void init();
......@@ -47,7 +47,7 @@ public:
WaitingImpl * waiting();
const Server * m_server;
Server * m_server;
std::string m_url;
int m_publisherPort;
int m_synchronizerPort;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment