Commit d5eac3fb authored by legoc's avatar legoc
Browse files

Reviewed Java stream socket

parent 872e8c39
......@@ -530,7 +530,7 @@ std::unique_ptr<EventStreamSocket> Server::openEventStream() {
// Create the event stream socket.
std::unique_ptr<EventStreamSocket> eventStreamSocket = std::unique_ptr<EventStreamSocket>(new EventStreamSocket());
eventStreamSocket->init(m_contextImpl.get(), getStatusEndpoint(), m_requestSocket.get());
eventStreamSocket->init(m_contextImpl.get(), m_serverEndpoint.withPort(m_statusPort), m_requestSocket.get());
return eventStreamSocket;
}
......
......@@ -19,18 +19,20 @@ package fr.ill.ics.cameo.base;
import fr.ill.ics.cameo.base.impl.EventStreamSocketImpl;
import fr.ill.ics.cameo.base.impl.zmq.EventStreamSocketZmq;
import fr.ill.ics.cameo.messages.JSON.Parser;
import fr.ill.ics.cameo.strings.Endpoint;
public class EventStreamSocket {
private EventStreamSocketImpl impl;
public EventStreamSocket(Server server) {
public EventStreamSocket() {
//TODO Replace with factory.
this.impl = new EventStreamSocketZmq(server);
this.impl = new EventStreamSocketZmq();
}
public void init() {
impl.init();
public void init(Context context, Endpoint endpoint, RequestSocket requestSocket, Parser parser) {
impl.init(context, endpoint, requestSocket, parser);
}
public Event receive() {
......
......@@ -19,17 +19,19 @@ package fr.ill.ics.cameo.base;
import fr.ill.ics.cameo.base.impl.OutputStreamSocketImpl;
import fr.ill.ics.cameo.base.impl.zmq.OutputStreamSocketZmq;
import fr.ill.ics.cameo.messages.JSON.Parser;
import fr.ill.ics.cameo.strings.Endpoint;
public class OutputStreamSocket {
private OutputStreamSocketImpl impl;
public OutputStreamSocket(Server server, String name) {
impl = new OutputStreamSocketZmq(server, name);
public OutputStreamSocket(String name) {
impl = new OutputStreamSocketZmq(name);
}
public void init() {
impl.init();
public void init(Context context, Endpoint endpoint, RequestSocket requestSocket, Parser parser) {
impl.init(context, endpoint, requestSocket, parser);
}
/**
......
......@@ -280,8 +280,8 @@ public class Server {
JSONObject response = requestSocket.requestJSON(Messages.createStreamStatusRequest());
statusPort = JSON.getInt(response, Messages.RequestResponse.VALUE);
EventStreamSocket eventStreamSocket = new EventStreamSocket(this);
eventStreamSocket.init();
EventStreamSocket eventStreamSocket = new EventStreamSocket();
eventStreamSocket.init(contextImpl, serverEndpoint.withPort(statusPort), requestSocket, parser);
return eventStreamSocket;
}
......@@ -669,8 +669,15 @@ public class Server {
private OutputStreamSocket createOutputStreamSocket(String name) {
OutputStreamSocket outputStreamSocket = new OutputStreamSocket(this, name);
outputStreamSocket.init();
OutputStreamSocket outputStreamSocket = new OutputStreamSocket(name);
int port = getStreamPort(name);
if (port == -1) {
return null;
}
outputStreamSocket.init(contextImpl, serverEndpoint.withPort(port), requestSocket, parser);
return outputStreamSocket;
}
......
package fr.ill.ics.cameo.base.impl;
import fr.ill.ics.cameo.base.Context;
import fr.ill.ics.cameo.base.Event;
import fr.ill.ics.cameo.base.RequestSocket;
import fr.ill.ics.cameo.messages.JSON.Parser;
import fr.ill.ics.cameo.strings.Endpoint;
public interface EventStreamSocketImpl {
void init();
void init(Context context, Endpoint endpoint, RequestSocket requestSocket, Parser parser);
Event receive();
boolean isCanceled();
void cancel();
......
package fr.ill.ics.cameo.base.impl;
import fr.ill.ics.cameo.base.Application;
import fr.ill.ics.cameo.base.Context;
import fr.ill.ics.cameo.base.RequestSocket;
import fr.ill.ics.cameo.messages.JSON.Parser;
import fr.ill.ics.cameo.strings.Endpoint;
public interface OutputStreamSocketImpl {
void init();
void init(Context context, Endpoint endpoint, RequestSocket requestSocket, Parser parser);
void setApplicationId(int id);
Application.Output receive();
boolean isEnded();
......
......@@ -22,38 +22,43 @@ import org.json.simple.parser.ParseException;
import fr.ill.ics.cameo.Zmq;
import fr.ill.ics.cameo.base.CancelIdGenerator;
import fr.ill.ics.cameo.base.ConnectionTimeout;
import fr.ill.ics.cameo.base.Context;
import fr.ill.ics.cameo.base.Event;
import fr.ill.ics.cameo.base.KeyEvent;
import fr.ill.ics.cameo.base.PortEvent;
import fr.ill.ics.cameo.base.PublisherEvent;
import fr.ill.ics.cameo.base.RequestSocket;
import fr.ill.ics.cameo.base.ResultEvent;
import fr.ill.ics.cameo.base.Server;
import fr.ill.ics.cameo.base.StatusEvent;
import fr.ill.ics.cameo.base.UnexpectedException;
import fr.ill.ics.cameo.base.impl.EventStreamSocketImpl;
import fr.ill.ics.cameo.messages.JSON;
import fr.ill.ics.cameo.messages.JSON.Parser;
import fr.ill.ics.cameo.messages.Messages;
import fr.ill.ics.cameo.strings.Endpoint;
public class EventStreamSocketZmq implements EventStreamSocketImpl {
private Server server;
private Zmq.Context context;
private Parser parser;
private Zmq.Socket subscriberSocket;
private Zmq.Socket cancelSocket;
private boolean canceled = false;
public EventStreamSocketZmq(Server server) {
public EventStreamSocketZmq() {
super();
this.server = server;
this.context = ((ContextZmq)server.getContext()).getContext();
}
public void init() {
public void init(Context context, Endpoint endpoint, RequestSocket requestSocket, Parser parser) {
this.context = ((ContextZmq)context).getContext();
this.parser = parser;
// Prepare our subscriber.
Zmq.Socket subscriber = context.createSocket(Zmq.SUB);
Zmq.Socket subscriber = this.context.createSocket(Zmq.SUB);
subscriber.connect(server.getStatusEndpoint().toString());
subscriber.connect(endpoint.toString());
subscriber.subscribe(Messages.Event.STATUS);
subscriber.subscribe(Messages.Event.RESULT);
subscriber.subscribe(Messages.Event.PUBLISHER);
......@@ -66,12 +71,17 @@ public class EventStreamSocketZmq implements EventStreamSocketImpl {
subscriber.subscribe(Messages.Event.CANCEL);
// polling to wait for connection
Zmq.Poller poller = context.createPoller(subscriber);
Zmq.Poller poller = this.context.createPoller(subscriber);
while (true) {
// the server returns a STATUS message that is used to synchronize the subscriber
server.sendSync();
try {
requestSocket.requestJSON(Messages.createSyncRequest());
} catch (ConnectionTimeout e) {
// do nothing
}
// return at the first response.
if (poller.poll(100)) {
......@@ -79,7 +89,7 @@ public class EventStreamSocketZmq implements EventStreamSocketImpl {
}
}
Zmq.Socket cancelPublisher = context.createSocket(Zmq.PUB);
Zmq.Socket cancelPublisher = this.context.createSocket(Zmq.PUB);
cancelPublisher.bind(cancelEndpoint);
this.subscriberSocket = subscriber;
......@@ -99,7 +109,7 @@ public class EventStreamSocketZmq implements EventStreamSocketImpl {
try {
// Get the JSON object.
JSONObject jsonObject = server.parse(statusMessage);
JSONObject jsonObject = parser.parse(Messages.parseString(statusMessage));
int id = JSON.getInt(jsonObject, Messages.StatusEvent.ID);
String name = JSON.getString(jsonObject, Messages.StatusEvent.NAME);
......@@ -124,7 +134,7 @@ public class EventStreamSocketZmq implements EventStreamSocketImpl {
try {
// Get the JSON object.
JSONObject jsonObject = server.parse(resultMessage);
JSONObject jsonObject = parser.parse(Messages.parseString(resultMessage));
int id = JSON.getInt(jsonObject, Messages.ResultEvent.ID);
String name = JSON.getString(jsonObject, Messages.ResultEvent.NAME);
......@@ -144,7 +154,7 @@ public class EventStreamSocketZmq implements EventStreamSocketImpl {
try {
// Get the JSON object.
JSONObject jsonObject = server.parse(publisherMessage);
JSONObject jsonObject = parser.parse(Messages.parseString(publisherMessage));
int id = JSON.getInt(jsonObject, Messages.PublisherEvent.ID);
String name = JSON.getString(jsonObject, Messages.PublisherEvent.NAME);
......@@ -162,7 +172,7 @@ public class EventStreamSocketZmq implements EventStreamSocketImpl {
try {
// Get the JSON object.
JSONObject jsonObject = server.parse(portMessage);
JSONObject jsonObject = parser.parse(Messages.parseString(portMessage));
int id = JSON.getInt(jsonObject, Messages.PortEvent.ID);
String name = JSON.getString(jsonObject, Messages.PortEvent.NAME);
......@@ -180,7 +190,7 @@ public class EventStreamSocketZmq implements EventStreamSocketImpl {
try {
// Get the JSON object.
JSONObject jsonObject = server.parse(keyValueMessage);
JSONObject jsonObject = parser.parse(Messages.parseString(keyValueMessage));
int id = JSON.getInt(jsonObject, Messages.KeyEvent.ID);
String name = JSON.getString(jsonObject, Messages.KeyEvent.NAME);
......
......@@ -23,42 +23,42 @@ import org.json.simple.parser.ParseException;
import fr.ill.ics.cameo.Zmq;
import fr.ill.ics.cameo.base.Application;
import fr.ill.ics.cameo.base.CancelIdGenerator;
import fr.ill.ics.cameo.base.ConnectionTimeout;
import fr.ill.ics.cameo.base.Context;
import fr.ill.ics.cameo.base.RequestSocket;
import fr.ill.ics.cameo.base.Server;
import fr.ill.ics.cameo.base.UnexpectedException;
import fr.ill.ics.cameo.base.impl.OutputStreamSocketImpl;
import fr.ill.ics.cameo.messages.JSON;
import fr.ill.ics.cameo.messages.Messages;
import fr.ill.ics.cameo.messages.JSON.Parser;
import fr.ill.ics.cameo.strings.Endpoint;
public class OutputStreamSocketZmq implements OutputStreamSocketImpl {
private Server server;
private String name;
private Zmq.Context context;
private Parser parser;
private Zmq.Socket subscriberSocket;
private Zmq.Socket cancelSocket;
private int applicationId = -1;
private boolean ended = false;
private boolean canceled = false;
public OutputStreamSocketZmq(Server server, String name) {
public OutputStreamSocketZmq(String name) {
super();
this.server = server;
this.name = name;
this.context = ((ContextZmq)server.getContext()).getContext();
}
public void init() {
public void init(Context context, Endpoint endpoint, RequestSocket requestSocket, Parser parser) {
int port = server.getStreamPort(name);
if (port == -1) {
return;
}
this.context = ((ContextZmq)context).getContext();
this.parser = parser;
// Prepare our context and subscriber
Zmq.Socket subscriber = context.createSocket(Zmq.SUB);
Zmq.Socket subscriber = this.context.createSocket(Zmq.SUB);
subscriber.connect(server.getEndpoint().withPort(port).toString());
subscriber.connect(endpoint.toString());
subscriber.subscribe(Messages.Event.SYNCSTREAM);
subscriber.subscribe(Messages.Event.STREAM);
subscriber.subscribe(Messages.Event.ENDSTREAM);
......@@ -68,16 +68,21 @@ public class OutputStreamSocketZmq implements OutputStreamSocketImpl {
subscriber.connect(cancelEndpoint);
subscriber.subscribe(Messages.Event.CANCEL);
Zmq.Socket cancelPublisher = context.createSocket(Zmq.PUB);
Zmq.Socket cancelPublisher = this.context.createSocket(Zmq.PUB);
cancelPublisher.bind(cancelEndpoint);
// Polling to wait for connection.
Zmq.Poller poller = context.createPoller(subscriber);
Zmq.Poller poller = this.context.createPoller(subscriber);
while (true) {
// the server returns a SYNCSTREAM message that is used to synchronize the subscriber
server.sendSyncStream(name);
try {
requestSocket.requestJSON(Messages.createSyncStreamRequest(name));
}
catch (ConnectionTimeout e) {
// do nothing
}
// return at the first response.
if (poller.poll(100)) {
......@@ -119,7 +124,7 @@ public class OutputStreamSocketZmq implements OutputStreamSocketImpl {
try {
// Get the JSON object.
JSONObject stream = server.parse(messageValue);
JSONObject stream = parser.parse(Messages.parseString(messageValue));
int id = JSON.getInt(stream, Messages.ApplicationStream.ID);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment