Commit cddf96d5 authored by legoc's avatar legoc
Browse files

Requester java reviewed

parent a0f9e19a
package fr.ill.ics.cameo.coms;
import java.util.concurrent.atomic.AtomicInteger;
import org.json.simple.JSONObject;
import fr.ill.ics.cameo.base.Instance;
import fr.ill.ics.cameo.base.This;
import fr.ill.ics.cameo.coms.impl.RequesterImpl;
import fr.ill.ics.cameo.coms.impl.ResponderImpl;
import fr.ill.ics.cameo.coms.impl.zmq.RequesterZmq;
import fr.ill.ics.cameo.messages.JSON;
import fr.ill.ics.cameo.messages.Messages;
......@@ -15,19 +18,34 @@ import fr.ill.ics.cameo.messages.Messages;
*/
public class Requester {
private String name;
private int requesterId;
private int responderId;
private RequesterImpl impl;
private RequesterWaiting waiting = new RequesterWaiting(this);
private static AtomicInteger requesterCounter = new AtomicInteger();
private Requester(RequesterImpl impl) {
this.impl = impl;
private Requester(String name) {
this.name = name;
this.impl = new RequesterZmq();
waiting.add();
}
private static int newRequesterId() {
return requesterCounter.incrementAndGet();
}
private static String getRequesterPortName(String name, int responderId, int requesterId) {
return RequesterImpl.REQUESTER_PREFIX + name + "." + responderId + "." + requesterId;
}
private static RequesterImpl createRequester(Instance application, String name) throws RequesterCreationException {
private void init(Instance application, String name) throws RequesterCreationException {
int responderId = application.getId();
responderId = application.getId();
String responderPortName = ResponderImpl.RESPONDER_PREFIX + name;
int requesterId = RequesterImpl.newRequesterId();
String requesterPortName = RequesterImpl.getRequesterPortName(name, responderId, requesterId);
requesterId = newRequesterId();
String requesterPortName = getRequesterPortName(name, responderId, requesterId);
// First connect to the responder.
JSONObject request = Messages.createConnectPortV0Request(responderId, responderPortName);
......@@ -59,7 +77,7 @@ public class Requester {
throw new RequesterCreationException(JSON.getString(response, Messages.RequestResponse.MESSAGE));
}
return new RequesterImpl(application.getEndpoint(), requesterPort, responderPort, name, responderId, requesterId);
impl.init(application.getEndpoint(), requesterPort, responderPort, name);
}
/**
......@@ -69,7 +87,11 @@ public class Requester {
* @throws RequesterCreationException, ConnectionTimeout
*/
static public Requester create(Instance application, String name) throws RequesterCreationException {
return new Requester(createRequester(application, name));
Requester requester = new Requester(name);
requester.init(application, name);
return requester;
}
public String getName() {
......@@ -105,11 +127,19 @@ public class Requester {
}
public void terminate() {
waiting.remove();
impl.terminate();
try {
This.getCom().removePort(getRequesterPortName(name, responderId, requesterId));
} catch (Exception e) {
System.err.println("Cannot terminate requester: " + e.getMessage());
}
}
@Override
public String toString() {
return impl.toString();
return RequesterImpl.REQUESTER_PREFIX + name + "." + requesterId + ":" + This.getName() + "." + This.getId() + "@" + This.getEndpoint();
}
}
\ No newline at end of file
......@@ -14,15 +14,15 @@
* limitations under the Licence.
*/
package fr.ill.ics.cameo.coms.impl;
package fr.ill.ics.cameo.coms;
import fr.ill.ics.cameo.base.Waiting;
public class RequesterWaitingImpl extends Waiting {
public class RequesterWaiting extends Waiting {
private RequesterImpl requester;
private Requester requester;
public RequesterWaitingImpl(RequesterImpl requester) {
public RequesterWaiting(Requester requester) {
this.requester = requester;
}
......
......@@ -16,187 +16,21 @@
package fr.ill.ics.cameo.coms.impl;
import java.util.concurrent.atomic.AtomicInteger;
import org.json.simple.JSONObject;
import fr.ill.ics.cameo.Zmq;
import fr.ill.ics.cameo.base.RequestSocket;
import fr.ill.ics.cameo.base.This;
import fr.ill.ics.cameo.base.impl.zmq.ContextZmq;
import fr.ill.ics.cameo.messages.JSON;
import fr.ill.ics.cameo.messages.Messages;
import fr.ill.ics.cameo.strings.Endpoint;
public class RequesterImpl {
public interface RequesterImpl {
public static final String REQUESTER_PREFIX = "req.";
private int requesterPort;
private String name;
private int responderId;
// Need for a unique id per Application instance.
private int requesterId;
private static AtomicInteger requesterCounter = new AtomicInteger();
private Zmq.Context context;
private Zmq.Socket requester;
private RequestSocket requestSocket;
private boolean canceled = false;
private RequesterWaitingImpl waiting = new RequesterWaitingImpl(this);
public RequesterImpl(Endpoint endpoint, int requesterPort, int responderPort, String name, int responderId, int requesterId) {
this.requesterPort = requesterPort;
this.name = name;
this.responderId = responderId;
this.requesterId = requesterId;
this.context = ((ContextZmq)This.getCom().getContext()).getContext();
// Create the REQ socket.
String responderEndpoint = endpoint.withPort(responderPort).toString();
requestSocket = This.getCom().createRequestSocket(responderEndpoint);
// Create the REP socket.
requester = context.createSocket(Zmq.REP);
requester.bind("tcp://*:" + requesterPort);
waiting.add();
}
public String getName() {
return name;
}
public static int newRequesterId() {
return requesterCounter.incrementAndGet();
}
public static String getRequesterPortName(String name, int responderId, int requesterId) {
return REQUESTER_PREFIX + name + "." + responderId + "." + requesterId;
}
public void send(byte[] requestData) {
JSONObject request = new JSONObject();
request.put(Messages.TYPE, Messages.REQUEST);
request.put(Messages.Request.APPLICATION_NAME, This.getName());
request.put(Messages.Request.APPLICATION_ID, This.getId());
request.put(Messages.Request.SERVER_URL, This.getEndpoint().getProtocol() + "://" + This.getEndpoint().getAddress());
request.put(Messages.Request.SERVER_PORT, This.getEndpoint().getPort());
request.put(Messages.Request.REQUESTER_PORT, requesterPort);
requestSocket.request(Messages.serialize(request), requestData);
}
public void send(String request) {
send(Messages.serialize(request));
}
public void sendTwoParts(byte[] requestData1, byte[] requestData2) {
JSONObject request = new JSONObject();
request.put(Messages.TYPE, Messages.REQUEST);
request.put(Messages.Request.APPLICATION_NAME, This.getName());
request.put(Messages.Request.APPLICATION_ID, This.getId());
request.put(Messages.Request.SERVER_URL, This.getEndpoint().getProtocol() + "://" + This.getEndpoint().getAddress());
request.put(Messages.Request.SERVER_PORT, This.getEndpoint().getPort());
request.put(Messages.Request.REQUESTER_PORT, requesterPort);
requestSocket.request(Messages.serialize(request), requestData1, requestData2);
}
public byte[] receive() {
Zmq.Msg message = null;
try {
message = Zmq.Msg.recvMsg(requester);
if (message == null) {
return null;
}
// Get the JSON request object.
JSONObject request = This.getCom().parse(message.getFirstData());
// Get the type.
long type = JSON.getLong(request, Messages.TYPE);
if (type == Messages.RESPONSE) {
return message.getLastData();
}
else if (type == Messages.CANCEL) {
canceled = true;
return null;
}
else {
return null;
}
}
finally {
if (message != null) {
message.destroy();
}
// Send to the responder
Zmq.Msg reply = responseToRequest();
reply.send(requester);
}
}
public String receiveString() {
return Messages.parseString(receive());
}
public void cancel() {
Endpoint endpoint = This.getEndpoint().withPort(requesterPort);
JSONObject request = new JSONObject();
request.put(Messages.TYPE, Messages.CANCEL);
// Create the request socket. We can create it here because it should be called only once.
RequestSocket requestSocket = This.getCom().createRequestSocket(endpoint.toString());
requestSocket.requestJSON(request);
// Terminate the socket.
requestSocket.terminate();
}
public boolean isCanceled() {
return canceled;
}
public void terminate() {
waiting.remove();
// Terminate the request socket.
requestSocket.terminate();
context.destroySocket(requester);
try {
This.getCom().removePort(getRequesterPortName(name, responderId, requesterId));
} catch (Exception e) {
System.err.println("Cannot terminate requester: " + e.getMessage());
}
}
private Zmq.Msg responseToRequest() {
Zmq.Msg message = new Zmq.Msg();
message.add(Messages.serialize(Messages.createRequestResponse(0, "OK")));
return message;
}
@Override
public String toString() {
return REQUESTER_PREFIX + name + "." + requesterId + ":" + This.getName() + "." + This.getId() + "@" + This.getEndpoint();
}
void init(Endpoint endpoint, int requesterPort, int responderPort, String name);
String getName();
void send(byte[] requestData);
void send(String request);
void sendTwoParts(byte[] requestData1, byte[] requestData2);
byte[] receive();
String receiveString();
void cancel();
boolean isCanceled();
void terminate();
}
\ No newline at end of file
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
package fr.ill.ics.cameo.coms.impl.zmq;
import org.json.simple.JSONObject;
import fr.ill.ics.cameo.Zmq;
import fr.ill.ics.cameo.base.RequestSocket;
import fr.ill.ics.cameo.base.This;
import fr.ill.ics.cameo.base.impl.zmq.ContextZmq;
import fr.ill.ics.cameo.coms.impl.RequesterImpl;
import fr.ill.ics.cameo.messages.JSON;
import fr.ill.ics.cameo.messages.Messages;
import fr.ill.ics.cameo.strings.Endpoint;
public class RequesterZmq implements RequesterImpl {
public static final String REQUESTER_PREFIX = "req.";
private int requesterPort;
private String name;
private Zmq.Context context;
private Zmq.Socket requester;
private RequestSocket requestSocket;
private boolean canceled = false;
public void init(Endpoint endpoint, int requesterPort, int responderPort, String name) {
this.requesterPort = requesterPort;
this.name = name;
this.context = ((ContextZmq)This.getCom().getContext()).getContext();
// Create the REQ socket.
String responderEndpoint = endpoint.withPort(responderPort).toString();
requestSocket = This.getCom().createRequestSocket(responderEndpoint);
// Create the REP socket.
requester = context.createSocket(Zmq.REP);
requester.bind("tcp://*:" + requesterPort);
}
public String getName() {
return name;
}
public void send(byte[] requestData) {
JSONObject request = new JSONObject();
request.put(Messages.TYPE, Messages.REQUEST);
request.put(Messages.Request.APPLICATION_NAME, This.getName());
request.put(Messages.Request.APPLICATION_ID, This.getId());
request.put(Messages.Request.SERVER_URL, This.getEndpoint().getProtocol() + "://" + This.getEndpoint().getAddress());
request.put(Messages.Request.SERVER_PORT, This.getEndpoint().getPort());
request.put(Messages.Request.REQUESTER_PORT, requesterPort);
requestSocket.request(Messages.serialize(request), requestData);
}
public void send(String request) {
send(Messages.serialize(request));
}
public void sendTwoParts(byte[] requestData1, byte[] requestData2) {
JSONObject request = new JSONObject();
request.put(Messages.TYPE, Messages.REQUEST);
request.put(Messages.Request.APPLICATION_NAME, This.getName());
request.put(Messages.Request.APPLICATION_ID, This.getId());
request.put(Messages.Request.SERVER_URL, This.getEndpoint().getProtocol() + "://" + This.getEndpoint().getAddress());
request.put(Messages.Request.SERVER_PORT, This.getEndpoint().getPort());
request.put(Messages.Request.REQUESTER_PORT, requesterPort);
requestSocket.request(Messages.serialize(request), requestData1, requestData2);
}
public byte[] receive() {
Zmq.Msg message = null;
try {
message = Zmq.Msg.recvMsg(requester);
if (message == null) {
return null;
}
// Get the JSON request object.
JSONObject request = This.getCom().parse(message.getFirstData());
// Get the type.
long type = JSON.getLong(request, Messages.TYPE);
if (type == Messages.RESPONSE) {
return message.getLastData();
}
else if (type == Messages.CANCEL) {
canceled = true;
return null;
}
else {
return null;
}
}
finally {
if (message != null) {
message.destroy();
}
// Send to the responder
Zmq.Msg reply = responseToRequest();
reply.send(requester);
}
}
public String receiveString() {
return Messages.parseString(receive());
}
public void cancel() {
Endpoint endpoint = This.getEndpoint().withPort(requesterPort);
JSONObject request = new JSONObject();
request.put(Messages.TYPE, Messages.CANCEL);
// Create the request socket. We can create it here because it should be called only once.
RequestSocket requestSocket = This.getCom().createRequestSocket(endpoint.toString());
requestSocket.requestJSON(request);
// Terminate the socket.
requestSocket.terminate();
}
public boolean isCanceled() {
return canceled;
}
public void terminate() {
// Terminate the request socket.
requestSocket.terminate();
context.destroySocket(requester);
}
private Zmq.Msg responseToRequest() {
Zmq.Msg message = new Zmq.Msg();
message.add(Messages.serialize(Messages.createRequestResponse(0, "OK")));
return message;
}
}
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment