Commit 4ef77cd6 authored by legoc's avatar legoc
Browse files

(split) Enabled multiple=yes and stream=yes

parent 1717f6fb
......@@ -8,6 +8,7 @@
* Added storage functions: This::storeKeyValue(), This::getKeyValue(), This::removeKey(), Instance::getKeyValue().
* Added Instance::getPastStates().
* Added Instance.getExitCode().
* Filter OutputStreamSocket on application id.
0.3.3
-----
......
......@@ -57,6 +57,8 @@ class OutputStreamSocket {
public:
~OutputStreamSocket();
void setApplicationId(int id);
bool receive(Output& ouput);
void cancel();
bool isEnded() const;
......@@ -67,6 +69,7 @@ private:
WaitingImpl * waiting();
int m_applicationId;
bool m_ended;
bool m_canceled;
......
......@@ -466,6 +466,7 @@ void Instance::setErrorMessage(const std::string& message) {
void Instance::setOutputStreamSocket(std::unique_ptr<OutputStreamSocket>& socket) {
m_outputStreamSocket = std::move(socket);
m_outputStreamSocket->setApplicationId(m_id);
}
void Instance::setPastStates(State pastStates) {
......
......@@ -44,6 +44,7 @@ bool Output::isEndOfLine() const {
}
OutputStreamSocket::OutputStreamSocket(StreamSocketImpl * impl) :
m_applicationId(-1),
m_ended(false),
m_canceled(false),
m_impl(impl) {
......@@ -52,38 +53,55 @@ OutputStreamSocket::OutputStreamSocket(StreamSocketImpl * impl) :
OutputStreamSocket::~OutputStreamSocket() {
}
void OutputStreamSocket::setApplicationId(int id) {
m_applicationId = id;
}
bool OutputStreamSocket::receive(Output& output) {
unique_ptr<zmq::message_t> message(m_impl->receive());
// Loop on receive() because in case of configuration multiple=yes, messages can come from different instances.
while (true) {
unique_ptr<zmq::message_t> message(m_impl->receive());
string messageType(message->data<char>(), message->size());
string response(message->data<char>(), message->size());
// Cancel can only come from this instance.
if (messageType == message::Event::CANCEL) {
m_canceled = true;
return false;
}
if (response == message::Event::STREAM) {
}
else if (response == message::Event::ENDSTREAM) {
m_ended = true;
return false;
}
else if (response == message::Event::CANCEL) {
m_canceled = true;
return false;
}
// Get the second part of the message.
message = m_impl->receive();
message = m_impl->receive();
// Get the JSON event.
json::Object event;
json::parse(event, message.get());
// Get the JSON event.
json::Object event;
json::parse(event, message.get());
int id = event[message::ApplicationStream::ID].GetInt();
int id = event[message::ApplicationStream::ID].GetInt();
string line = event[message::ApplicationStream::MESSAGE].GetString();
bool endOfLine = event[message::ApplicationStream::EOL].GetBool();
// Filter on the application id so that only the messages concerning the instance applicationId are processed.
// Others are ignored.
if (m_applicationId == -1 || m_applicationId == id) {
output.m_id = id;
output.m_message = line;
output.m_endOfLine = endOfLine;
// Terminate the stream if type of message is ENDSTREAM.
if (messageType == message::Event::ENDSTREAM) {
m_ended = true;
return false;
}
return true;
// Here the type of message is STREAM.
string line = event[message::ApplicationStream::MESSAGE].GetString();
bool endOfLine = event[message::ApplicationStream::EOL].GetBool();
output.m_id = id;
output.m_message = line;
output.m_endOfLine = endOfLine;
return true;
}
// Here, the application id is different from id, then re-iterate.
}
}
void OutputStreamSocket::cancel() {
......
......@@ -149,11 +149,12 @@ std::unique_ptr<application::Instance> Server::start(const std::string& name, co
registerEventListener(instance.get());
try {
unique_ptr<OutputStreamSocket> streamSocket;
if (outputStream) {
// We connect to the stream port before starting the application
// so that we are sure that the ENDSTREAM message will be received even if the application terminates rapidly.
unique_ptr<OutputStreamSocket> socket = createOutputStreamSocket(getStreamPort(name));
instance->setOutputStreamSocket(socket);
// We connect to the stream port before starting the application.
// However that does NOT guarantee that the stream will be connected before the ENDSTREAM arrives in case of an application that terminates rapidly.
streamSocket = createOutputStreamSocket(getStreamPort(name));
}
unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createStartRequest(name, args, application::This::getReference()));
......@@ -168,6 +169,10 @@ std::unique_ptr<application::Instance> Server::start(const std::string& name, co
}
else {
instance->setId(value);
if (outputStream) {
instance->setOutputStreamSocket(streamSocket);
}
}
}
catch (const ConnectionTimeout& e) {
......@@ -237,17 +242,17 @@ application::InstanceArray Server::connectAll(const std::string& name, Option op
if (isAlive(applicationId)) {
aliveInstancesCount++;
// We connect to the stream port before starting the application
// so that we are sure that the ENDSTREAM message will be received even if the application terminates rapidly.
if (outputStream) {
unique_ptr<OutputStreamSocket> socket = createOutputStreamSocket(getStreamPort(name));
instance->setOutputStreamSocket(socket);
}
// We connect to the stream port before starting the application.
// However that does NOT guarantee that the stream will be connected before the ENDSTREAM arrives in case of an application that terminates rapidly.
instance->setId(applicationId);
instance->setInitialState(info[message::ApplicationInfo::APPLICATION_STATE].GetInt());
instance->setPastStates(info[message::ApplicationInfo::PAST_APPLICATION_STATES].GetInt());
if (outputStream) {
unique_ptr<OutputStreamSocket> streamSocket = createOutputStreamSocket(getStreamPort(name));
instance->setOutputStreamSocket(streamSocket);
}
instances.m_array[i] = std::move(instance);
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment