Commit 2d794da2 authored by legoc's avatar legoc
Browse files

Refactored Subscriber

parent 5e669696
......@@ -14,12 +14,15 @@ import fr.ill.ics.cameo.messages.Messages;
*/
public class Publisher {
private String name;
private PublisherImpl impl;
private PublisherWaiting waiting = new PublisherWaiting(this);
private Publisher() {
private Publisher(String name, int numberOfSubscribers) {
this.name = name;
//TODO Replace with factory.
this.impl = new PublisherZmq();
this.impl = new PublisherZmq(name, numberOfSubscribers);
waiting.add();
}
......@@ -35,7 +38,7 @@ public class Publisher {
}
int synchronizerPort = JSON.getInt(response, Messages.PublisherResponse.SYNCHRONIZER_PORT);
impl.init(publisherPort, synchronizerPort, name, numberOfSubscribers);
impl.init(publisherPort, synchronizerPort);
}
/**
......@@ -46,7 +49,7 @@ public class Publisher {
*/
static public Publisher create(String name, int numberOfSubscribers) throws PublisherCreationException {
Publisher publisher = new Publisher();
Publisher publisher = new Publisher(name, numberOfSubscribers);
publisher.init(name, numberOfSubscribers);
return publisher;
......@@ -63,7 +66,7 @@ public class Publisher {
}
public String getName() {
return impl.getName();
return name;
}
/**
......
......@@ -5,6 +5,7 @@ import org.json.simple.JSONObject;
import fr.ill.ics.cameo.base.Application;
import fr.ill.ics.cameo.base.Instance;
import fr.ill.ics.cameo.coms.impl.SubscriberImpl;
import fr.ill.ics.cameo.coms.impl.zmq.SubscriberZmq;
import fr.ill.ics.cameo.messages.JSON;
import fr.ill.ics.cameo.messages.Messages;
import fr.ill.ics.cameo.strings.Endpoint;
......@@ -16,12 +17,16 @@ import fr.ill.ics.cameo.strings.Endpoint;
public class Subscriber {
private SubscriberImpl impl;
private SubscriberWaiting waiting = new SubscriberWaiting(this);
private Subscriber(SubscriberImpl impl) {
this.impl = impl;
private Subscriber() {
//TODO Replace with factory.
this.impl = new SubscriberZmq();
waiting.add();
}
private static SubscriberImpl createSubscriber(String publisherName, Instance instance) throws SubscriberCreationException {
private void initSubscriber(Instance instance, String publisherName) throws SubscriberCreationException {
JSONObject request = Messages.createConnectPublisherRequest(instance.getId(), publisherName);
JSONObject response = instance.getCom().requestJSON(request);
......@@ -35,19 +40,16 @@ public class Subscriber {
int synchronizerPort = JSON.getInt(response, Messages.PublisherResponse.SYNCHRONIZER_PORT);
int numberOfSubscribers = JSON.getInt(response, Messages.PublisherResponse.NUMBER_OF_SUBSCRIBERS);
SubscriberImpl subscriber = new SubscriberImpl(publisherPort, synchronizerPort, publisherName, numberOfSubscribers, instance);
subscriber.init();
return subscriber;
impl.init(instance, publisherPort, synchronizerPort, publisherName, numberOfSubscribers);
}
static SubscriberImpl createSubscriber(Instance application, String publisherName) {
private boolean init(Instance application, String publisherName) {
try {
SubscriberImpl subscriber = createSubscriber(publisherName, application);
return subscriber;
} catch (SubscriberCreationException e) {
initSubscriber(application, publisherName);
return true;
}
catch (SubscriberCreationException e) {
// the publisher does not exist, so we are waiting for it
}
......@@ -59,19 +61,19 @@ public class Subscriber {
|| lastState == Application.State.STOPPED
|| lastState == Application.State.KILLED
|| lastState == Application.State.ERROR) {
return null;
return false;
}
try {
SubscriberImpl subscriber = createSubscriber(publisherName, application);
return subscriber;
} catch (SubscriberCreationException e) {
initSubscriber(application, publisherName);
return true;
}
catch (SubscriberCreationException e) {
// that should not happen
System.err.println("the publisher " + publisherName + " does not exist but should");
}
return null;
return false;
}
/**
......@@ -80,7 +82,11 @@ public class Subscriber {
* @return
*/
public static Subscriber create(Instance application, String publisherName) {
return new Subscriber(createSubscriber(application, publisherName));
Subscriber subscriber = new Subscriber();
subscriber.init(application, publisherName);
return subscriber;
}
public String getPublisherName() {
......@@ -136,11 +142,13 @@ public class Subscriber {
}
public void terminate() {
waiting.remove();
impl.terminate();
}
@Override
public String toString() {
return impl.toString();
return "sub." + getPublisherName() + ":" + getInstanceName() + "." + getInstanceId() + "@" + getInstanceEndpoint();
}
}
\ No newline at end of file
......@@ -14,15 +14,15 @@
* limitations under the Licence.
*/
package fr.ill.ics.cameo.coms.impl;
package fr.ill.ics.cameo.coms;
import fr.ill.ics.cameo.base.Waiting;
public class SubscriberWaitingImpl extends Waiting {
public class SubscriberWaiting extends Waiting {
private SubscriberImpl subscriber;
private Subscriber subscriber;
public SubscriberWaitingImpl(SubscriberImpl subscriber) {
public SubscriberWaiting(Subscriber subscriber) {
this.subscriber = subscriber;
}
......
......@@ -18,8 +18,7 @@ package fr.ill.ics.cameo.coms.impl;
public interface PublisherImpl {
void init(int publisherPort, int synchronizerPort, String name, int numberOfSubscribers);
String getName();
void init(int publisherPort, int synchronizerPort);
boolean waitForSubscribers();
void cancelWaitForSubscribers();
void send(byte[] data);
......
......@@ -16,259 +16,22 @@
package fr.ill.ics.cameo.coms.impl;
import org.json.simple.JSONObject;
import fr.ill.ics.cameo.Zmq;
import fr.ill.ics.cameo.base.Application;
import fr.ill.ics.cameo.base.CancelIdGenerator;
import fr.ill.ics.cameo.base.ConnectionTimeout;
import fr.ill.ics.cameo.base.Instance;
import fr.ill.ics.cameo.base.RequestSocket;
import fr.ill.ics.cameo.base.This;
import fr.ill.ics.cameo.base.impl.zmq.ContextZmq;
import fr.ill.ics.cameo.messages.JSON;
import fr.ill.ics.cameo.messages.Messages;
import fr.ill.ics.cameo.strings.Endpoint;
public class SubscriberImpl {
private int publisherPort;
private int synchronizerPort;
private Zmq.Context context;
private Zmq.Socket subscriber;
private String cancelEndpoint;
private Zmq.Socket cancelPublisher;
private String publisherName;
private int numberOfSubscribers;
private Instance instance;
private boolean ended = false;
private boolean canceled = false;
private SubscriberWaitingImpl waiting = new SubscriberWaitingImpl(this);
public SubscriberImpl(int publisherPort, int synchronizerPort, String publisherName, int numberOfSubscribers, Instance instance) {
this.publisherPort = publisherPort;
this.synchronizerPort = synchronizerPort;
this.publisherName = publisherName;
this.numberOfSubscribers = numberOfSubscribers;
this.instance = instance;
this.context = ((ContextZmq)This.getCom().getContext()).getContext();
waiting.add();
}
public void init() throws ConnectionTimeout {
// Create the subscriber
subscriber = context.createSocket(Zmq.SUB);
subscriber.connect(instance.getEndpoint().withPort(publisherPort).toString());
subscriber.subscribe(Messages.Event.SYNC);
subscriber.subscribe(Messages.Event.STREAM);
subscriber.subscribe(Messages.Event.ENDSTREAM);
// Create an endpoint that should be unique
cancelEndpoint = "inproc://cancel." + CancelIdGenerator.newId();
// Create a cancel publisher so that it sends the CANCEL message to the status subscriber (connected to 2 publishers)
cancelPublisher = context.createSocket(Zmq.PUB);
cancelPublisher.bind(cancelEndpoint);
// Subscribe to CANCEL
subscriber.connect(cancelEndpoint);
subscriber.subscribe(Messages.Event.CANCEL);
// Subscribe to STATUS
subscriber.connect(instance.getStatusEndpoint().toString());
subscriber.subscribe(Messages.Event.STATUS);
// Synchronize the subscriber only if the number of subscribers > 0
if (numberOfSubscribers > 0) {
// Create a socket that will be used for several requests.
RequestSocket requestSocket = This.getCom().createRequestSocket(instance.getEndpoint().withPort(synchronizerPort).toString());
// polling to wait for connection
Zmq.Poller poller = context.createPoller(subscriber);
boolean ready = false;
while (!ready) {
// The subscriber sends init messages to the publisher that returns SYNC message
try {
requestSocket.requestJSON(Messages.createSyncRequest());
} catch (ConnectionTimeout e) {
// do nothing
}
// return at the first response.
if (poller.poll(100)) {
ready = true;
}
}
// The subscriber is connected and ready to receive data.
// Notify the publisher that it can send data.
JSONObject request = Messages.createSubscribePublisherRequest();
requestSocket.requestJSON(request);
requestSocket.terminate();
}
}
public String getPublisherName() {
return publisherName;
}
public String getInstanceName() {
return instance.getName();
}
public int getInstanceId() {
return instance.getId();
}
public Endpoint getInstanceEndpoint() {
return instance.getEndpoint();
}
public boolean isEnded() {
return ended;
}
public boolean isCanceled() {
return canceled;
}
/**
*
* @return the byte[] data. If the return value is null, then the stream is finished.
*/
public byte[] receive() {
while (true) {
String message = subscriber.recvStr();
if (message.equals(Messages.Event.STREAM)) {
return subscriber.recv();
} else if (message.equals(Messages.Event.ENDSTREAM)) {
ended = true;
return null;
} else if (message.equals(Messages.Event.CANCEL)) {
canceled = true;
return null;
} else if (message.equals(Messages.Event.STATUS)) {
byte[] statusMessage = subscriber.recv();
// Get the JSON object.
JSONObject status = This.getCom().parse(statusMessage);
// Get the id.
int id = JSON.getInt(status, Messages.StatusEvent.ID);
if (instance.getId() == id) {
// Get the state.
int state = JSON.getInt(status, Messages.StatusEvent.APPLICATION_STATE);
// Test if the state is terminal
if (state == Application.State.SUCCESS
|| state == Application.State.STOPPED
|| state == Application.State.KILLED
|| state == Application.State.ERROR) {
// Exit because the remote application has terminated.
return null;
}
}
}
}
}
/**
*
* @return the byte[] data. If the return value is null, then the stream is finished.
*/
public byte[][] receiveTwoParts() {
while (true) {
String message = subscriber.recvStr();
if (message.equals(Messages.Event.STREAM)) {
byte[][] result = new byte[2][];
result[0] = subscriber.recv();
result[1] = subscriber.recv();
return result;
} else if (message.equals(Messages.Event.ENDSTREAM)) {
ended = true;
return null;
} else if (message.equals(Messages.Event.CANCEL)) {
canceled = true;
return null;
} else if (message.equals(Messages.Event.STATUS)) {
byte[] statusMessage = subscriber.recv();
// Get the JSON request object.
JSONObject request = This.getCom().parse(statusMessage);
// Get the id.
int id = JSON.getInt(request, Messages.StatusEvent.ID);
if (instance.getId() == id) {
// Get the state.
int state = JSON.getInt(request, Messages.StatusEvent.APPLICATION_STATE);
// Test if the state is terminal
if (state == Application.State.SUCCESS
|| state == Application.State.STOPPED
|| state == Application.State.KILLED
|| state == Application.State.ERROR) {
// Exit because the remote application has terminated.
return null;
}
}
}
}
}
/**
*
* @return the string data. If the return value is null, then the stream is finished.
*/
public String receiveString() {
byte[] data = receive();
if (data == null) {
return null;
}
return Messages.parseString(data);
}
public void cancel() {
cancelPublisher.sendMore(Messages.Event.CANCEL);
cancelPublisher.send(Messages.Event.CANCEL);
}
public void terminate() {
waiting.remove();
context.destroySocket(subscriber);
context.destroySocket(cancelPublisher);
}
@Override
public String toString() {
return "sub." + publisherName + ":" + instance.getName() + "." + instance.getId() + "@" + instance.getEndpoint();
}
public interface SubscriberImpl {
void init(Instance instance, int publisherPort, int synchronizerPort, String publisherName, int numberOfSubscribers) throws ConnectionTimeout;
String getPublisherName();
String getInstanceName();
int getInstanceId();
Endpoint getInstanceEndpoint();
boolean isEnded();
boolean isCanceled();
byte[] receive();
byte[][] receiveTwoParts();
String receiveString();
void cancel();
void terminate();
}
\ No newline at end of file
......@@ -36,21 +36,20 @@ public class PublisherZmq implements PublisherImpl {
private Zmq.Socket publisher = null;
private boolean ended = false;
public void init(int publisherPort, int synchronizerPort, String name, int numberOfSubscribers) {
this.synchronizerPort = synchronizerPort;
public PublisherZmq(String name, int numberOfSubscribers) {
this.name = name;
this.numberOfSubscribers = numberOfSubscribers;
}
public void init(int publisherPort, int synchronizerPort) {
this.synchronizerPort = synchronizerPort;
this.context = ((ContextZmq)This.getCom().getContext()).getContext();
// create a socket for publishing
publisher = context.createSocket(Zmq.PUB);
publisher.bind("tcp://*:" + publisherPort);
}
public String getName() {
return name;
}
public boolean waitForSubscribers() {
if (numberOfSubscribers <= 0) {
......
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
package fr.ill.ics.cameo.coms.impl.zmq;
import org.json.simple.JSONObject;
import fr.ill.ics.cameo.Zmq;
import fr.ill.ics.cameo.base.Application;
import fr.ill.ics.cameo.base.CancelIdGenerator;
import fr.ill.ics.cameo.base.ConnectionTimeout;
import fr.ill.ics.cameo.base.Instance;
import fr.ill.ics.cameo.base.RequestSocket;
import fr.ill.ics.cameo.base.This;
import fr.ill.ics.cameo.base.impl.zmq.ContextZmq;
import fr.ill.ics.cameo.coms.impl.SubscriberImpl;
import fr.ill.ics.cameo.messages.JSON;
import fr.ill.ics.cameo.messages.Messages;
import fr.ill.ics.cameo.strings.Endpoint;
public class SubscriberZmq implements SubscriberImpl {
private Zmq.Context context;
private Zmq.Socket subscriber;
private String cancelEndpoint;
private Zmq.Socket cancelPublisher;
private String publisherName;
private Instance instance;
private boolean ended = false;
private boolean canceled = false;
public void init(Instance instance, int publisherPort, int synchronizerPort, String publisherName, int numberOfSubscribers) {
this.instance = instance;
this.publisherName = publisherName;
this.context = ((ContextZmq)This.getCom().getContext()).getContext();
// Create the subscriber
subscriber = context.createSocket(Zmq.SUB);
subscriber.connect(instance.getEndpoint().withPort(publisherPort).toString());
subscriber.subscribe(Messages.Event.SYNC);
subscriber.subscribe(Messages.Event.STREAM);
subscriber.subscribe(Messages.Event.ENDSTREAM);
// Create an endpoint that should be unique
cancelEndpoint = "inproc://cancel." + CancelIdGenerator.newId();
// Create a cancel publisher so that it sends the CANCEL message to the status subscriber (connected to 2 publishers)
cancelPublisher = context.createSocket(Zmq.PUB);
cancelPublisher.bind(cancelEndpoint);
// Subscribe to CANCEL
subscriber.connect(cancelEndpoint);
subscriber.subscribe(Messages.Event.CANCEL);
// Subscribe to STATUS
subscriber.connect(instance.getStatusEndpoint().toString());
subscriber.subscribe(Messages.Event.STATUS);
// Synchronize the subscriber only if the number of subscribers > 0
if (numberOfSubscribers > 0) {
// Create a socket that will be used for several requests.
RequestSocket requestSocket = This.getCom().createRequestSocket(instance.getEndpoint().withPort(synchronizerPort).toString());
// polling to wait for connection
Zmq.Poller poller = context.createPoller(subscriber);
boolean ready = false;
while (!ready) {
// The subscriber sends init messages to the publisher that returns SYNC message
try {
requestSocket.requestJSON(Messages.createSyncRequest());
} catch (ConnectionTimeout e) {
// do nothing
}
// return at the first response.
if (poller.poll(100)) {
ready = true;
}
}
// The subscriber is connected and ready to receive data.