Commit 512a452e authored by legoc's avatar legoc
Browse files

Reviewed RequestSocket and its impl

parent 6d07a797
......@@ -27,7 +27,7 @@
<dependency>
<groupId>fr.ill.ics</groupId>
<artifactId>cameo-com-jeromq</artifactId>
<version>0.0.4</version>
<version>0.0.5</version>
</dependency>
<dependency>
<groupId>fr.ill.ics</groupId>
......
......@@ -25,7 +25,6 @@ import org.json.simple.JSONObject;
import org.json.simple.parser.ParseException;
import fr.ill.ics.cameo.base.impl.InstanceImpl;
import fr.ill.ics.cameo.base.impl.RequestSocket;
import fr.ill.ics.cameo.base.impl.ServerImpl;
import fr.ill.ics.cameo.base.impl.ThisImpl;
import fr.ill.ics.cameo.messages.JSON;
......
package fr.ill.ics.cameo.base;
import org.json.simple.JSONObject;
import org.json.simple.parser.ParseException;
import fr.ill.ics.cameo.base.impl.RequestSocketImpl;
import fr.ill.ics.cameo.base.impl.zmq.RequestSocketZmq;
import fr.ill.ics.cameo.messages.JSON.Parser;
import fr.ill.ics.cameo.messages.Messages;
public class RequestSocket {
private RequestSocketImpl socket;
private Parser parser;
public RequestSocket(Context context, int timeout, Parser parser) {
//TODO replace with a factory call.
this.socket = new RequestSocketZmq(context, timeout);
this.parser = parser;
}
public void setTimeout(int timeout) {
this.socket.setTimeout(timeout);
}
public void connect(String endpoint) {
socket.connect(endpoint);
}
public byte[][] request(byte[] part1) {
return socket.request(part1, -1);
}
public byte[][] request(byte[] part1, byte[] part2) {
return socket.request(part1, part2, -1);
}
public byte[][] request(byte[] part1, byte[] part2, byte[] part3) {
return socket.request(part1, part2, part3, -1);
}
public JSONObject requestJSON(JSONObject request, int timeout) throws ConnectionTimeout {
byte[][] reply = socket.request(Messages.serialize(request), timeout);
try {
return parser.parse(Messages.parseString(reply[0]));
}
catch (ParseException e) {
throw new UnexpectedException("Cannot parse response");
}
}
public JSONObject requestJSON(JSONObject request) throws ConnectionTimeout {
return requestJSON(request, -1);
}
public JSONObject requestJSON(JSONObject request, byte[] data, int timeout) throws ConnectionTimeout {
byte[][] reply = socket.request(Messages.serialize(request), data, timeout);
try {
return parser.parse(Messages.parseString(reply[0]));
}
catch (ParseException e) {
throw new UnexpectedException("Cannot parse response");
}
}
public JSONObject requestJSON(JSONObject request, byte[] data) throws ConnectionTimeout {
return requestJSON(request, data, -1);
}
public void terminate() {
socket.terminate();
}
}
package fr.ill.ics.cameo.base.impl;
public interface RequestSocketImpl {
void setTimeout(int timeout);
void connect(String endpoint);
byte[][] request(byte[] part1, int overrideTimeout);
byte[][] request(byte[] part1, byte[] part2, int overrideTimeout);
byte[][] request(byte[] part1, byte[] part2, byte[] part3, int overrideTimeout);
void terminate();
}
......@@ -39,6 +39,7 @@ import fr.ill.ics.cameo.base.EventStreamSocket;
import fr.ill.ics.cameo.base.InvalidArgumentException;
import fr.ill.ics.cameo.base.Option;
import fr.ill.ics.cameo.base.OutputStreamSocket;
import fr.ill.ics.cameo.base.RequestSocket;
import fr.ill.ics.cameo.base.SocketException;
import fr.ill.ics.cameo.base.UndefinedApplicationException;
import fr.ill.ics.cameo.base.UndefinedKeyException;
......@@ -205,7 +206,7 @@ public class ServerImpl {
public RequestSocket createRequestSocket(String endpoint) throws SocketException {
RequestSocket requestSocket = new RequestSocket(context, timeout, parser);
RequestSocket requestSocket = new RequestSocket(contextImpl, timeout, parser);
requestSocket.connect(endpoint);
return requestSocket;
......
package fr.ill.ics.cameo.base.impl;
import org.json.simple.JSONObject;
import org.json.simple.parser.ParseException;
package fr.ill.ics.cameo.base.impl.zmq;
import fr.ill.ics.cameo.Zmq;
import fr.ill.ics.cameo.base.ConnectionTimeout;
import fr.ill.ics.cameo.base.Context;
import fr.ill.ics.cameo.base.SocketException;
import fr.ill.ics.cameo.base.UnexpectedException;
import fr.ill.ics.cameo.messages.JSON.Parser;
import fr.ill.ics.cameo.messages.Messages;
import fr.ill.ics.cameo.base.impl.ContextImpl;
import fr.ill.ics.cameo.base.impl.RequestSocketImpl;
public class RequestSocket {
public class RequestSocketZmq implements RequestSocketImpl {
private Zmq.Context context;
private Zmq.Socket socket;
private int timeout = 0;
private Parser parser;
public RequestSocket(Zmq.Context context, int timeout, Parser parser) {
this.context = context;
this.socket = context.createSocket(Zmq.REQ);
this.timeout = timeout;
this.parser = parser;
}
public RequestSocket(Zmq.Context context, int timeout) {
this.context = context;
this.socket = context.createSocket(Zmq.REQ);
public RequestSocketZmq(Context context, int timeout) {
// Get the Zmq context.
this.context = ((ContextImpl)context).getContext();
this.socket = this.context.createSocket(Zmq.REQ);
this.timeout = timeout;
this.parser = null;
}
public void setTimeout(int timeout) {
this.timeout = timeout;
}
......@@ -48,6 +37,33 @@ public class RequestSocket {
}
}
public byte[][] request(byte[] part1, int overrideTimeout) {
Zmq.Msg message = new Zmq.Msg();
message.add(part1);
return request(message, overrideTimeout).getAllData();
}
public byte[][] request(byte[] part1, byte[] part2, int overrideTimeout) {
Zmq.Msg message = new Zmq.Msg();
message.add(part1);
message.add(part2);
return request(message, overrideTimeout).getAllData();
}
public byte[][] request(byte[] part1, byte[] part2, byte[] part3, int overrideTimeout) {
Zmq.Msg message = new Zmq.Msg();
message.add(part1);
message.add(part2);
message.add(part3);
return request(message, overrideTimeout).getAllData();
}
public Zmq.Msg request(Zmq.Msg request, int overrideTimeout) throws ConnectionTimeout {
// send request, wait safely for reply
......@@ -84,47 +100,6 @@ public class RequestSocket {
return request(request, -1);
}
public JSONObject requestJSON(JSONObject request, int timeout) throws ConnectionTimeout {
Zmq.Msg message = new Zmq.Msg();
message.add(Messages.serialize(request));
Zmq.Msg reply = request(message, timeout);
try {
return parser.parse(Messages.parseString(reply.getFirstData()));
}
catch (ParseException e) {
throw new UnexpectedException("Cannot parse response");
}
}
public JSONObject requestJSON(JSONObject request) throws ConnectionTimeout {
return requestJSON(request, -1);
}
public JSONObject requestJSON(JSONObject request, byte[] data, int timeout) throws ConnectionTimeout {
Zmq.Msg message = new Zmq.Msg();
message.add(Messages.serialize(request));
// Add the data in a second frame.
message.add(data);
Zmq.Msg reply = request(message, timeout);
try {
return parser.parse(Messages.parseString(reply.getFirstData()));
}
catch (ParseException e) {
throw new UnexpectedException("Cannot parse response");
}
}
public JSONObject requestJSON(JSONObject request, byte[] data) throws ConnectionTimeout {
return requestJSON(request, data, -1);
}
public void terminate() {
// it is better to call destroySocket rather than socket.close()
// it is really important to destroy the socket because Java will do
......
......@@ -20,8 +20,8 @@ import org.json.simple.JSONObject;
import fr.ill.ics.cameo.Zmq;
import fr.ill.ics.cameo.base.Application.This;
import fr.ill.ics.cameo.base.RequestSocket;
import fr.ill.ics.cameo.base.impl.ContextImpl;
import fr.ill.ics.cameo.base.impl.RequestSocket;
import fr.ill.ics.cameo.messages.JSON;
import fr.ill.ics.cameo.messages.Messages;
import fr.ill.ics.cameo.strings.Endpoint;
......
......@@ -18,9 +18,8 @@ package fr.ill.ics.cameo.coms.impl;
import org.json.simple.JSONObject;
import fr.ill.ics.cameo.Zmq;
import fr.ill.ics.cameo.base.Application.This;
import fr.ill.ics.cameo.base.impl.RequestSocket;
import fr.ill.ics.cameo.base.RequestSocket;
import fr.ill.ics.cameo.messages.Messages;
public class RequestImpl {
......@@ -63,17 +62,11 @@ public class RequestImpl {
JSONObject request = new JSONObject();
request.put(Messages.TYPE, Messages.RESPONSE);
Zmq.Msg message = new Zmq.Msg();
message.add(Messages.serialize(request));
// Set request in the next frame.
message.add(response);
// Create a new socket.
// Notice that trying to reuse a socket by calling connect() does not work (it is worse with jeromq)
RequestSocket requestSocket = This.getCom().createRequestSocket(requesterEndpoint);
requestSocket.request(message);
requestSocket.request(Messages.serialize(request), response);
requestSocket.terminate();
}
......
......@@ -22,9 +22,8 @@ import org.json.simple.JSONObject;
import fr.ill.ics.cameo.Zmq;
import fr.ill.ics.cameo.base.Application.This;
import fr.ill.ics.cameo.base.RequestSocket;
import fr.ill.ics.cameo.base.impl.ContextImpl;
import fr.ill.ics.cameo.base.impl.RequestSocket;
import fr.ill.ics.cameo.base.impl.ThisImpl;
import fr.ill.ics.cameo.messages.JSON;
import fr.ill.ics.cameo.messages.Messages;
import fr.ill.ics.cameo.strings.Endpoint;
......@@ -33,7 +32,6 @@ public class RequesterImpl {
public static final String REQUESTER_PREFIX = "req.";
private ThisImpl application;
private int requesterPort;
private String name;
private int responderId;
......@@ -89,13 +87,7 @@ public class RequesterImpl {
request.put(Messages.Request.SERVER_PORT, This.getEndpoint().getPort());
request.put(Messages.Request.REQUESTER_PORT, requesterPort);
Zmq.Msg message = new Zmq.Msg();
message.add(Messages.serialize(request));
// Set request in the next frame.
message.add(requestData);
requestSocket.request(message);
requestSocket.request(Messages.serialize(request), requestData);
}
public void send(String request) {
......@@ -112,14 +104,7 @@ public class RequesterImpl {
request.put(Messages.Request.SERVER_PORT, This.getEndpoint().getPort());
request.put(Messages.Request.REQUESTER_PORT, requesterPort);
Zmq.Msg message = new Zmq.Msg();
message.add(Messages.serialize(request));
// Set request1 and request2 in the next frames.
message.add(requestData1);
message.add(requestData2);
requestSocket.request(message);
requestSocket.request(Messages.serialize(request), requestData1, requestData2);
}
public byte[] receive() {
......
......@@ -16,14 +16,12 @@
package fr.ill.ics.cameo.coms.impl;
import java.util.List;
import org.json.simple.JSONObject;
import fr.ill.ics.cameo.Zmq;
import fr.ill.ics.cameo.base.Application.This;
import fr.ill.ics.cameo.base.RequestSocket;
import fr.ill.ics.cameo.base.impl.ContextImpl;
import fr.ill.ics.cameo.base.impl.RequestSocket;
import fr.ill.ics.cameo.messages.JSON;
import fr.ill.ics.cameo.messages.Messages;
import fr.ill.ics.cameo.strings.Endpoint;
......@@ -86,9 +84,9 @@ public class ResponderImpl {
int serverPort = JSON.getInt(request, Messages.Request.SERVER_PORT);
int requesterPort = JSON.getInt(request, Messages.Request.REQUESTER_PORT);
List<byte[]> data = message.getAllData();
byte[][] data = message.getAllData();
byte[] message1 = data.get(1);
byte[] message1 = data[1]; // OR 1?
// Create the request implementation.
RequestImpl impl = new RequestImpl(name,
......@@ -99,8 +97,8 @@ public class ResponderImpl {
requesterPort);
// Set the optional message 2.
if (data.size() > 2) {
impl.setMessage2(data.get(2));
if (data.length > 2) {
impl.setMessage2(data[2]);
}
return impl;
......
......@@ -23,9 +23,9 @@ import fr.ill.ics.cameo.base.Application;
import fr.ill.ics.cameo.base.Application.Instance;
import fr.ill.ics.cameo.base.Application.This;
import fr.ill.ics.cameo.base.ConnectionTimeout;
import fr.ill.ics.cameo.base.RequestSocket;
import fr.ill.ics.cameo.base.impl.CancelIdGenerator;
import fr.ill.ics.cameo.base.impl.ContextImpl;
import fr.ill.ics.cameo.base.impl.RequestSocket;
import fr.ill.ics.cameo.messages.JSON;
import fr.ill.ics.cameo.messages.Messages;
import fr.ill.ics.cameo.strings.Endpoint;
......
......@@ -6,7 +6,7 @@
<version>0.0.1</version>
</parent>
<artifactId>cameo-com-jeromq</artifactId>
<version>0.0.4</version>
<version>0.0.5</version>
<properties>
<project.scm.id>ill-code</project.scm.id>
......
......@@ -45,14 +45,15 @@ public class Zmq {
return message.getLast().getData();
}
public List<byte[]> getAllData() {
ArrayList<byte[]> result = new ArrayList<byte[]>();
public byte[][] getAllData() {
byte[][] result = new byte[message.size()][];
Iterator<ZFrame> iterator = message.iterator();
int i = 0;
while (iterator.hasNext()) {
ZFrame frame = iterator.next();
result.add(frame.getData());
result[i] = frame.getData();
++i;
}
return result;
......
......@@ -6,7 +6,7 @@
<version>0.0.1</version>
</parent>
<artifactId>cameo-com-jzmq</artifactId>
<version>0.0.4</version>
<version>0.0.5</version>
<properties>
<project.scm.id>ill-code</project.scm.id>
......
......@@ -46,14 +46,15 @@ public class Zmq {
return message.getLast().getData();
}
public List<byte[]> getAllData() {
ArrayList<byte[]> result = new ArrayList<byte[]>();
public byte[][] getAllData() {
byte[][] result = new byte[message.size()][];
Iterator<ZFrame> iterator = message.iterator();
int i = 0;
while (iterator.hasNext()) {
ZFrame frame = iterator.next();
result.add(frame.getData());
result[i] = frame.getData();
++i;
}
return result;
......
......@@ -24,7 +24,7 @@
<dependency>
<groupId>fr.ill.ics</groupId>
<artifactId>cameo-com-jzmq</artifactId>
<version>0.0.4</version>
<version>0.0.5</version>
</dependency>
</dependencies>
......
......@@ -24,7 +24,7 @@
<dependency>
<groupId>fr.ill.ics</groupId>
<artifactId>cameo-com-jzmq</artifactId>
<version>0.0.4</version>
<version>0.0.5</version>
</dependency>
</dependencies>
......
......@@ -32,7 +32,7 @@
<dependency>
<groupId>fr.ill.ics</groupId>
<artifactId>cameo-com-jeromq</artifactId>
<version>0.0.4</version>
<version>0.0.5</version>
</dependency>
<dependency>
<groupId>fr.ill.ics</groupId>
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment