Commit 260fa990 authored by legoc's avatar legoc
Browse files

Replaced RequesterImpl tryRequest calls by request socket

parent e2ba1d5e
...@@ -41,7 +41,9 @@ public class RequesterImpl { ...@@ -41,7 +41,9 @@ public class RequesterImpl {
private int requesterId; private int requesterId;
private static AtomicInteger requesterCounter = new AtomicInteger(); private static AtomicInteger requesterCounter = new AtomicInteger();
private Zmq.Socket requester = null; private Zmq.Socket requester;
private RequestSocket requestSocket;
private boolean canceled = false; private boolean canceled = false;
private RequesterWaitingImpl waiting = new RequesterWaitingImpl(this); private RequesterWaitingImpl waiting = new RequesterWaitingImpl(this);
...@@ -54,7 +56,10 @@ public class RequesterImpl { ...@@ -54,7 +56,10 @@ public class RequesterImpl {
this.responderId = responderId; this.responderId = responderId;
this.requesterId = requesterId; this.requesterId = requesterId;
// create a socket REP // Create the REQ socket.
requestSocket = application.createRequestSocket(responderEndpoint);
// Create the REP socket.
requester = context.createSocket(Zmq.REP); requester = context.createSocket(Zmq.REP);
requester.bind("tcp://*:" + requesterPort); requester.bind("tcp://*:" + requesterPort);
...@@ -86,7 +91,7 @@ public class RequesterImpl { ...@@ -86,7 +91,7 @@ public class RequesterImpl {
.build(); .build();
requestMessage.add(command.toByteArray()); requestMessage.add(command.toByteArray());
application.tryRequest(requestMessage, responderEndpoint); requestSocket.request(requestMessage);
} }
private void send(ByteString request1, ByteString request2) { private void send(ByteString request1, ByteString request2) {
...@@ -103,7 +108,7 @@ public class RequesterImpl { ...@@ -103,7 +108,7 @@ public class RequesterImpl {
.build(); .build();
requestMessage.add(command.toByteArray()); requestMessage.add(command.toByteArray());
application.tryRequest(requestMessage, responderEndpoint); requestSocket.request(requestMessage);
} }
public void send(byte[] request) { public void send(byte[] request) {
...@@ -193,6 +198,10 @@ public class RequesterImpl { ...@@ -193,6 +198,10 @@ public class RequesterImpl {
public void terminate() { public void terminate() {
waiting.remove(); waiting.remove();
// Terminate the request socket.
requestSocket.terminate();
context.destroySocket(requester); context.destroySocket(requester);
try { try {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment