Commit ca6009fd authored by legoc's avatar legoc
Browse files

Request refactored

parent 68d0a245
......@@ -2,9 +2,13 @@ package fr.ill.ics.cameo.coms;
import java.util.List;
import org.json.simple.JSONObject;
import fr.ill.ics.cameo.base.Instance;
import fr.ill.ics.cameo.base.RequestSocket;
import fr.ill.ics.cameo.base.Server;
import fr.ill.ics.cameo.coms.impl.RequestImpl;
import fr.ill.ics.cameo.base.This;
import fr.ill.ics.cameo.messages.Messages;
/**
* Class Request.
......@@ -12,50 +16,71 @@ import fr.ill.ics.cameo.coms.impl.RequestImpl;
*/
public class Request {
private RequestImpl impl;
private Server requesterServer = null;
Request(RequestImpl impl) {
this.impl = impl;
private String requesterEndpoint;
private byte[] messagePart1;
private byte[] messagePart2;
private String requesterApplicationName;
private int requesterApplicationId;
private String requesterServerEndpoint;
public Request(String requesterApplicationName, int requesterApplicationId, String serverUrl, int serverPort, int requesterPort, byte[] messagePart1, byte[] messagePart2) {
this.requesterEndpoint = serverUrl + ":" + requesterPort;
this.messagePart1 = messagePart1;
this.messagePart2 = messagePart2;
this.requesterApplicationName = requesterApplicationName;
this.requesterApplicationId = requesterApplicationId;
this.requesterServerEndpoint = serverUrl + ":" + serverPort;
}
public byte[] getBinary() {
return impl.get();
return messagePart1;
}
public String get() {
return impl.getString();
return Messages.parseString(messagePart1);
}
public byte[][] getTwoBinaryParts() {
byte[][] result = new byte[2][];
result[0] = impl.get();
result[1] = impl.get2();
result[0] = messagePart1;
result[1] = messagePart2;
return result;
}
public void reply(byte[] response) {
impl.reply(response);
JSONObject request = new JSONObject();
request.put(Messages.TYPE, Messages.RESPONSE);
// Create a new socket.
// Notice that trying to reuse a socket by calling connect() does not work (it is worse with jeromq)
RequestSocket requestSocket = This.getCom().createRequestSocket(requesterEndpoint);
requestSocket.request(Messages.serialize(request), response);
requestSocket.terminate();
}
public void reply(String response) {
impl.reply(response);
reply(Messages.serialize(response));
}
public Instance connectToRequester() {
// Instantiate the requester server if it is null.
if (requesterServer == null) {
requesterServer = new Server(impl.getRequesterServerEndpoint());
requesterServer = new Server(requesterServerEndpoint);
}
// Connect and find the instance.
List<Instance> instances = requesterServer.connectAll(impl.getRequesterApplicationName());
List<Instance> instances = requesterServer.connectAll(requesterApplicationName);
for (Instance instance : instances) {
if (instance.getId() == impl.getRequesterApplicationId()) {
if (instance.getId() == requesterApplicationId) {
return instance;
}
}
......@@ -83,9 +108,12 @@ public class Request {
requesterServer.terminate();
}
}
@Override
public String toString() {
return impl.toString();
return "Request [endpoint=" + requesterEndpoint + ", id=" + requesterApplicationId + "]";
}
}
\ No newline at end of file
......@@ -3,7 +3,6 @@ package fr.ill.ics.cameo.coms;
import org.json.simple.JSONObject;
import fr.ill.ics.cameo.base.This;
import fr.ill.ics.cameo.coms.impl.RequestImpl;
import fr.ill.ics.cameo.coms.impl.ResponderImpl;
import fr.ill.ics.cameo.coms.impl.zmq.ResponderZmq;
import fr.ill.ics.cameo.messages.JSON;
......@@ -60,11 +59,7 @@ public class Responder {
}
public Request receive() {
RequestImpl requestImpl = impl.receive();
if (requestImpl == null) {
return null;
}
return new Request(requestImpl);
return impl.receive();
}
public void cancel() {
......
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
package fr.ill.ics.cameo.coms.impl;
import org.json.simple.JSONObject;
import fr.ill.ics.cameo.base.RequestSocket;
import fr.ill.ics.cameo.base.This;
import fr.ill.ics.cameo.messages.Messages;
public class RequestImpl {
private String requesterEndpoint;
private byte[] message;
private byte[] message2;
private String requesterApplicationName;
private int requesterApplicationId;
private String requesterServerEndpoint;
public RequestImpl(String requesterApplicationName, int requesterApplicationId, byte[] message, String serverUrl, int serverPort, int requesterPort) {
this.requesterEndpoint = serverUrl + ":" + requesterPort;
this.message = message;
this.requesterApplicationName = requesterApplicationName;
this.requesterApplicationId = requesterApplicationId;
this.requesterServerEndpoint = serverUrl + ":" + serverPort;
}
public void setMessage2(byte[] message2) {
this.message2 = message2;
}
public byte[] get() {
return message;
}
public byte[] get2() {
return message2;
}
public String getString() {
return Messages.parseString(message);
}
public void reply(byte[] response) {
JSONObject request = new JSONObject();
request.put(Messages.TYPE, Messages.RESPONSE);
// Create a new socket.
// Notice that trying to reuse a socket by calling connect() does not work (it is worse with jeromq)
RequestSocket requestSocket = This.getCom().createRequestSocket(requesterEndpoint);
requestSocket.request(Messages.serialize(request), response);
requestSocket.terminate();
}
public void reply(String response) {
reply(Messages.serialize(response));
}
public String getRequesterApplicationName() {
return requesterApplicationName;
}
public int getRequesterApplicationId() {
return requesterApplicationId;
}
public String getRequesterServerEndpoint() {
return requesterServerEndpoint;
}
@Override
public String toString() {
return "Request [endpoint=" + requesterEndpoint + ", id=" + requesterApplicationId + "]";
}
}
\ No newline at end of file
......@@ -16,12 +16,14 @@
package fr.ill.ics.cameo.coms.impl;
import fr.ill.ics.cameo.coms.Request;
public interface ResponderImpl {
public static final String RESPONDER_PREFIX = "rep.";
void init(int responderPort, String name);
RequestImpl receive();
Request receive();
void cancel();
boolean isEnded();
boolean isCanceled();
......
......@@ -22,7 +22,7 @@ import fr.ill.ics.cameo.Zmq;
import fr.ill.ics.cameo.base.RequestSocket;
import fr.ill.ics.cameo.base.This;
import fr.ill.ics.cameo.base.impl.zmq.ContextZmq;
import fr.ill.ics.cameo.coms.impl.RequestImpl;
import fr.ill.ics.cameo.coms.Request;
import fr.ill.ics.cameo.coms.impl.ResponderImpl;
import fr.ill.ics.cameo.messages.JSON;
import fr.ill.ics.cameo.messages.Messages;
......@@ -49,7 +49,7 @@ public class ResponderZmq implements ResponderImpl {
responder.bind("tcp://*:" + responderPort);
}
public RequestImpl receive() {
public Request receive() {
Zmq.Msg message = null;
Zmq.Msg reply = null;
......@@ -78,23 +78,14 @@ public class ResponderZmq implements ResponderImpl {
int requesterPort = JSON.getInt(request, Messages.Request.REQUESTER_PORT);
byte[][] data = message.getAllData();
byte[] message1 = data[1];
// Create the request implementation.
RequestImpl impl = new RequestImpl(name,
id,
message1,
serverUrl,
serverPort,
requesterPort);
// Set the optional message 2.
byte[] messagePart1 = data[1];
byte[] messagePart2 = null;
if (data.length > 2) {
impl.setMessage2(data[2]);
messagePart2 = data[2];
}
return impl;
// Return the request.
return new Request(name, id, serverUrl, serverPort, requesterPort, messagePart1, messagePart2);
}
else if (type == Messages.CANCEL) {
canceled = true;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment