Commit 35574d52 authored by legoc's avatar legoc
Browse files

Use of requestJSON()

parent f14c7fa5
......@@ -84,15 +84,6 @@ public class RequestSocket {
return request(request, -1);
}
//TODO Remove when possible
public void request(JSONObject request) throws ConnectionTimeout {
Zmq.Msg message = new Zmq.Msg();
message.add(Messages.serialize(request));
request(message, -1);
}
public JSONObject requestJSON(JSONObject request, int timeout) throws ConnectionTimeout {
Zmq.Msg message = new Zmq.Msg();
......
......@@ -100,11 +100,10 @@ public class PublisherImpl {
else if (type == Messages.CANCEL) {
canceled = true;
counter = numberOfSubscribers;
message.send(synchronizer);
reply = processCancelPublisherRequest();
}
else {
System.err.println("Unknown message type " + type);
message.send(synchronizer);
reply = processUnknownRequest();
}
// send to the client
......@@ -142,8 +141,8 @@ public class PublisherImpl {
// Create the request socket. We can create it here because it should be called only once.
RequestSocket requestSocket = This.getCom().createRequestSocket(endpoint.toString());
requestSocket.request(request);
JSONObject response = requestSocket.requestJSON(request);
// Terminate the socket.
requestSocket.terminate();
}
......@@ -201,14 +200,19 @@ public class PublisherImpl {
}
private Zmq.Msg processSyncRequest() {
// send a dummy SYNC message by the publisher socket
publisher.sendMore(Messages.Event.SYNC);
publisher.send(Messages.Event.SYNC);
Zmq.Msg reply = new Zmq.Msg();
reply.add("Connection OK");
JSONObject response = new JSONObject();
response.put(Messages.RequestResponse.VALUE, 0);
response.put(Messages.RequestResponse.MESSAGE, "OK");
Zmq.Msg message = new Zmq.Msg();
message.add(Messages.serialize(response));
return reply;
return message;
}
private Zmq.Msg processSubscribePublisherRequest() {
......@@ -224,6 +228,31 @@ public class PublisherImpl {
return message;
}
private Zmq.Msg processCancelPublisherRequest() {
JSONObject response = new JSONObject();
response.put(Messages.RequestResponse.VALUE, 0);
response.put(Messages.RequestResponse.MESSAGE, "OK");
Zmq.Msg message = new Zmq.Msg();
message.add(Messages.serialize(response));
return message;
}
private Zmq.Msg processUnknownRequest() {
JSONObject response = new JSONObject();
response.put(Messages.RequestResponse.VALUE, -1);
response.put(Messages.RequestResponse.MESSAGE, "Unknown request");
Zmq.Msg message = new Zmq.Msg();
message.add(Messages.serialize(response));
return message;
}
@Override
public String toString() {
return "pub." + name + ":" + This.getName() + "." + This.getId() + "@" + This.getEndpoint();
......
......@@ -156,8 +156,7 @@ public class RequesterImpl {
}
// Send to the responder
Zmq.Msg reply = new Zmq.Msg();
reply.add("OK");
Zmq.Msg reply = processReplyRequest();
reply.send(requester);
}
}
......@@ -175,7 +174,7 @@ public class RequesterImpl {
// Create the request socket. We can create it here because it should be called only once.
RequestSocket requestSocket = This.getCom().createRequestSocket(endpoint.toString());
requestSocket.request(request);
requestSocket.requestJSON(request);
// Terminate the socket.
requestSocket.terminate();
......@@ -202,6 +201,19 @@ public class RequesterImpl {
}
}
private Zmq.Msg processReplyRequest() {
// Return the reply.
JSONObject response = new JSONObject();
response.put(Messages.RequestResponse.VALUE, 0);
response.put(Messages.RequestResponse.MESSAGE, "OK");
Zmq.Msg message = new Zmq.Msg();
message.add(Messages.serialize(response));
return message;
}
@Override
public String toString() {
return REQUESTER_PREFIX + name + "." + requesterId + ":" + This.getName() + "." + This.getId() + "@" + This.getEndpoint();
......
......@@ -117,8 +117,7 @@ public class ResponderImpl {
}
// Send to the requester
reply = new Zmq.Msg();
reply.add("OK");
reply = processReplyRequest();
reply.send(responder);
if (reply != null) {
......@@ -137,12 +136,25 @@ public class ResponderImpl {
// Create the request socket. We can create it here because it should be called only once.
RequestSocket requestSocket = This.getCom().createRequestSocket(endpoint.toString());
requestSocket.request(request);
requestSocket.requestJSON(request);
// Terminate the socket.
requestSocket.terminate();
}
private Zmq.Msg processReplyRequest() {
// Return the reply.
JSONObject response = new JSONObject();
response.put(Messages.RequestResponse.VALUE, 0);
response.put(Messages.RequestResponse.MESSAGE, "OK");
Zmq.Msg message = new Zmq.Msg();
message.add(Messages.serialize(response));
return message;
}
public boolean isEnded() {
return ended;
}
......
......@@ -98,7 +98,7 @@ public class SubscriberImpl {
// The subscriber sends init messages to the publisher that returns SYNC message
try {
requestSocket.request(Messages.createSyncRequest());
requestSocket.requestJSON(Messages.createSyncRequest());
} catch (ConnectionTimeout e) {
// do nothing
......@@ -113,7 +113,7 @@ public class SubscriberImpl {
// The subscriber is connected and ready to receive data.
// Notify the publisher that it can send data.
JSONObject request = Messages.createSubscribePublisherRequest();
requestSocket.request(request);
requestSocket.requestJSON(request);
requestSocket.terminate();
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment