Commit 81663f14 authored by legoc's avatar legoc
Browse files

Enabled multiple=yes and stream=yes

parent b9d97b23
......@@ -8,6 +8,7 @@
* Added storage functions: This::storeKeyValue(), This::getKeyValue(), This::removeKey(), Instance::getKeyValue().
* Added Instance::getPastStates().
* Added Instance.getExitCode().
* Filter OutputStreamSocket on application id.
0.3.3
-----
......
......@@ -57,6 +57,8 @@ class OutputStreamSocket {
public:
~OutputStreamSocket();
void setApplicationId(int id);
bool receive(Output& ouput);
void cancel();
bool isEnded() const;
......@@ -67,6 +69,7 @@ private:
WaitingImpl * waiting();
int m_applicationId;
bool m_ended;
bool m_canceled;
......
......@@ -466,6 +466,7 @@ void Instance::setErrorMessage(const std::string& message) {
void Instance::setOutputStreamSocket(std::unique_ptr<OutputStreamSocket>& socket) {
m_outputStreamSocket = std::move(socket);
m_outputStreamSocket->setApplicationId(m_id);
}
void Instance::setPastStates(State pastStates) {
......
......@@ -44,6 +44,7 @@ bool Output::isEndOfLine() const {
}
OutputStreamSocket::OutputStreamSocket(StreamSocketImpl * impl) :
m_applicationId(-1),
m_ended(false),
m_canceled(false),
m_impl(impl) {
......@@ -52,38 +53,55 @@ OutputStreamSocket::OutputStreamSocket(StreamSocketImpl * impl) :
OutputStreamSocket::~OutputStreamSocket() {
}
void OutputStreamSocket::setApplicationId(int id) {
m_applicationId = id;
}
bool OutputStreamSocket::receive(Output& output) {
unique_ptr<zmq::message_t> message(m_impl->receive());
// Loop on receive() because in case of configuration multiple=yes, messages can come from different instances.
while (true) {
unique_ptr<zmq::message_t> message(m_impl->receive());
string messageType(message->data<char>(), message->size());
string response(message->data<char>(), message->size());
// Cancel can only come from this instance.
if (messageType == message::Event::CANCEL) {
m_canceled = true;
return false;
}
if (response == message::Event::STREAM) {
}
else if (response == message::Event::ENDSTREAM) {
m_ended = true;
return false;
}
else if (response == message::Event::CANCEL) {
m_canceled = true;
return false;
}
// Get the second part of the message.
message = m_impl->receive();
message = m_impl->receive();
// Get the JSON event.
json::Object event;
json::parse(event, message.get());
// Get the JSON event.
json::Object event;
json::parse(event, message.get());
int id = event[message::ApplicationStream::ID].GetInt();
int id = event[message::ApplicationStream::ID].GetInt();
string line = event[message::ApplicationStream::MESSAGE].GetString();
bool endOfLine = event[message::ApplicationStream::EOL].GetBool();
// Filter on the application id so that only the messages concerning the instance applicationId are processed.
// Others are ignored.
if (m_applicationId == -1 || m_applicationId == id) {
output.m_id = id;
output.m_message = line;
output.m_endOfLine = endOfLine;
// Terminate the stream if type of message is ENDSTREAM.
if (messageType == message::Event::ENDSTREAM) {
m_ended = true;
return false;
}
return true;
// Here the type of message is STREAM.
string line = event[message::ApplicationStream::MESSAGE].GetString();
bool endOfLine = event[message::ApplicationStream::EOL].GetBool();
output.m_id = id;
output.m_message = line;
output.m_endOfLine = endOfLine;
return true;
}
// Here, the application id is different from id, then re-iterate.
}
}
void OutputStreamSocket::cancel() {
......
......@@ -149,11 +149,12 @@ std::unique_ptr<application::Instance> Server::start(const std::string& name, co
registerEventListener(instance.get());
try {
unique_ptr<OutputStreamSocket> streamSocket;
if (outputStream) {
// We connect to the stream port before starting the application
// so that we are sure that the ENDSTREAM message will be received even if the application terminates rapidly.
unique_ptr<OutputStreamSocket> socket = createOutputStreamSocket(getStreamPort(name));
instance->setOutputStreamSocket(socket);
// We connect to the stream port before starting the application.
// However that does NOT guarantee that the stream will be connected before the ENDSTREAM arrives in case of an application that terminates rapidly.
streamSocket = createOutputStreamSocket(getStreamPort(name));
}
unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createStartRequest(name, args, application::This::getReference()));
......@@ -168,6 +169,10 @@ std::unique_ptr<application::Instance> Server::start(const std::string& name, co
}
else {
instance->setId(value);
if (outputStream) {
instance->setOutputStreamSocket(streamSocket);
}
}
}
catch (const ConnectionTimeout& e) {
......@@ -237,17 +242,17 @@ application::InstanceArray Server::connectAll(const std::string& name, Option op
if (isAlive(applicationId)) {
aliveInstancesCount++;
// We connect to the stream port before starting the application
// so that we are sure that the ENDSTREAM message will be received even if the application terminates rapidly.
if (outputStream) {
unique_ptr<OutputStreamSocket> socket = createOutputStreamSocket(getStreamPort(name));
instance->setOutputStreamSocket(socket);
}
// We connect to the stream port before starting the application.
// However that does NOT guarantee that the stream will be connected before the ENDSTREAM arrives in case of an application that terminates rapidly.
instance->setId(applicationId);
instance->setInitialState(info[message::ApplicationInfo::APPLICATION_STATE].GetInt());
instance->setPastStates(info[message::ApplicationInfo::PAST_APPLICATION_STATES].GetInt());
if (outputStream) {
unique_ptr<OutputStreamSocket> streamSocket = createOutputStreamSocket(getStreamPort(name));
instance->setOutputStreamSocket(streamSocket);
}
instances.m_array[i] = std::move(instance);
}
}
......
......@@ -7,7 +7,8 @@
* Removed Configuration.getRetries().
* Added storage methods: This.storeKeyValue(), This.getKeyValue(), This.removeKey(), Instance.getKeyValue().
* Added Instance.getPastStates().
* Added Instance.getExitCode().
* Added Instance.getExitCode().
* Filter OutputStreamSocket on application id.
0.1.9
-----
......
......@@ -29,6 +29,7 @@ public class OutputStreamSocket {
private ServicesImpl services;
private Zmq.Socket socket;
private Zmq.Socket cancelSocket;
private int applicationId = -1;
private boolean ended = false;
private boolean canceled = false;
......@@ -39,38 +40,59 @@ public class OutputStreamSocket {
this.cancelSocket = cancelPublisher;
}
/**
* Sets the application id.
* @param id
*/
public void setApplicationId(int id) {
this.applicationId = id;
}
public Application.Output receive() {
String message = this.socket.recvStr();
if (message.equals(Message.Event.STREAM)) {
}
else if (message.equals(Message.Event.ENDSTREAM)) {
ended = true;
return null;
}
else if (message.equals(Message.Event.CANCEL)) {
canceled = true;
return null;
}
byte[] streamMessage = this.socket.recv();
try {
// Get the JSON object.
JSONObject stream = services.parse(streamMessage);
// Loop on recvStr() because in case of configuration multiple=yes, messages can come from different instances.
while (true) {
String messageType = this.socket.recvStr();
int id = JSON.getInt(stream, Message.ApplicationStream.ID);
String line = JSON.getString(stream, Message.ApplicationStream.MESSAGE);
boolean endOfLine = JSON.getBoolean(stream, Message.ApplicationStream.EOL);
// Cancel can only come from this instance.
if (messageType.equals(Message.Event.CANCEL)) {
canceled = true;
return null;
}
return new Application.Output(id, line, endOfLine);
}
catch (ParseException e) {
throw new UnexpectedException("Cannot parse response");
// Get the second part of the message.
byte[] messageValue = this.socket.recv();
try {
// Get the JSON object.
JSONObject stream = services.parse(messageValue);
int id = JSON.getInt(stream, Message.ApplicationStream.ID);
// Filter on the application id so that only the messages concerning the instance applicationId are processed.
// Others are ignored.
if (applicationId == -1 || applicationId == id) {
// Terminate the stream if type of message is ENDSTREAM.
if (messageType.equals(Message.Event.ENDSTREAM)) {
ended = true;
return null;
}
// Here the type of message is STREAM.
String line = JSON.getString(stream, Message.ApplicationStream.MESSAGE);
boolean endOfLine = JSON.getBoolean(stream, Message.ApplicationStream.EOL);
return new Application.Output(id, line, endOfLine);
}
// Here, the application id is different from id, then re-iterate.
}
catch (ParseException e) {
throw new UnexpectedException("Cannot parse response : " + messageValue);
}
}
}
public boolean isEnded() {
return ended;
......
......@@ -70,7 +70,8 @@ public class InstanceImpl extends EventListener {
}
void setOutputStreamSocket(OutputStreamSocket outputSocket) {
this.outputSocket = outputSocket;
this.outputSocket = outputSocket;
this.outputSocket.setApplicationId(id);
}
void setErrorMessage(String message) {
......
......@@ -203,10 +203,12 @@ public class ServerImpl extends ServicesImpl {
registerEventListener(instance);
try {
// We connect to the stream port before starting the application
// so that we are sure that the ENDSTREAM message will be received even if the application terminates rapidly.
// We connect to the stream port before starting the application.
// However that does NOT guarantee that the stream will be connected before the ENDSTREAM arrives in case of an application that terminates rapidly.
OutputStreamSocket streamSocket = null;
if (outputStream) {
instance.setOutputStreamSocket(createOutputStreamSocket(getStreamPort(name)));
streamSocket = createOutputStreamSocket(getStreamPort(name));
}
Response response = startApplication(name, args, instanceReference);
......@@ -216,6 +218,10 @@ public class ServerImpl extends ServicesImpl {
}
else {
instance.setId(response.getValue());
if (outputStream) {
instance.setOutputStreamSocket(streamSocket);
}
}
}
catch (ConnectionTimeout e) {
......@@ -320,16 +326,15 @@ public class ServerImpl extends ServicesImpl {
// Test if the application is still alive otherwise we could have missed a status message.
if (isAlive(applicationId)) {
// We connect to the stream port before starting the application
// so that we are sure that the ENDSTREAM message will be received even if the application terminates rapidly.
if (outputStream) {
instance.setOutputStreamSocket(createOutputStreamSocket(getStreamPort(name)));
}
instance.setId(applicationId);
instance.setInitialState(JSON.getInt(applicationInfo, Message.ApplicationInfo.APPLICATION_STATE));
instance.setInitialState(JSON.getInt(applicationInfo, Message.ApplicationInfo.PAST_APPLICATION_STATES));
if (outputStream) {
instance.setOutputStreamSocket(createOutputStreamSocket(getStreamPort(name)));
}
instances.add(instance);
}
else {
......@@ -612,7 +617,11 @@ public class ServerImpl extends ServicesImpl {
throw new OutputStreamException(JSON.getString(response, Message.RequestResponse.MESSAGE));
}
return createOutputStreamSocket(port);
// Create the output stream socket.
OutputStreamSocket streamSocket = createOutputStreamSocket(port);
streamSocket.setApplicationId(id);
return streamSocket;
}
/**
......
......@@ -11,6 +11,7 @@
* Added the argument --log-console in Server main.
* Added storage requests.
* Send the exit code in the status events.
* Enabled multiple=yes and stream=yes.
0.1.8
-----
......
......@@ -186,12 +186,6 @@ public class ApplicationConfig {
this.stream = runSingle;
} else if (value.equalsIgnoreCase("yes")) {
this.stream = true;
if (!runSingle) {
stream = false;
Log.logger().warning("The application " + name + " cannot have multiple=yes and stream=yes");
}
} else if (value.equalsIgnoreCase("no")) {
this.stream = false;
} else {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment