Commit c169861e authored by legoc's avatar legoc
Browse files

Replaced ServicesImpl.tryRequest calls to cameo server by a request socket

parent 0dbcc874
......@@ -90,9 +90,6 @@ public class ApplicationImpl extends ServicesImpl {
throw new InvalidArgumentException(info + " is not a valid argument");
}
// Init the services part.
init();
url = tokens[0] + ":" + tokens[1];
port = Integer.parseInt(tokens[2]);
......@@ -100,6 +97,9 @@ public class ApplicationImpl extends ServicesImpl {
// but that generates troubles when two applications communicate remotely.
// However leave the same value seems to be ok.
serverEndpoint = url + ":" + port;
// Init the context and socket.
init();
// Analyze 4th token that can be either the name.id or the name in case of unmanaged application.
String nameId = tokens[3];
......@@ -183,7 +183,7 @@ public class ApplicationImpl extends ServicesImpl {
Zmq.Msg request = createSetResultRequest(id, data);
try {
Zmq.Msg reply = tryRequest(request);
Zmq.Msg reply = requestSocket.request(request);
byte[] messageData = reply.getFirstData();
RequestResponse requestResponse = RequestResponse.parseFrom(messageData);
......@@ -226,7 +226,7 @@ public class ApplicationImpl extends ServicesImpl {
Zmq.Msg request = createSetStatusRequest(id, Application.State.RUNNING);
try {
Zmq.Msg reply = tryRequest(request);
Zmq.Msg reply = requestSocket.request(request);
byte[] messageData = reply.getFirstData();
RequestResponse requestResponse = RequestResponse.parseFrom(messageData);
......@@ -249,7 +249,7 @@ public class ApplicationImpl extends ServicesImpl {
Zmq.Msg request = createStartedUnmanagedRequest(name, pid);
try {
Zmq.Msg reply = tryRequest(request);
Zmq.Msg reply = requestSocket.request(request);
byte[] messageData = reply.getFirstData();
RequestResponse requestResponse = RequestResponse.parseFrom(messageData);
......@@ -265,7 +265,7 @@ public class ApplicationImpl extends ServicesImpl {
Zmq.Msg request = createTerminatedUnmanagedRequest(id);
try {
Zmq.Msg reply = tryRequest(request);
Zmq.Msg reply = requestSocket.request(request);
byte[] messageData = reply.getFirstData();
RequestResponse requestResponse = RequestResponse.parseFrom(messageData);
......@@ -284,7 +284,7 @@ public class ApplicationImpl extends ServicesImpl {
Zmq.Msg request = createGetStatusRequest(id);
try {
Zmq.Msg reply = tryRequest(request);
Zmq.Msg reply = requestSocket.request(request);
byte[] messageData = reply.getFirstData();
if (messageData == null) {
......@@ -376,7 +376,7 @@ public class ApplicationImpl extends ServicesImpl {
Zmq.Msg request = createCreatePublisherRequest(id, name, numberOfSubscribers);
try {
Zmq.Msg reply = tryRequest(request);
Zmq.Msg reply = requestSocket.request(request);
byte[] messageData = reply.getFirstData();
PublisherResponse requestResponse = null;
......@@ -415,7 +415,7 @@ public class ApplicationImpl extends ServicesImpl {
Zmq.Msg request = createTerminatePublisherRequest(id, name);
try {
Zmq.Msg reply = tryRequest(request);
Zmq.Msg reply = requestSocket.request(request);
byte[] messageData = reply.getFirstData();
RequestResponse requestResponse = null;
......@@ -444,7 +444,7 @@ public class ApplicationImpl extends ServicesImpl {
Zmq.Msg request = createRequestPortRequest(id, portName);
try {
Zmq.Msg reply = tryRequest(request);
Zmq.Msg reply = requestSocket.request(request);
byte[] messageData = reply.getFirstData();
RequestResponse requestResponse = null;
......@@ -503,7 +503,7 @@ public class ApplicationImpl extends ServicesImpl {
// Request a requester port
request = createRequestPortRequest(id, requesterPortName);
reply = tryRequest(request);
reply = requestSocket.request(request);
messageData = reply.getFirstData();
requestResponse = RequestResponse.parseFrom(messageData);
......@@ -524,7 +524,7 @@ public class ApplicationImpl extends ServicesImpl {
Zmq.Msg request = createRemovePortRequest(id, name);
try {
Zmq.Msg reply = tryRequest(request);
Zmq.Msg reply = requestSocket.request(request);
byte[] messageData = reply.getFirstData();
RequestResponse requestResponse = null;
......
......@@ -7,13 +7,22 @@ public class RequestSocket {
private Zmq.Context context;
private Zmq.Socket socket;
private int timeout;
private int timeout = 0;
public RequestSocket(Zmq.Context context, Zmq.Socket socket, int timeout) {
this.context = context;
this.socket = socket;
this.timeout = timeout;
}
public RequestSocket(Zmq.Context context, Zmq.Socket socket) {
this.context = context;
this.socket = socket;
}
public void setTimeout(int timeout) {
this.timeout = timeout;
}
public Zmq.Msg request(Zmq.Msg request, int overrideTimeout) throws ConnectionTimeout {
......@@ -28,18 +37,6 @@ public class RequestSocket {
if (usedTimeout > 0) {
// PollItem[] items = { new PollItem(socket, ZMQ.Poller.POLLIN) };
// ZMQ.poll(items, usedTimeout);
// Zmq.Msg reply = null;
//
// // in case a response is returned before timeout
// if (items[0].isReadable()) {
// reply = Zmq.Msg.recvMsg(socket);
//
// } else {
// throw new ConnectionTimeout();
// }
Zmq.Poller poller = context.createPoller(socket);
Zmq.Msg reply = null;
if (poller.poll(usedTimeout)) {
......
......@@ -89,6 +89,7 @@ public class ServerImpl extends ServicesImpl {
port = Integer.parseInt(tokens[2]);
serverEndpoint = url + ":" + port;
// Init the context and socket.
init();
// start the status thread if it is possible.
......@@ -162,7 +163,8 @@ public class ServerImpl extends ServicesImpl {
private fr.ill.ics.cameo.impl.Response startApplication(String name, String[] args, String instanceReference) throws ConnectionTimeout {
Zmq.Msg request = createStartRequest(name, args, instanceReference);
Zmq.Msg reply = tryRequest(request);
Zmq.Msg reply = requestSocket.request(request);
byte[] messageData = reply.getFirstData();
RequestResponse requestResponse = null;
......@@ -178,7 +180,7 @@ public class ServerImpl extends ServicesImpl {
private int getStreamPort(String name) throws ConnectionTimeout {
Zmq.Msg request = createOutputRequest(name);
Zmq.Msg reply = tryRequest(request);
Zmq.Msg reply = requestSocket.request(request);
byte[] messageData = reply.getFirstData();
RequestResponse requestResponse = null;
......@@ -294,7 +296,7 @@ public class ServerImpl extends ServicesImpl {
request = createStopRequest(id);
}
Zmq.Msg reply = tryRequest(request);
Zmq.Msg reply = requestSocket.request(request);
byte[] messageData = reply.getFirstData();
RequestResponse response = null;
......@@ -331,7 +333,7 @@ public class ServerImpl extends ServicesImpl {
List<InstanceImpl> instances = new ArrayList<InstanceImpl>();
Zmq.Msg request = createConnectRequest(name);
Zmq.Msg reply = tryRequest(request);
Zmq.Msg reply = requestSocket.request(request);
try {
ApplicationInfoListResponse response = ApplicationInfoListResponse.parseFrom(reply.getFirstData());
......@@ -413,7 +415,7 @@ public class ServerImpl extends ServicesImpl {
public List<Application.Configuration> getApplicationConfigurations() {
Zmq.Msg request = createAllAvailableRequest();
Zmq.Msg reply = tryRequest(request);
Zmq.Msg reply = requestSocket.request(request);
LinkedList<Application.Configuration> applications = new LinkedList<Application.Configuration>();
try {
......@@ -443,7 +445,7 @@ public class ServerImpl extends ServicesImpl {
public List<Application.Info> getApplicationInfos() {
Zmq.Msg request = createShowAllRequest();
Zmq.Msg reply = tryRequest(request);
Zmq.Msg reply = requestSocket.request(request);
LinkedList<Application.Info> applications = new LinkedList<Application.Info>();
try {
......@@ -521,7 +523,7 @@ public class ServerImpl extends ServicesImpl {
public OutputStreamSocket openOutputStream(int id) throws OutputStreamException {
Zmq.Msg request = createShowStreamRequest(id);
Zmq.Msg reply = tryRequest(request);
Zmq.Msg reply = requestSocket.request(request);
byte[] messageData = reply.getFirstData();
RequestResponse requestResponse = null;
......@@ -552,7 +554,7 @@ public class ServerImpl extends ServicesImpl {
private boolean isAlive(int id) {
Zmq.Msg request = createIsAliveRequest(id);
Zmq.Msg reply = tryRequest(request);
Zmq.Msg reply = requestSocket.request(request);
IsAliveResponse requestResponse = null;
byte[] messageData = reply.getFirstData();
......@@ -578,7 +580,7 @@ public class ServerImpl extends ServicesImpl {
public void writeToInputStream(int id, String[] parametersArray) throws WriteException {
Zmq.Msg request = createSendParametersRequest(id, parametersArray);
Zmq.Msg reply = tryRequest(request);
Zmq.Msg reply = requestSocket.request(request);
byte[] messageData = reply.getFirstData();
RequestResponse requestResponse = null;
......@@ -622,7 +624,7 @@ public class ServerImpl extends ServicesImpl {
public SubscriberImpl createSubscriber(int applicationId, String publisherName, InstanceImpl instance) throws SubscriberCreationException {
Zmq.Msg request = createConnectPublisherRequest(applicationId, publisherName);
Zmq.Msg reply = tryRequest(request);
Zmq.Msg reply = requestSocket.request(request);
byte[] messageData = reply.getFirstData();
PublisherResponse requestResponse = null;
......
......@@ -39,6 +39,7 @@ public class ServicesImpl {
protected int statusPort;
protected Zmq.Context context;
protected int timeout = 0; // default value because of ZeroMQ design
protected RequestSocket requestSocket;
protected static final String STREAM = "STREAM";
protected static final String ENDSTREAM = "ENDSTREAM";
......@@ -48,8 +49,12 @@ public class ServicesImpl {
protected static final String PORT = "PORT";
protected static final String CANCEL = "CANCEL";
/**
* Initializes the context and the request socket. The serverEndpoint must have been set.
*/
final protected void init() {
this.context = new Zmq.Context();
this.requestSocket = this.createRequestSocket(serverEndpoint);
}
public int getTimeout() {
......@@ -77,6 +82,10 @@ public class ServicesImpl {
}
public void terminate() {
// Terminate the request socket.
requestSocket.terminate();
// destroying the context
context.destroy();
}
......@@ -91,7 +100,7 @@ public class ServicesImpl {
Zmq.Msg request = createInitRequest();
Zmq.Msg reply = null;
try {
reply = tryRequest(request, overrideTimeout);
reply = requestSocket.request(request, overrideTimeout);
reply.destroy();
request.destroy();
return true;
......@@ -133,8 +142,7 @@ public class ServicesImpl {
RequestResponse requestResponse = null;
try {
Zmq.Msg reply = tryRequest(request);
Zmq.Msg reply = requestSocket.request(request);
byte[] messageData = reply.getFirstData();
requestResponse = RequestResponse.parseFrom(messageData);
......@@ -211,18 +219,6 @@ public class ServicesImpl {
if (usedTimeout > 0) {
// PollItem[] items = { new PollItem(socket, ZMQ.Poller.POLLIN) };
// ZMQ.poll(items, usedTimeout);
// Zmq.Msg reply = null;
//
// // in case a response is returned before timeout
// if (items[0].isReadable()) {
// reply = Zmq.Msg.recvMsg(socket);
//
// } else {
// throw new ConnectionTimeout();
// }
Zmq.Poller poller = context.createPoller(socket);
Zmq.Msg reply = null;
if (poller.poll(usedTimeout)) {
......@@ -261,7 +257,7 @@ public class ServicesImpl {
return tryRequest(request, serverEndpoint, -1);
}
protected RequestSocket createSocket(String endpoint) throws SocketException {
protected RequestSocket createRequestSocket(String endpoint) throws SocketException {
Zmq.Socket socket = context.createSocket(Zmq.REQ);
......
......@@ -93,7 +93,7 @@ public class SubscriberImpl {
String endpoint = url + ":" + synchronizerPort;
// Create a socket that will be used for several requests.
RequestSocket requestSocket = server.createSocket(endpoint);
RequestSocket requestSocket = server.createRequestSocket(endpoint);
// polling to wait for connection
Zmq.Poller poller = context.createPoller(subscriber);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment