Commit 5e669696 authored by legoc's avatar legoc
Browse files

Refactoring Publisher

parent 646b68bd
......@@ -4,6 +4,7 @@ import org.json.simple.JSONObject;
import fr.ill.ics.cameo.base.This;
import fr.ill.ics.cameo.coms.impl.PublisherImpl;
import fr.ill.ics.cameo.coms.impl.zmq.PublisherZmq;
import fr.ill.ics.cameo.messages.JSON;
import fr.ill.ics.cameo.messages.Messages;
......@@ -14,12 +15,16 @@ import fr.ill.ics.cameo.messages.Messages;
public class Publisher {
private PublisherImpl impl;
private PublisherWaiting waiting = new PublisherWaiting(this);
private Publisher(PublisherImpl impl) {
this.impl = impl;
private Publisher() {
//TODO Replace with factory.
this.impl = new PublisherZmq();
waiting.add();
}
static PublisherImpl createPublisher(String name, int numberOfSubscribers) throws PublisherCreationException {
private void init(String name, int numberOfSubscribers) throws PublisherCreationException {
JSONObject request = Messages.createCreatePublisherRequest(This.getId(), name, numberOfSubscribers);
JSONObject response = This.getCom().requestJSON(request);
......@@ -30,7 +35,7 @@ public class Publisher {
}
int synchronizerPort = JSON.getInt(response, Messages.PublisherResponse.SYNCHRONIZER_PORT);
return new PublisherImpl(publisherPort, synchronizerPort, name, numberOfSubscribers);
impl.init(publisherPort, synchronizerPort, name, numberOfSubscribers);
}
/**
......@@ -40,7 +45,11 @@ public class Publisher {
* @throws PublisherCreationException, ConnectionTimeout
*/
static public Publisher create(String name, int numberOfSubscribers) throws PublisherCreationException {
return new Publisher(createPublisher(name, numberOfSubscribers));
Publisher publisher = new Publisher();
publisher.init(name, numberOfSubscribers);
return publisher;
}
/**
......@@ -50,7 +59,7 @@ public class Publisher {
* @throws PublisherCreationException, ConnectionTimeout
*/
static public Publisher create(String name) throws PublisherCreationException {
return new Publisher(createPublisher(name, 0));
return create(name, 0);
}
public String getName() {
......@@ -93,11 +102,12 @@ public class Publisher {
}
public void terminate() {
waiting.remove();
impl.terminate();
}
@Override
public String toString() {
return impl.toString();
return "pub." + getName() + ":" + This.getName() + "." + This.getId() + "@" + This.getEndpoint();
}
}
......@@ -14,15 +14,15 @@
* limitations under the Licence.
*/
package fr.ill.ics.cameo.coms.impl;
package fr.ill.ics.cameo.coms;
import fr.ill.ics.cameo.base.Waiting;
public class PublisherWaitingImpl extends Waiting {
public class PublisherWaiting extends Waiting {
private PublisherImpl publisher;
private Publisher publisher;
public PublisherWaitingImpl(PublisherImpl publisher) {
public PublisherWaiting(Publisher publisher) {
this.publisher = publisher;
}
......
......@@ -16,229 +16,16 @@
package fr.ill.ics.cameo.coms.impl;
import org.json.simple.JSONObject;
import fr.ill.ics.cameo.Zmq;
import fr.ill.ics.cameo.base.RequestSocket;
import fr.ill.ics.cameo.base.This;
import fr.ill.ics.cameo.base.impl.zmq.ContextZmq;
import fr.ill.ics.cameo.messages.JSON;
import fr.ill.ics.cameo.messages.Messages;
import fr.ill.ics.cameo.strings.Endpoint;
public class PublisherImpl {
private int synchronizerPort;
private String name;
private int numberOfSubscribers;
private Zmq.Context context;
private Zmq.Socket publisher = null;
private boolean ended = false;
private PublisherWaitingImpl waiting = new PublisherWaitingImpl(this);
public PublisherImpl(int publisherPort, int synchronizerPort, String name, int numberOfSubscribers) {
this.synchronizerPort = synchronizerPort;
this.name = name;
this.numberOfSubscribers = numberOfSubscribers;
this.context = ((ContextZmq)This.getCom().getContext()).getContext();
// create a socket for publishing
publisher = context.createSocket(Zmq.PUB);
publisher.bind("tcp://*:" + publisherPort);
waiting.add();
}
public String getName() {
return name;
}
public boolean waitForSubscribers() {
if (numberOfSubscribers <= 0) {
return true;
}
Zmq.Socket synchronizer = null;
boolean canceled = false;
try {
// create a socket to receive the messages from the subscribers
synchronizer = context.createSocket(Zmq.REP);
String endpoint = "tcp://*:" + synchronizerPort;
synchronizer.bind(endpoint);
// loop until the number of subscribers is reached
int counter = 0;
while (counter < numberOfSubscribers) {
Zmq.Msg message = null;
Zmq.Msg reply = null;
try {
message = Zmq.Msg.recvMsg(synchronizer);
if (message == null) {
break;
}
// Get the JSON request object.
JSONObject request = This.getCom().parse(message.getFirstData());
// Get the type.
long type = JSON.getLong(request, Messages.TYPE);
if (type == Messages.SYNC) {
reply = responseToSyncRequest();
}
else if (type == Messages.SUBSCRIBE_PUBLISHER_v0) {
counter++;
reply = responseToSubscribeRequest();
}
else if (type == Messages.CANCEL) {
canceled = true;
counter = numberOfSubscribers;
reply = responseToCancelRequest();
}
else {
reply = responseToUnknownRequest();
}
// send to the client
if (reply != null) {
reply.send(synchronizer);
}
}
finally {
if (message != null) {
message.destroy();
}
if (reply != null) {
reply.destroy();
}
}
}
} finally {
// destroy synchronizer socket as we do not need it anymore.
if (synchronizer != null) {
context.destroySocket(synchronizer);
}
}
return !canceled;
}
public void cancelWaitForSubscribers() {
Endpoint endpoint = This.getEndpoint().withPort(synchronizerPort);
JSONObject request = new JSONObject();
request.put(Messages.TYPE, Messages.CANCEL);
// Create the request socket. We can create it here because it should be called only once.
RequestSocket requestSocket = This.getCom().createRequestSocket(endpoint.toString());
JSONObject response = requestSocket.requestJSON(request);
// Terminate the socket.
requestSocket.terminate();
}
public void send(byte[] data) {
publisher.sendMore(Messages.Event.STREAM);
publisher.send(data, 0);
}
public void send(String data) {
byte[] result = Messages.serialize(data);
publisher.sendMore(Messages.Event.STREAM);
publisher.send(result, 0);
}
public void sendTwoParts(byte[] data1, byte[] data2) {
publisher.sendMore(Messages.Event.STREAM);
publisher.sendMore(data1);
publisher.send(data2, 0);
}
public void sendEnd() {
if (!ended) {
publisher.sendMore(Messages.Event.ENDSTREAM);
publisher.send(Messages.Event.ENDSTREAM);
ended = true;
}
}
public boolean isEnded() {
return ended;
}
public void terminate() {
waiting.remove();
sendEnd();
context.destroySocket(publisher);
JSONObject request = Messages.createTerminatePublisherRequest(This.getId(), name);
JSONObject response = This.getCom().requestJSON(request);
int value = JSON.getInt(response, Messages.RequestResponse.VALUE);
if (value == -1) {
System.err.println("Cannot terminate publisher");
}
}
private Zmq.Msg responseToSyncRequest() {
// send a dummy SYNC message by the publisher socket
publisher.sendMore(Messages.Event.SYNC);
publisher.send(Messages.Event.SYNC);
Zmq.Msg message = new Zmq.Msg();
message.add(Messages.serialize(Messages.createRequestResponse(0, "OK")));
return message;
}
private Zmq.Msg responseToSubscribeRequest() {
Zmq.Msg message = new Zmq.Msg();
message.add(Messages.serialize(Messages.createRequestResponse(0, "OK")));
return message;
}
private Zmq.Msg responseToCancelRequest() {
Zmq.Msg message = new Zmq.Msg();
message.add(Messages.serialize(Messages.createRequestResponse(0, "OK")));
return message;
}
private Zmq.Msg responseToUnknownRequest() {
Zmq.Msg message = new Zmq.Msg();
message.add(Messages.serialize(Messages.createRequestResponse(-1, "Unknown request")));
return message;
}
@Override
public String toString() {
return "pub." + name + ":" + This.getName() + "." + This.getId() + "@" + This.getEndpoint();
}
public interface PublisherImpl {
void init(int publisherPort, int synchronizerPort, String name, int numberOfSubscribers);
String getName();
boolean waitForSubscribers();
void cancelWaitForSubscribers();
void send(byte[] data);
void send(String data);
void sendTwoParts(byte[] data1, byte[] data2);
void sendEnd();
boolean isEnded();
void terminate();
}
\ No newline at end of file
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
package fr.ill.ics.cameo.coms.impl.zmq;
import org.json.simple.JSONObject;
import fr.ill.ics.cameo.Zmq;
import fr.ill.ics.cameo.base.RequestSocket;
import fr.ill.ics.cameo.base.This;
import fr.ill.ics.cameo.base.impl.zmq.ContextZmq;
import fr.ill.ics.cameo.coms.impl.PublisherImpl;
import fr.ill.ics.cameo.messages.JSON;
import fr.ill.ics.cameo.messages.Messages;
import fr.ill.ics.cameo.strings.Endpoint;
public class PublisherZmq implements PublisherImpl {
private int synchronizerPort;
private String name;
private int numberOfSubscribers;
private Zmq.Context context;
private Zmq.Socket publisher = null;
private boolean ended = false;
public void init(int publisherPort, int synchronizerPort, String name, int numberOfSubscribers) {
this.synchronizerPort = synchronizerPort;
this.name = name;
this.numberOfSubscribers = numberOfSubscribers;
this.context = ((ContextZmq)This.getCom().getContext()).getContext();
// create a socket for publishing
publisher = context.createSocket(Zmq.PUB);
publisher.bind("tcp://*:" + publisherPort);
}
public String getName() {
return name;
}
public boolean waitForSubscribers() {
if (numberOfSubscribers <= 0) {
return true;
}
Zmq.Socket synchronizer = null;
boolean canceled = false;
try {
// create a socket to receive the messages from the subscribers
synchronizer = context.createSocket(Zmq.REP);
String endpoint = "tcp://*:" + synchronizerPort;
synchronizer.bind(endpoint);
// loop until the number of subscribers is reached
int counter = 0;
while (counter < numberOfSubscribers) {
Zmq.Msg message = null;
Zmq.Msg reply = null;
try {
message = Zmq.Msg.recvMsg(synchronizer);
if (message == null) {
break;
}
// Get the JSON request object.
JSONObject request = This.getCom().parse(message.getFirstData());
// Get the type.
long type = JSON.getLong(request, Messages.TYPE);
if (type == Messages.SYNC) {
reply = responseToSyncRequest();
}
else if (type == Messages.SUBSCRIBE_PUBLISHER_v0) {
counter++;
reply = responseToSubscribeRequest();
}
else if (type == Messages.CANCEL) {
canceled = true;
counter = numberOfSubscribers;
reply = responseToCancelRequest();
}
else {
reply = responseToUnknownRequest();
}
// send to the client
if (reply != null) {
reply.send(synchronizer);
}
}
finally {
if (message != null) {
message.destroy();
}
if (reply != null) {
reply.destroy();
}
}
}
} finally {
// destroy synchronizer socket as we do not need it anymore.
if (synchronizer != null) {
context.destroySocket(synchronizer);
}
}
return !canceled;
}
public void cancelWaitForSubscribers() {
Endpoint endpoint = This.getEndpoint().withPort(synchronizerPort);
JSONObject request = new JSONObject();
request.put(Messages.TYPE, Messages.CANCEL);
// Create the request socket. We can create it here because it should be called only once.
RequestSocket requestSocket = This.getCom().createRequestSocket(endpoint.toString());
JSONObject response = requestSocket.requestJSON(request);
// Terminate the socket.
requestSocket.terminate();
}
public void send(byte[] data) {
publisher.sendMore(Messages.Event.STREAM);
publisher.send(data, 0);
}
public void send(String data) {
byte[] result = Messages.serialize(data);
publisher.sendMore(Messages.Event.STREAM);
publisher.send(result, 0);
}
public void sendTwoParts(byte[] data1, byte[] data2) {
publisher.sendMore(Messages.Event.STREAM);
publisher.sendMore(data1);
publisher.send(data2, 0);
}
public void sendEnd() {
if (!ended) {
publisher.sendMore(Messages.Event.ENDSTREAM);
publisher.send(Messages.Event.ENDSTREAM);
ended = true;
}
}
public boolean isEnded() {
return ended;
}
public void terminate() {
sendEnd();
context.destroySocket(publisher);
JSONObject request = Messages.createTerminatePublisherRequest(This.getId(), name);
JSONObject response = This.getCom().requestJSON(request);
int value = JSON.getInt(response, Messages.RequestResponse.VALUE);
if (value == -1) {
System.err.println("Cannot terminate publisher");
}
}
private Zmq.Msg responseToSyncRequest() {
// send a dummy SYNC message by the publisher socket
publisher.sendMore(Messages.Event.SYNC);
publisher.send(Messages.Event.SYNC);
Zmq.Msg message = new Zmq.Msg();
message.add(Messages.serialize(Messages.createRequestResponse(0, "OK")));
return message;
}
private Zmq.Msg responseToSubscribeRequest() {
Zmq.Msg message = new Zmq.Msg();
message.add(Messages.serialize(Messages.createRequestResponse(0, "OK")));
return message;
}
private Zmq.Msg responseToCancelRequest() {
Zmq.Msg message = new Zmq.Msg();
message.add(Messages.serialize(Messages.createRequestResponse(0, "OK")));
return message;
}
private Zmq.Msg responseToUnknownRequest() {
Zmq.Msg message = new Zmq.Msg();
message.add(Messages.serialize(Messages.createRequestResponse(-1, "Unknown request")));
return message;
}
}
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment