Commit d0530a6e authored by legoc's avatar legoc
Browse files

Removed the ServicesImpl.tryRequest methods

parent 53ee8bf1
......@@ -27,7 +27,7 @@
<dependency>
<groupId>fr.ill.ics</groupId>
<artifactId>cameo-com-jeromq</artifactId>
<version>0.0.2</version>
<version>0.0.3</version>
</dependency>
<dependency>
<groupId>fr.ill.ics</groupId>
......
......@@ -24,7 +24,6 @@ import fr.ill.ics.cameo.proto.Messages.MessageType.Type;
public class RequestImpl {
private ApplicationImpl application;
Zmq.Context context;
private String requesterEndpoint;
private ByteString message;
private ByteString message2;
......@@ -32,10 +31,9 @@ public class RequestImpl {
private int requesterApplicationId;
private String requesterServerEndpoint;
public RequestImpl(ApplicationImpl application, Zmq.Context context, String requesterApplicationName, int requesterApplicationId, ByteString message, String serverUrl, int serverPort, int requesterPort) {
public RequestImpl(ApplicationImpl application, String requesterApplicationName, int requesterApplicationId, ByteString message, String serverUrl, int serverPort, int requesterPort) {
this.application = application;
this.context = context;
this.requesterEndpoint = serverUrl + ":" + requesterPort;
this.message = message;
......@@ -72,8 +70,11 @@ public class RequestImpl {
Zmq.Msg responseMessage = application.createRequest(Type.RESPONSE);
responseMessage.add(response);
application.tryRequest(responseMessage, requesterEndpoint);
// Create a new socket.
RequestSocket requestSocket = application.createRequestSocket(requesterEndpoint);
requestSocket.request(responseMessage);
requestSocket.terminate();
}
public void reply(String response) {
......
package fr.ill.ics.cameo.impl;
import fr.ill.ics.cameo.ConnectionTimeout;
import fr.ill.ics.cameo.SocketException;
import fr.ill.ics.cameo.Zmq;
public class RequestSocket {
......@@ -9,21 +10,34 @@ public class RequestSocket {
private Zmq.Socket socket;
private int timeout = 0;
public RequestSocket(Zmq.Context context, Zmq.Socket socket, int timeout) {
public RequestSocket(Zmq.Context context, int timeout) {
this.context = context;
this.socket = socket;
this.socket = context.createSocket(Zmq.REQ);
this.timeout = timeout;
}
public RequestSocket(Zmq.Context context, Zmq.Socket socket) {
public RequestSocket(Zmq.Context context) {
this.context = context;
this.socket = socket;
this.socket = context.createSocket(Zmq.REQ);
}
public void setTimeout(int timeout) {
this.timeout = timeout;
}
public void connect(String endpoint) {
try {
boolean result = socket.connect(endpoint);
if (!result) {
throw new SocketException("Cannot connect socket to " + endpoint);
}
}
catch (Exception e) {
throw new SocketException(e.getMessage());
}
}
public Zmq.Msg request(Zmq.Msg request, int overrideTimeout) throws ConnectionTimeout {
// send request, wait safely for reply
......
......@@ -31,7 +31,8 @@ public class ResponderImpl {
Zmq.Context context;
private int responderPort;
private String name;
private Zmq.Socket responder = null;
private Zmq.Socket responder;
private boolean ended = false;
private boolean canceled = false;
private ResponderWaitingImpl waiting = new ResponderWaitingImpl(this);
......@@ -87,7 +88,7 @@ public class ResponderImpl {
fr.ill.ics.cameo.proto.Messages.Request request = fr.ill.ics.cameo.proto.Messages.Request.parseFrom(messageData);
// Create the request
RequestImpl impl = new RequestImpl(application, context,
RequestImpl impl = new RequestImpl(application,
request.getApplicationName(),
request.getApplicationId(),
request.getMessage(),
......
......@@ -183,90 +183,15 @@ public class ServicesImpl {
return new EventStreamSocket(context, subscriber, cancelPublisher);
}
/**
* send request
*
* @param request
* @return reply
* @throws ConnectionTimeout
* @throws SocketException
*/
protected Zmq.Msg tryRequest(Zmq.Msg request, String endpoint, int overrideTimeout) throws ConnectionTimeout, SocketException {
Zmq.Socket socket = context.createSocket(Zmq.REQ);
try {
try {
socket.connect(endpoint);
}
catch (Exception e) {
throw new SocketException(e.getMessage());
}
// send request, wait safely for reply
Zmq.Msg msg = request.duplicate();
msg.send(socket);
int usedTimeout = timeout;
if (overrideTimeout > -1) {
usedTimeout = overrideTimeout;
}
if (usedTimeout > 0) {
Zmq.Poller poller = context.createPoller(socket);
Zmq.Msg reply = null;
if (poller.poll(usedTimeout)) {
reply = Zmq.Msg.recvMsg(socket);
}
else {
throw new ConnectionTimeout();
}
return reply;
} else {
// direct receive
Zmq.Msg reply = Zmq.Msg.recvMsg(socket);
return reply;
}
} finally {
// it is better to call destroySocket rather than socket.close()
// it is really important to destroy the socket because Java will do it later
// with the garbage collector
context.destroySocket(socket);
}
}
protected Zmq.Msg tryRequest(Zmq.Msg request, String endpoint) throws ConnectionTimeout {
return tryRequest(request, endpoint, -1);
}
protected Zmq.Msg tryRequest(Zmq.Msg request, int overrideTimeout) throws ConnectionTimeout {
return tryRequest(request, serverEndpoint, overrideTimeout);
}
protected Zmq.Msg tryRequest(Zmq.Msg request) throws ConnectionTimeout {
return tryRequest(request, serverEndpoint, -1);
}
protected RequestSocket createRequestSocket(String endpoint) throws SocketException {
Zmq.Socket socket = context.createSocket(Zmq.REQ);
try {
socket.connect(endpoint);
}
catch (Exception e) {
throw new SocketException(e.getMessage());
}
RequestSocket requestSocket = new RequestSocket(context, timeout);
requestSocket.connect(endpoint);
return new RequestSocket(context, socket, timeout);
return requestSocket;
}
/**
*
* @param type
......
......@@ -6,7 +6,7 @@
<version>0.0.1</version>
</parent>
<artifactId>cameo-com-jeromq</artifactId>
<version>0.0.2</version>
<version>0.0.3</version>
<properties>
<project.scm.id>ill-code</project.scm.id>
......
......@@ -81,8 +81,8 @@ public class Zmq {
socket.send(data, flags);
}
public void connect(String address) {
socket.connect(address);
public boolean connect(String address) {
return socket.connect(address);
}
public void subscribe(String topic) {
......
......@@ -6,7 +6,7 @@
<version>0.0.1</version>
</parent>
<artifactId>cameo-com-jzmq</artifactId>
<version>0.0.2</version>
<version>0.0.3</version>
<properties>
<project.scm.id>ill-code</project.scm.id>
......
......@@ -82,8 +82,9 @@ public class Zmq {
socket.send(data, flags);
}
public void connect(String address) {
public boolean connect(String address) {
socket.connect(address);
return true;
}
public void subscribe(String topic) {
......
......@@ -24,7 +24,7 @@
<dependency>
<groupId>fr.ill.ics</groupId>
<artifactId>cameo-com-jzmq</artifactId>
<version>0.0.2</version>
<version>0.0.3</version>
</dependency>
</dependencies>
......
......@@ -6,7 +6,7 @@
<version>0.0.1</version>
</parent>
<artifactId>cameo-console</artifactId>
<version>0.1.6</version>
<version>0.1.7</version>
<packaging>jar</packaging>
<properties>
......
......@@ -6,7 +6,7 @@
<version>0.0.1</version>
</parent>
<artifactId>cameo-server-jzmq</artifactId>
<version>0.1.5</version>
<version>0.1.6</version>
<packaging>jar</packaging>
<dependencies>
......@@ -24,7 +24,7 @@
<dependency>
<groupId>fr.ill.ics</groupId>
<artifactId>cameo-com-jzmq</artifactId>
<version>0.0.2</version>
<version>0.0.3</version>
</dependency>
</dependencies>
......
......@@ -6,7 +6,7 @@
<version>0.0.1</version>
</parent>
<artifactId>cameo-server</artifactId>
<version>0.1.5</version>
<version>0.1.6</version>
<packaging>jar</packaging>
<properties>
......@@ -32,7 +32,7 @@
<dependency>
<groupId>fr.ill.ics</groupId>
<artifactId>cameo-com-jeromq</artifactId>
<version>0.0.2</version>
<version>0.0.3</version>
</dependency>
<dependency>
<groupId>fr.ill.ics</groupId>
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment