Commit 3867f682 authored by legoc's avatar legoc
Browse files

Added endOfLine member in application stream events

parent 99159b6c
......@@ -28,7 +28,7 @@ using namespace std;
namespace cameo {
Output::Output() :
m_id(0) {
m_id(0), m_endOfLine(false) {
}
int Output::getId() const {
......@@ -39,6 +39,10 @@ const std::string& Output::getMessage() const {
return m_message;
}
bool Output::isEndOfLine() const {
return m_endOfLine;
}
OutputStreamSocket::OutputStreamSocket(StreamSocketImpl * impl) :
m_ended(false),
m_canceled(false),
......@@ -73,9 +77,11 @@ bool OutputStreamSocket::receive(Output& output) {
int id = event[message::ApplicationStream::ID].GetInt();
string line = event[message::ApplicationStream::MESSAGE].GetString();
bool endOfLine = event[message::ApplicationStream::EOL].GetBool();
output.m_id = id;
output.m_message = line;
output.m_endOfLine = endOfLine;
return true;
}
......
......@@ -40,10 +40,12 @@ public:
int getId() const;
const std::string& getMessage() const;
bool isEndOfLine() const;
private:
int m_id;
std::string m_message;
bool m_endOfLine;
};
......
......@@ -140,6 +140,7 @@ namespace message {
namespace ApplicationStream {
constexpr const char* ID = "id"; // required int32 id = 1;
constexpr const char* MESSAGE = "message"; // required string message = 2;
constexpr const char* EOL = "eol"; // boolean
}
namespace SendParametersRequest {
......
......@@ -522,11 +522,13 @@ public class Application {
private int id;
private String message;
private boolean endOfLine;
public Output(int id, String message) {
public Output(int id, String message, boolean endOfLine) {
super();
this.id = id;
this.message = message;
this.endOfLine = endOfLine;
}
public int getId() {
......@@ -536,10 +538,14 @@ public class Application {
public String getMessage() {
return message;
}
public boolean isEndOfLine() {
return endOfLine;
}
@Override
public String toString() {
return "ApplicationStream [id=" + id + ", message=" + message + "]";
return "ApplicationStream [id=" + id + ", message=" + message + " eol=" + endOfLine + "]";
}
}
......
......@@ -37,7 +37,12 @@ public class OutputPrintThread extends Thread {
return;
}
System.out.println(stream.getMessage());
if (stream.isEndOfLine()) {
System.out.println(stream.getMessage());
}
else {
System.out.print(stream.getMessage());
}
}
} catch (Exception e) {
......
......@@ -67,8 +67,9 @@ public class OutputStreamSocket {
int id = JSON.getInt(stream, Message.ApplicationStream.ID);
String line = JSON.getString(stream, Message.ApplicationStream.MESSAGE);
boolean endOfLine = JSON.getBoolean(stream, Message.ApplicationStream.EOL);
return new Application.Output(id, line);
return new Application.Output(id, line, endOfLine);
}
catch (ParseException e) {
throw new UnexpectedException("Cannot parse response");
......
......@@ -128,6 +128,7 @@ public class Message {
public static class ApplicationStream {
public static final String ID = "id"; // required int32 id = 1;
public static final String MESSAGE = "message"; // required string message = 2;
public static final String EOL = "eol"; // boolean
}
public static class SendParametersRequest {
......
......@@ -57,7 +57,7 @@ public class StreamApplicationThread extends Thread {
publisher = manager.getStreamPublisher(application.getName());
}
private void sendLine(String line) {
private void sendMessage(String line, boolean endOfLine) {
// prepare our context and publisher
if (application.isWriteStream()) {
......@@ -80,6 +80,7 @@ public class StreamApplicationThread extends Thread {
JSONObject event = new JSONObject();
event.put(Message.ApplicationStream.ID, application.getId());
event.put(Message.ApplicationStream.MESSAGE, line);
event.put(Message.ApplicationStream.EOL, endOfLine);
publisher.sendMore("STREAM");
publisher.send(Message.serialize(event), 0);
......@@ -139,6 +140,7 @@ public class StreamApplicationThread extends Thread {
// The process is now accessible and cannot be null.
InputStreamReader is = new InputStreamReader(application.getProcess().getInputStream());
reader = new BufferedReader(is);
if (application.isWriteStream()) {
createFile(application.getLogPath());
if (fileOutputStream == null) {
......@@ -157,11 +159,10 @@ public class StreamApplicationThread extends Thread {
if (reader.ready()) {
readCharacters();
if (send) {
// TODO use eol
sendLine(characters.toString());
sendMessage(characters.toString(), eol);
}
} else {
}
else {
try {
Thread.sleep(ConfigManager.getInstance().getPollingTime());
} catch (InterruptedException e) {
......@@ -173,8 +174,7 @@ public class StreamApplicationThread extends Thread {
while (reader.ready()) {
readCharacters();
if (send) {
// TODO use eol
sendLine(characters.toString());
sendMessage(characters.toString(), eol);
}
}
......@@ -217,6 +217,7 @@ public class StreamApplicationThread extends Thread {
JSONObject event = new JSONObject();
event.put(Message.ApplicationStream.ID, application.getId());
event.put(Message.ApplicationStream.MESSAGE, "endstream");
event.put(Message.ApplicationStream.EOL, true);
publisher.sendMore("ENDSTREAM");
publisher.send(Message.serialize(event), 0);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment