Commit 1b041459 authored by legoc's avatar legoc
Browse files

Services uses request socket

parent eee78b83
......@@ -126,7 +126,7 @@ void Services::initStatus() {
std::unique_ptr<EventStreamSocket> Services::openEventStream() {
// init the status if needed
// Init the status port if necessary.
if (m_statusPort == 0) {
initStatus();
}
......@@ -136,15 +136,14 @@ std::unique_ptr<EventStreamSocket> Services::openEventStream() {
// We define a unique name that depends on the event stream socket object because there can be many (instances).
cancelEndpoint << "inproc://cancel." << CancelIdGenerator::newId();
// create sockets
// Create the sockets.
zmq::socket_t * cancelPublisher = m_impl->createCancelPublisher(cancelEndpoint.str());
zmq::socket_t * subscriber = m_impl->createEventSubscriber(m_serverStatusEndpoint, cancelEndpoint.str());
// wait for the connection
string strRequestType = m_impl->createRequestType(PROTO_INIT);
string strRequestData = m_impl->createInitRequest();
m_impl->waitForSubscriber(subscriber, strRequestType, strRequestData, m_serverEndpoint);
// Wait for the connection to be ready.
m_impl->waitForSubscriber(subscriber, m_requestSocket.get());
// Create the event stream socket.
return unique_ptr<EventStreamSocket>(new EventStreamSocket(new StreamSocketImpl(subscriber, cancelPublisher)));
}
......@@ -154,15 +153,17 @@ std::unique_ptr<OutputStreamSocket> Services::createOutputStreamSocket(int port)
return nullptr;
}
// Prepare our context and subscriber
// Prepare our context and subscriber.
string streamEndpoint = m_url + ":" + to_string(port);
// We define a unique name that depends on the event stream socket object because there can be many (instances).
string cancelEndpoint = "inproc://cancel." + to_string(CancelIdGenerator::newId());
// Create the sockets.
zmq::socket_t * cancelPublisher = m_impl->createCancelPublisher(cancelEndpoint);
zmq::socket_t * subscriber = m_impl->createOutputStreamSubscriber(streamEndpoint, cancelEndpoint);
// Create the output stream socket.
return unique_ptr<OutputStreamSocket>(new OutputStreamSocket(new StreamSocketImpl(subscriber, cancelPublisher)));
}
......
......@@ -475,27 +475,6 @@ bool ServicesImpl::isAvailable(const std::string& strRequestType, const std::str
return false;
}
void ServicesImpl::waitForSubscriber(zmq::socket_t * subscriber, const std::string& strRequestType, const std::string& strRequestData, const std::string& endpoint) {
// polling subscriber
zmq_pollitem_t items[1];
items[0].socket = static_cast<void *>(*subscriber);
items[0].fd = 0;
items[0].events = ZMQ_POLLIN;
items[0].revents = 0;
bool ready = false;
while (!ready) {
isAvailable(strRequestType, strRequestData, endpoint, 100);
// wait for 100ms ?
int rc = zmq::poll(items, 1, 100);
if (rc != 0) {
ready = true;
}
}
}
void ServicesImpl::subscribeToPublisher(const std::string& endpoint) {
string strRequestType = createRequestType(PROTO_SUBSCRIBEPUBLISHER);
......@@ -525,4 +504,24 @@ bool ServicesImpl::isAvailable(RequestSocketImpl * socket, int timeout) {
return false;
}
void ServicesImpl::waitForSubscriber(zmq::socket_t * subscriber, RequestSocketImpl * socket) {
// Poll subscriber.
zmq_pollitem_t items[1];
items[0].socket = static_cast<void *>(*subscriber);
items[0].fd = 0;
items[0].events = ZMQ_POLLIN;
items[0].revents = 0;
while (true) {
isAvailable(socket, 100);
// Wait for 100ms.
int rc = zmq::poll(items, 1, 100);
if (rc != 0) {
break;
}
}
}
}
......@@ -71,10 +71,10 @@ public:
std::string createShowStreamRequest(int id) const;
bool isAvailable(const std::string& strRequestType, const std::string& strRequestData, const std::string& endpoint, int timeout);
void waitForSubscriber(zmq::socket_t * subscriber, const std::string& strRequestType, const std::string& strRequestData, const std::string& endpoint);
void subscribeToPublisher(const std::string& endpoint);
bool isAvailable(RequestSocketImpl * socket, int timeout);
void waitForSubscriber(zmq::socket_t * subscriber, RequestSocketImpl * socket);
zmq::context_t m_context;
int m_timeout;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment