Commit 68d0a245 authored by legoc's avatar legoc
Browse files

Refactored Responder

parent 2d794da2
......@@ -5,6 +5,7 @@ import org.json.simple.JSONObject;
import fr.ill.ics.cameo.base.This;
import fr.ill.ics.cameo.coms.impl.RequestImpl;
import fr.ill.ics.cameo.coms.impl.ResponderImpl;
import fr.ill.ics.cameo.coms.impl.zmq.ResponderZmq;
import fr.ill.ics.cameo.messages.JSON;
import fr.ill.ics.cameo.messages.Messages;
......@@ -13,14 +14,19 @@ import fr.ill.ics.cameo.messages.Messages;
*
*/
public class Responder {
private String name;
private ResponderImpl impl;
private ResponderWaiting waiting = new ResponderWaiting(this);
private Responder(ResponderImpl impl) {
this.impl = impl;
private Responder(String name) {
this.name = name;
//TODO Replace with factory.
this.impl = new ResponderZmq();
waiting.add();
}
private static ResponderImpl createResponder(String name) throws ResponderCreationException {
private void init(String name) throws ResponderCreationException {
String portName = ResponderImpl.RESPONDER_PREFIX + name;
JSONObject request = Messages.createRequestPortV0Request(This.getId(), portName);
......@@ -32,7 +38,7 @@ public class Responder {
throw new ResponderCreationException(JSON.getString(response, Messages.RequestResponse.MESSAGE));
}
return new ResponderImpl(port, name);
impl.init(port, name);
}
/**
......@@ -42,11 +48,15 @@ public class Responder {
* @throws ResponderCreationException, ConnectionTimeout
*/
static public Responder create(String name) throws ResponderCreationException {
return new Responder(createResponder(name));
Responder responder = new Responder(name);
responder.init(name);
return responder;
}
public String getName() {
return impl.getName();
return name;
}
public Request receive() {
......@@ -70,11 +80,12 @@ public class Responder {
}
public void terminate() {
waiting.remove();
impl.terminate();
}
@Override
public String toString() {
return impl.toString();
return ResponderImpl.RESPONDER_PREFIX + name + ":" + This.getName() + "." + This.getId() + "@" + This.getEndpoint();
}
}
......@@ -14,15 +14,15 @@
* limitations under the Licence.
*/
package fr.ill.ics.cameo.coms.impl;
package fr.ill.ics.cameo.coms;
import fr.ill.ics.cameo.base.Waiting;
public class ResponderWaitingImpl extends Waiting {
public class ResponderWaiting extends Waiting {
private ResponderImpl responder;
private Responder responder;
public ResponderWaitingImpl(ResponderImpl responder) {
public ResponderWaiting(Responder responder) {
this.responder = responder;
}
......
......@@ -16,162 +16,15 @@
package fr.ill.ics.cameo.coms.impl;
import org.json.simple.JSONObject;
import fr.ill.ics.cameo.Zmq;
import fr.ill.ics.cameo.base.RequestSocket;
import fr.ill.ics.cameo.base.This;
import fr.ill.ics.cameo.base.impl.zmq.ContextZmq;
import fr.ill.ics.cameo.messages.JSON;
import fr.ill.ics.cameo.messages.Messages;
import fr.ill.ics.cameo.strings.Endpoint;
public class ResponderImpl {
public interface ResponderImpl {
public static final String RESPONDER_PREFIX = "rep.";
private int responderPort;
private String name;
private Zmq.Context context;
private Zmq.Socket responder;
private boolean ended = false;
private boolean canceled = false;
private ResponderWaitingImpl waiting = new ResponderWaitingImpl(this);
public ResponderImpl(int responderPort, String name) {
this.responderPort = responderPort;
this.name = name;
this.context = ((ContextZmq)This.getCom().getContext()).getContext();
// create a socket REP
responder = context.createSocket(Zmq.REP);
responder.bind("tcp://*:" + responderPort);
waiting.add();
}
public String getName() {
return name;
}
public RequestImpl receive() {
Zmq.Msg message = null;
Zmq.Msg reply = null;
try {
message = Zmq.Msg.recvMsg(responder);
if (message == null) {
ended = true;
return null;
}
// Get the JSON request object.
JSONObject request = This.getCom().parse(message.getFirstData());
// Get the type.
long type = JSON.getLong(request, Messages.TYPE);
if (type == Messages.REQUEST) {
String name = JSON.getString(request, Messages.Request.APPLICATION_NAME);
int id = JSON.getInt(request, Messages.Request.APPLICATION_ID);
String serverUrl = JSON.getString(request, Messages.Request.SERVER_URL);
int serverPort = JSON.getInt(request, Messages.Request.SERVER_PORT);
int requesterPort = JSON.getInt(request, Messages.Request.REQUESTER_PORT);
byte[][] data = message.getAllData();
byte[] message1 = data[1];
// Create the request implementation.
RequestImpl impl = new RequestImpl(name,
id,
message1,
serverUrl,
serverPort,
requesterPort);
// Set the optional message 2.
if (data.length > 2) {
impl.setMessage2(data[2]);
}
return impl;
}
else if (type == Messages.CANCEL) {
canceled = true;
return null;
}
}
finally {
if (message != null) {
message.destroy();
}
// Send to the requester
reply = responseToRequest();
reply.send(responder);
if (reply != null) {
reply.destroy();
}
}
return null;
}
public void cancel() {
Endpoint endpoint = This.getEndpoint().withPort(responderPort);
JSONObject request = new JSONObject();
request.put(Messages.TYPE, Messages.CANCEL);
// Create the request socket. We can create it here because it should be called only once.
RequestSocket requestSocket = This.getCom().createRequestSocket(endpoint.toString());
requestSocket.requestJSON(request);
// Terminate the socket.
requestSocket.terminate();
}
private Zmq.Msg responseToRequest() {
Zmq.Msg message = new Zmq.Msg();
message.add(Messages.serialize(Messages.createRequestResponse(0, "OK")));
return message;
}
public boolean isEnded() {
return ended;
}
public boolean isCanceled() {
return canceled;
}
public void terminate() {
waiting.remove();
context.destroySocket(responder);
try {
This.getCom().removePort(RESPONDER_PREFIX + name);
} catch (Exception e) {
System.err.println("Cannot terminate responder: " + e.getMessage());
}
}
@Override
public String toString() {
return RESPONDER_PREFIX + name + ":" + This.getName() + "." + This.getId() + "@" + This.getEndpoint();
}
void init(int responderPort, String name);
RequestImpl receive();
void cancel();
boolean isEnded();
boolean isCanceled();
void terminate();
}
\ No newline at end of file
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
package fr.ill.ics.cameo.coms.impl.zmq;
import org.json.simple.JSONObject;
import fr.ill.ics.cameo.Zmq;
import fr.ill.ics.cameo.base.RequestSocket;
import fr.ill.ics.cameo.base.This;
import fr.ill.ics.cameo.base.impl.zmq.ContextZmq;
import fr.ill.ics.cameo.coms.impl.RequestImpl;
import fr.ill.ics.cameo.coms.impl.ResponderImpl;
import fr.ill.ics.cameo.messages.JSON;
import fr.ill.ics.cameo.messages.Messages;
import fr.ill.ics.cameo.strings.Endpoint;
public class ResponderZmq implements ResponderImpl {
private int responderPort;
private String name;
private Zmq.Context context;
private Zmq.Socket responder;
private boolean ended = false;
private boolean canceled = false;
public void init(int responderPort, String name) {
this.responderPort = responderPort;
this.name = name;
this.context = ((ContextZmq)This.getCom().getContext()).getContext();
// create a socket REP
responder = context.createSocket(Zmq.REP);
responder.bind("tcp://*:" + responderPort);
}
public RequestImpl receive() {
Zmq.Msg message = null;
Zmq.Msg reply = null;
try {
message = Zmq.Msg.recvMsg(responder);
if (message == null) {
ended = true;
return null;
}
// Get the JSON request object.
JSONObject request = This.getCom().parse(message.getFirstData());
// Get the type.
long type = JSON.getLong(request, Messages.TYPE);
if (type == Messages.REQUEST) {
String name = JSON.getString(request, Messages.Request.APPLICATION_NAME);
int id = JSON.getInt(request, Messages.Request.APPLICATION_ID);
String serverUrl = JSON.getString(request, Messages.Request.SERVER_URL);
int serverPort = JSON.getInt(request, Messages.Request.SERVER_PORT);
int requesterPort = JSON.getInt(request, Messages.Request.REQUESTER_PORT);
byte[][] data = message.getAllData();
byte[] message1 = data[1];
// Create the request implementation.
RequestImpl impl = new RequestImpl(name,
id,
message1,
serverUrl,
serverPort,
requesterPort);
// Set the optional message 2.
if (data.length > 2) {
impl.setMessage2(data[2]);
}
return impl;
}
else if (type == Messages.CANCEL) {
canceled = true;
return null;
}
}
finally {
if (message != null) {
message.destroy();
}
// Send to the requester
reply = responseToRequest();
reply.send(responder);
if (reply != null) {
reply.destroy();
}
}
return null;
}
public void cancel() {
Endpoint endpoint = This.getEndpoint().withPort(responderPort);
JSONObject request = new JSONObject();
request.put(Messages.TYPE, Messages.CANCEL);
// Create the request socket. We can create it here because it should be called only once.
RequestSocket requestSocket = This.getCom().createRequestSocket(endpoint.toString());
requestSocket.requestJSON(request);
// Terminate the socket.
requestSocket.terminate();
}
private Zmq.Msg responseToRequest() {
Zmq.Msg message = new Zmq.Msg();
message.add(Messages.serialize(Messages.createRequestResponse(0, "OK")));
return message;
}
public boolean isEnded() {
return ended;
}
public boolean isCanceled() {
return canceled;
}
public void terminate() {
context.destroySocket(responder);
try {
This.getCom().removePort(RESPONDER_PREFIX + name);
} catch (Exception e) {
System.err.println("Cannot terminate responder: " + e.getMessage());
}
}
}
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment