Commit 2cc56d70 authored by legoc's avatar legoc
Browse files

(split) Implemented sync stream

parent d21184dc
...@@ -10,6 +10,7 @@ ...@@ -10,6 +10,7 @@
* Added Instance.getExitCode(). * Added Instance.getExitCode().
* Filter OutputStreamSocket on application id. * Filter OutputStreamSocket on application id.
* Added Server::connect() with id. * Added Server::connect() with id.
* Output stream is synced.
0.3.3 0.3.3
----- -----
......
...@@ -133,7 +133,6 @@ private: ...@@ -133,7 +133,6 @@ private:
Response stopApplicationAsynchronously(int id, bool immediately) const; Response stopApplicationAsynchronously(int id, bool immediately) const;
std::unique_ptr<application::Subscriber> createSubscriber(int id, const std::string& publisherName, const std::string& instanceName); std::unique_ptr<application::Subscriber> createSubscriber(int id, const std::string& publisherName, const std::string& instanceName);
int getAvailableTimeout() const; int getAvailableTimeout() const;
int getStreamPort(const std::string& name);
void storeKeyValue(int id, const std::string& key, const std::string& value); void storeKeyValue(int id, const std::string& key, const std::string& value);
std::string getKeyValue(int id, const std::string& key); std::string getKeyValue(int id, const std::string& key);
void removeKey(int id, const std::string& key); void removeKey(int id, const std::string& key);
......
...@@ -51,7 +51,8 @@ public: ...@@ -51,7 +51,8 @@ public:
void retrieveServerVersion(); void retrieveServerVersion();
void initStatus(); void initStatus();
std::unique_ptr<EventStreamSocket> openEventStream(); std::unique_ptr<EventStreamSocket> openEventStream();
std::unique_ptr<OutputStreamSocket> createOutputStreamSocket(int port); int getStreamPort(const std::string& name);
std::unique_ptr<OutputStreamSocket> createOutputStreamSocket(const std::string& name);
std::unique_ptr<RequestSocketImpl> createRequestSocket(const std::string& endpoint); std::unique_ptr<RequestSocketImpl> createRequestSocket(const std::string& endpoint);
std::unique_ptr<RequestSocketImpl> createRequestSocket(const std::string& endpoint, int timeout); std::unique_ptr<RequestSocketImpl> createRequestSocket(const std::string& endpoint, int timeout);
......
...@@ -73,6 +73,11 @@ bool OutputStreamSocket::receive(Output& output) { ...@@ -73,6 +73,11 @@ bool OutputStreamSocket::receive(Output& output) {
// Get the second part of the message. // Get the second part of the message.
message = m_impl->receive(); message = m_impl->receive();
// Continue if type of message is SYNCSTREAM. Theses messages are only used for the poller.
if (messageType == message::Event::SYNCSTREAM) {
continue;
}
// Get the JSON event. // Get the JSON event.
json::Object event; json::Object event;
json::parse(event, message.get()); json::parse(event, message.get());
......
...@@ -127,17 +127,6 @@ std::unique_ptr<application::Instance> Server::start(const std::string& name, Op ...@@ -127,17 +127,6 @@ std::unique_ptr<application::Instance> Server::start(const std::string& name, Op
return start(name, vector<string>(), options); return start(name, vector<string>(), options);
} }
int Server::getStreamPort(const std::string& name) {
unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createOutputPortRequest(name));
// Get the JSON response.
json::Object response;
json::parse(response, reply.get());
return response[message::RequestResponse::VALUE].GetInt();
}
std::unique_ptr<application::Instance> Server::start(const std::string& name, const std::vector<std::string> & args, Option options) { std::unique_ptr<application::Instance> Server::start(const std::string& name, const std::vector<std::string> & args, Option options) {
bool outputStream = ((options & OUTPUTSTREAM) != 0); bool outputStream = ((options & OUTPUTSTREAM) != 0);
...@@ -152,9 +141,8 @@ std::unique_ptr<application::Instance> Server::start(const std::string& name, co ...@@ -152,9 +141,8 @@ std::unique_ptr<application::Instance> Server::start(const std::string& name, co
unique_ptr<OutputStreamSocket> streamSocket; unique_ptr<OutputStreamSocket> streamSocket;
if (outputStream) { if (outputStream) {
// We connect to the stream port before starting the application. // Connect to the stream port. A sync is made to ensure that the subscriber is connected.
// However that does NOT guarantee that the stream will be connected before the ENDSTREAM arrives in case of an application that terminates rapidly. streamSocket = createOutputStreamSocket(name);
streamSocket = createOutputStreamSocket(getStreamPort(name));
} }
unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createStartRequest(name, args, application::This::getReference())); unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createStartRequest(name, args, application::This::getReference()));
...@@ -242,14 +230,12 @@ application::InstanceArray Server::connectAll(const std::string& name, Option op ...@@ -242,14 +230,12 @@ application::InstanceArray Server::connectAll(const std::string& name, Option op
if (isAlive(applicationId)) { if (isAlive(applicationId)) {
aliveInstancesCount++; aliveInstancesCount++;
// We connect to the stream port before starting the application.
// However that does NOT guarantee that the stream will be connected before the ENDSTREAM arrives in case of an application that terminates rapidly.
instance->setId(applicationId); instance->setId(applicationId);
instance->setInitialState(info[message::ApplicationInfo::APPLICATION_STATE].GetInt()); instance->setInitialState(info[message::ApplicationInfo::APPLICATION_STATE].GetInt());
instance->setPastStates(info[message::ApplicationInfo::PAST_APPLICATION_STATES].GetInt()); instance->setPastStates(info[message::ApplicationInfo::PAST_APPLICATION_STATES].GetInt());
if (outputStream) { if (outputStream) {
unique_ptr<OutputStreamSocket> streamSocket = createOutputStreamSocket(getStreamPort(name)); unique_ptr<OutputStreamSocket> streamSocket = createOutputStreamSocket(name);
instance->setOutputStreamSocket(streamSocket); instance->setOutputStreamSocket(streamSocket);
} }
...@@ -314,14 +300,12 @@ std::unique_ptr<application::Instance> Server::connect(int id, Option options) { ...@@ -314,14 +300,12 @@ std::unique_ptr<application::Instance> Server::connect(int id, Option options) {
// test if the application is still alive otherwise we could have missed a status message // test if the application is still alive otherwise we could have missed a status message
if (isAlive(applicationId)) { if (isAlive(applicationId)) {
// We connect to the stream port before starting the application.
// However that does NOT guarantee that the stream will be connected before the ENDSTREAM arrives in case of an application that terminates rapidly.
instance->setId(applicationId); instance->setId(applicationId);
instance->setInitialState(info[message::ApplicationInfo::APPLICATION_STATE].GetInt()); instance->setInitialState(info[message::ApplicationInfo::APPLICATION_STATE].GetInt());
instance->setPastStates(info[message::ApplicationInfo::PAST_APPLICATION_STATES].GetInt()); instance->setPastStates(info[message::ApplicationInfo::PAST_APPLICATION_STATES].GetInt());
if (outputStream) { if (outputStream) {
unique_ptr<OutputStreamSocket> streamSocket = createOutputStreamSocket(getStreamPort(name)); unique_ptr<OutputStreamSocket> streamSocket = createOutputStreamSocket(name);
instance->setOutputStreamSocket(streamSocket); instance->setOutputStreamSocket(streamSocket);
} }
......
...@@ -176,7 +176,20 @@ std::unique_ptr<EventStreamSocket> Services::openEventStream() { ...@@ -176,7 +176,20 @@ std::unique_ptr<EventStreamSocket> Services::openEventStream() {
return unique_ptr<EventStreamSocket>(new EventStreamSocket(new StreamSocketImpl(subscriber, cancelPublisher))); return unique_ptr<EventStreamSocket>(new EventStreamSocket(new StreamSocketImpl(subscriber, cancelPublisher)));
} }
std::unique_ptr<OutputStreamSocket> Services::createOutputStreamSocket(int port) { int Services::getStreamPort(const std::string& name) {
unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createOutputPortRequest(name));
// Get the JSON response.
json::Object response;
json::parse(response, reply.get());
return response[message::RequestResponse::VALUE].GetInt();
}
std::unique_ptr<OutputStreamSocket> Services::createOutputStreamSocket(const std::string& name) {
int port = getStreamPort(name);
if (port == -1) { if (port == -1) {
return nullptr; return nullptr;
...@@ -192,6 +205,9 @@ std::unique_ptr<OutputStreamSocket> Services::createOutputStreamSocket(int port) ...@@ -192,6 +205,9 @@ std::unique_ptr<OutputStreamSocket> Services::createOutputStreamSocket(int port)
zmq::socket_t * cancelPublisher = m_impl->createCancelPublisher(cancelEndpoint); zmq::socket_t * cancelPublisher = m_impl->createCancelPublisher(cancelEndpoint);
zmq::socket_t * subscriber = m_impl->createOutputStreamSubscriber(streamEndpoint, cancelEndpoint); zmq::socket_t * subscriber = m_impl->createOutputStreamSubscriber(streamEndpoint, cancelEndpoint);
// Wait for the connection to be ready.
m_impl->waitForStreamSubscriber(subscriber, m_requestSocket.get(), name);
// Create the output stream socket. // Create the output stream socket.
return unique_ptr<OutputStreamSocket>(new OutputStreamSocket(new StreamSocketImpl(subscriber, cancelPublisher))); return unique_ptr<OutputStreamSocket>(new OutputStreamSocket(new StreamSocketImpl(subscriber, cancelPublisher)));
} }
......
...@@ -62,6 +62,18 @@ std::string ServicesImpl::createSyncRequest() const { ...@@ -62,6 +62,18 @@ std::string ServicesImpl::createSyncRequest() const {
return request.toString(); return request.toString();
} }
std::string ServicesImpl::createSyncStreamRequest(const std::string& name) const {
json::StringObject request;
request.pushKey(message::TYPE);
request.pushInt(message::SYNC_STREAM);
request.pushKey(message::SyncStreamRequest::NAME);
request.pushString(name);
return request.toString();
}
std::string ServicesImpl::createVersionRequest() const { std::string ServicesImpl::createVersionRequest() const {
json::StringObject request; json::StringObject request;
...@@ -480,6 +492,7 @@ zmq::socket_t * ServicesImpl::createOutputStreamSubscriber(const std::string& en ...@@ -480,6 +492,7 @@ zmq::socket_t * ServicesImpl::createOutputStreamSubscriber(const std::string& en
zmq::socket_t * subscriber = new zmq::socket_t(m_context, ZMQ_SUB); zmq::socket_t * subscriber = new zmq::socket_t(m_context, ZMQ_SUB);
vector<string> streamList; vector<string> streamList;
streamList.push_back(message::Event::SYNCSTREAM);
streamList.push_back(message::Event::STREAM); streamList.push_back(message::Event::STREAM);
streamList.push_back(message::Event::ENDSTREAM); streamList.push_back(message::Event::ENDSTREAM);
streamList.push_back(message::Event::CANCEL); streamList.push_back(message::Event::CANCEL);
...@@ -542,6 +555,41 @@ bool ServicesImpl::isAvailable(RequestSocketImpl * socket, int timeout) { ...@@ -542,6 +555,41 @@ bool ServicesImpl::isAvailable(RequestSocketImpl * socket, int timeout) {
return false; return false;
} }
void ServicesImpl::sendSyncStream(RequestSocketImpl * socket, const std::string& name) {
string request = createSyncStreamRequest(name);
try {
unique_ptr<zmq::message_t> reply = socket->request(request);
}
catch (const ConnectionTimeout&) {
// The server is not accessible.
}
catch (...) {
// Should not happen.
}
}
void ServicesImpl::waitForStreamSubscriber(zmq::socket_t * subscriber, RequestSocketImpl * socket, const std::string& name) {
// Poll subscriber.
zmq_pollitem_t items[1];
items[0].socket = static_cast<void *>(*subscriber);
items[0].fd = 0;
items[0].events = ZMQ_POLLIN;
items[0].revents = 0;
while (true) {
sendSyncStream(socket, name);
// Wait for 100ms.
int rc = zmq::poll(items, 1, 100);
if (rc != 0) {
break;
}
}
}
void ServicesImpl::waitForSubscriber(zmq::socket_t * subscriber, RequestSocketImpl * socket) { void ServicesImpl::waitForSubscriber(zmq::socket_t * subscriber, RequestSocketImpl * socket) {
// Poll subscriber. // Poll subscriber.
......
...@@ -35,6 +35,7 @@ public: ...@@ -35,6 +35,7 @@ public:
int getTimeout() const; int getTimeout() const;
std::string createSyncRequest() const; std::string createSyncRequest() const;
std::string createSyncStreamRequest(const std::string& name) const;
std::string createVersionRequest() const; std::string createVersionRequest() const;
std::string createStartRequest(const std::string& name, const std::vector<std::string> & args, const std::string& instanceReference) const; std::string createStartRequest(const std::string& name, const std::vector<std::string> & args, const std::string& instanceReference) const;
std::string createStopRequest(int id) const; std::string createStopRequest(int id) const;
...@@ -71,6 +72,8 @@ public: ...@@ -71,6 +72,8 @@ public:
zmq::socket_t * createRequestSocket(const std::string& endpoint); zmq::socket_t * createRequestSocket(const std::string& endpoint);
bool isAvailable(RequestSocketImpl * socket, int timeout); bool isAvailable(RequestSocketImpl * socket, int timeout);
void sendSyncStream(RequestSocketImpl * socket, const std::string& name);
void waitForStreamSubscriber(zmq::socket_t * subscriber, RequestSocketImpl * socket, const std::string& name);
void waitForSubscriber(zmq::socket_t * subscriber, RequestSocketImpl * socket); void waitForSubscriber(zmq::socket_t * subscriber, RequestSocketImpl * socket);
zmq::context_t m_context; zmq::context_t m_context;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment