Commit 935a7d91 authored by legoc's avatar legoc
Browse files

Refactored EventStreamSocket

parent 4fca5c71
......@@ -17,7 +17,6 @@ package fr.ill.ics.cameo.base;
import fr.ill.ics.cameo.Zmq;
import fr.ill.ics.cameo.base.impl.EventStreamSocketImpl;
import fr.ill.ics.cameo.base.impl.zmq.EventStreamSocketZmq;
......@@ -25,9 +24,13 @@ public class EventStreamSocket {
private EventStreamSocketImpl impl;
public EventStreamSocket(Server server, Zmq.Socket subscriber, Zmq.Socket cancelPublisher) {
public EventStreamSocket(Server server) {
//TODO Replace with factory.
this.impl = new EventStreamSocketZmq(server, subscriber, cancelPublisher);
this.impl = new EventStreamSocketZmq(server);
}
public void init() {
impl.init();
}
public Event receive() {
......
......@@ -184,7 +184,7 @@ public class Server {
return false;
}
private void sendSync() {
public void sendSync() {
try {
requestSocket.requestJSON(Messages.createSyncRequest());
......@@ -290,42 +290,12 @@ public class Server {
private EventStreamSocket openEventStream() {
JSONObject response = requestSocket.requestJSON(Messages.createStreamStatusRequest());
// Prepare our subscriber.
Zmq.Socket subscriber = context.createSocket(Zmq.SUB);
statusPort = JSON.getInt(response, Messages.RequestResponse.VALUE);
subscriber.connect(getStatusEndpoint().toString());
subscriber.subscribe(Messages.Event.STATUS);
subscriber.subscribe(Messages.Event.RESULT);
subscriber.subscribe(Messages.Event.PUBLISHER);
subscriber.subscribe(Messages.Event.PORT);
subscriber.subscribe(Messages.Event.KEYVALUE);
String cancelEndpoint = "inproc://cancel." + CancelIdGenerator.newId();
subscriber.connect(cancelEndpoint);
subscriber.subscribe(Messages.Event.CANCEL);
// polling to wait for connection
Zmq.Poller poller = context.createPoller(subscriber);
while (true) {
// the server returns a STATUS message that is used to synchronize the subscriber
sendSync();
// return at the first response.
if (poller.poll(100)) {
break;
}
}
Zmq.Socket cancelPublisher = context.createSocket(Zmq.PUB);
cancelPublisher.bind(cancelEndpoint);
EventStreamSocket streamSocket = new EventStreamSocket(this);
streamSocket.init();
return new EventStreamSocket(this, subscriber, cancelPublisher);
return streamSocket;
}
/**
......
......@@ -4,6 +4,7 @@ import fr.ill.ics.cameo.base.Event;
public interface EventStreamSocketImpl {
void init();
Event receive();
boolean isCanceled();
void cancel();
......
......@@ -29,6 +29,7 @@ import fr.ill.ics.cameo.base.ResultEvent;
import fr.ill.ics.cameo.base.Server;
import fr.ill.ics.cameo.base.StatusEvent;
import fr.ill.ics.cameo.base.UnexpectedException;
import fr.ill.ics.cameo.base.impl.CancelIdGenerator;
import fr.ill.ics.cameo.base.impl.EventStreamSocketImpl;
import fr.ill.ics.cameo.messages.JSON;
import fr.ill.ics.cameo.messages.Messages;
......@@ -36,27 +37,65 @@ import fr.ill.ics.cameo.messages.Messages;
public class EventStreamSocketZmq implements EventStreamSocketImpl {
private Server server;
private Zmq.Socket socket;
private Zmq.Context context;
private Zmq.Socket subscriberSocket;
private Zmq.Socket cancelSocket;
private boolean canceled = false;
public EventStreamSocketZmq(Server server, Zmq.Socket subscriber, Zmq.Socket cancelPublisher) {
public EventStreamSocketZmq(Server server) {
super();
this.context = ((ContextZmq)server.getContext()).getContext();
this.server = server;
this.socket = subscriber;
}
public void init() {
// Prepare our subscriber.
Zmq.Socket subscriber = context.createSocket(Zmq.SUB);
subscriber.connect(server.getStatusEndpoint().toString());
subscriber.subscribe(Messages.Event.STATUS);
subscriber.subscribe(Messages.Event.RESULT);
subscriber.subscribe(Messages.Event.PUBLISHER);
subscriber.subscribe(Messages.Event.PORT);
subscriber.subscribe(Messages.Event.KEYVALUE);
String cancelEndpoint = "inproc://cancel." + CancelIdGenerator.newId();
subscriber.connect(cancelEndpoint);
subscriber.subscribe(Messages.Event.CANCEL);
// polling to wait for connection
Zmq.Poller poller = context.createPoller(subscriber);
while (true) {
// the server returns a STATUS message that is used to synchronize the subscriber
server.sendSync();
// return at the first response.
if (poller.poll(100)) {
break;
}
}
Zmq.Socket cancelPublisher = context.createSocket(Zmq.PUB);
cancelPublisher.bind(cancelEndpoint);
this.subscriberSocket = subscriber;
this.cancelSocket = cancelPublisher;
}
public Event receive() {
String message = this.socket.recvStr();
String message = this.subscriberSocket.recvStr();
Event event = null;
// We can receive messages from the status publisher located in the server
// as well as messages from the cancel publisher located in the same process.
if (message.equals(Messages.Event.STATUS)) {
byte[] statusMessage = this.socket.recv();
byte[] statusMessage = this.subscriberSocket.recv();
try {
// Get the JSON object.
......@@ -81,7 +120,7 @@ public class EventStreamSocketZmq implements EventStreamSocketImpl {
}
else if (message.equals(Messages.Event.RESULT)) {
byte[] resultMessage = this.socket.recv();
byte[] resultMessage = this.subscriberSocket.recv();
try {
// Get the JSON object.
......@@ -91,7 +130,7 @@ public class EventStreamSocketZmq implements EventStreamSocketImpl {
String name = JSON.getString(jsonObject, Messages.ResultEvent.NAME);
// Get the next message to get the data.
byte[] data = this.socket.recv();
byte[] data = this.subscriberSocket.recv();
event = new ResultEvent(id, name, data);
}
......@@ -101,7 +140,7 @@ public class EventStreamSocketZmq implements EventStreamSocketImpl {
}
else if (message.equals(Messages.Event.PUBLISHER)) {
byte[] publisherMessage = this.socket.recv();
byte[] publisherMessage = this.subscriberSocket.recv();
try {
// Get the JSON object.
......@@ -119,7 +158,7 @@ public class EventStreamSocketZmq implements EventStreamSocketImpl {
}
else if (message.equals(Messages.Event.PORT)) {
byte[] portMessage = this.socket.recv();
byte[] portMessage = this.subscriberSocket.recv();
try {
// Get the JSON object.
......@@ -137,7 +176,7 @@ public class EventStreamSocketZmq implements EventStreamSocketImpl {
}
else if (message.equals(Messages.Event.KEYVALUE)) {
byte[] keyValueMessage = this.socket.recv();
byte[] keyValueMessage = this.subscriberSocket.recv();
try {
// Get the JSON object.
......@@ -178,6 +217,6 @@ public class EventStreamSocketZmq implements EventStreamSocketImpl {
}
public void destroy() {
server.destroySocket(socket);
server.destroySocket(subscriberSocket);
}
}
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment