Commit a21c3b1a authored by legoc's avatar legoc
Browse files

Use request socket in Requester

parent 838921eb
......@@ -752,7 +752,7 @@ std::unique_ptr<Instance>& InstanceArray::operator[](std::size_t index) {
///////////////////////////////////////////////////////////////////////////////
// Publisher
Publisher::Publisher(const This * application, int publisherPort, int synchronizerPort, const std::string& name, int numberOfSubscribers) :
Publisher::Publisher(This * application, int publisherPort, int synchronizerPort, const std::string& name, int numberOfSubscribers) :
m_impl(new PublisherImpl(application, publisherPort, synchronizerPort, name, numberOfSubscribers)) {
// Create the waiting here.
......@@ -1061,7 +1061,7 @@ bool Responder::isCanceled() const {
///////////////////////////////////////////////////////////////////////////
// Requester
Requester::Requester(const application::This * application, const std::string& url, int requesterPort, int responderPort, const std::string& name, int responderId, int requesterId) :
Requester::Requester(application::This * application, const std::string& url, int requesterPort, int responderPort, const std::string& name, int responderId, int requesterId) :
m_impl(new RequesterImpl(application, url, requesterPort, responderPort, name, responderId, requesterId)) {
// Create the waiting here.
......@@ -1077,14 +1077,18 @@ std::unique_ptr<Requester> Requester::create(Instance & instance, const std::str
string responderUrl = instance.getUrl();
string responderEndpoint = instance.getEndpoint();
// Create a request socket to the server of the instance.
unique_ptr<RequestSocketImpl> instanceRequestSocket = This::m_instance.createRequestSocket(responderEndpoint);
string responderPortName = ResponderImpl::RESPONDER_PREFIX + name;
int requesterId = RequesterImpl::newRequesterId();
string requesterPortName = RequesterImpl::getRequesterPortName(name, responderId, requesterId);
string strRequestType = This::m_instance.m_impl->createRequestType(PROTO_CONNECTPORT);
string strRequestData = This::m_instance.m_impl->createConnectPortRequest(responderId, responderPortName);
string requestTypePart = This::m_instance.m_impl->createRequestType(PROTO_CONNECTPORT);
string requestDataPart = This::m_instance.m_impl->createConnectPortRequest(responderId, responderPortName);
unique_ptr<zmq::message_t> reply = instanceRequestSocket->request(requestTypePart, requestDataPart);
unique_ptr<zmq::message_t> reply = This::m_instance.m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, responderEndpoint);
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
reply.reset();
......@@ -1095,7 +1099,8 @@ std::unique_ptr<Requester> Requester::create(Instance & instance, const std::str
instance.waitFor(0, responderPortName);
// Retry to connect.
reply = This::m_instance.m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, responderEndpoint);
reply = instanceRequestSocket->request(requestTypePart, requestDataPart);
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
responderPort = requestResponse.value();
......
......@@ -324,7 +324,7 @@ public:
bool isEnded() const;
private:
Publisher(const application::This * application, int publisherPort, int synchronizerPort, const std::string& name, int numberOfSubscribers);
Publisher(application::This * application, int publisherPort, int synchronizerPort, const std::string& name, int numberOfSubscribers);
std::unique_ptr<PublisherImpl> m_impl;
std::unique_ptr<WaitingImpl> m_waiting;
......@@ -471,7 +471,7 @@ public:
bool isCanceled() const;
private:
Requester(const application::This * application, const std::string& url, int requesterPort, int responderPort, const std::string& name, int responderId, int requesterId);
Requester(application::This * application, const std::string& url, int requesterPort, int responderPort, const std::string& name, int responderId, int requesterId);
std::unique_ptr<RequesterImpl> m_impl;
std::unique_ptr<WaitingImpl> m_waiting;
......
......@@ -18,6 +18,7 @@
#include "../Application.h"
#include "../Serializer.h"
#include "ServicesImpl.h"
#include "RequestSocketImpl.h"
#include <sstream>
using namespace std;
......@@ -28,7 +29,7 @@ const std::string PublisherImpl::SYNC = "SYNC";
const std::string PublisherImpl::STREAM = "STREAM";
const std::string PublisherImpl::ENDSTREAM = "ENDSTREAM";
PublisherImpl::PublisherImpl(const application::This * application, int publisherPort, int synchronizerPort, const std::string& name, int numberOfSubscribers) :
PublisherImpl::PublisherImpl(application::This * application, int publisherPort, int synchronizerPort, const std::string& name, int numberOfSubscribers) :
m_application(application),
m_publisherPort(publisherPort),
m_synchronizerPort(synchronizerPort),
......@@ -134,13 +135,14 @@ void PublisherImpl::cancelWaitForSubscribers() {
stringstream endpoint;
endpoint << m_application->getUrl() << ":" << (m_publisherPort + 1);
string strRequestType = m_application->m_impl->createRequestType(PROTO_CANCEL);
string strRequestData;
string requestDataPart;
proto::CancelPublisherSyncCommand cancelPublisherSyncCommand;
cancelPublisherSyncCommand.SerializeToString(&strRequestData);
cancelPublisherSyncCommand.SerializeToString(&requestDataPart);
unique_ptr<zmq::message_t> reply = m_application->m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, endpoint.str());
// Create a request socket only for the request.
unique_ptr<RequestSocketImpl> requestSocket = m_application->createRequestSocket(endpoint.str());
unique_ptr<zmq::message_t> reply = requestSocket->request(m_application->m_impl->createRequestType(PROTO_CANCEL), requestDataPart);
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
......
......@@ -32,7 +32,7 @@ namespace application {
class PublisherImpl {
public:
PublisherImpl(const application::This * application, int publisherPort, int synchronizerPort, const std::string& name, int numberOfSubscribers);
PublisherImpl(application::This * application, int publisherPort, int synchronizerPort, const std::string& name, int numberOfSubscribers);
~PublisherImpl();
const std::string& getName() const;
......@@ -62,7 +62,7 @@ public:
zmq::message_t * processSubscribePublisherCommand();
zmq::message_t * processCancelPublisherSyncCommand();
const application::This * m_application;
application::This * m_application;
int m_publisherPort;
int m_synchronizerPort;
std::string m_name;
......
......@@ -18,6 +18,7 @@
#include "../Application.h"
#include "../Serializer.h"
#include "ServicesImpl.h"
#include "RequestSocketImpl.h"
#include <sstream>
using namespace std;
......@@ -28,7 +29,7 @@ const std::string RequesterImpl::REQUESTER_PREFIX = "req.";
std::mutex RequesterImpl::m_mutex;
int RequesterImpl::m_requesterCounter = 0;
RequesterImpl::RequesterImpl(const application::This * application, const std::string& url, int requesterPort, int responderPort, const std::string& name, int responderId, int requesterId) :
RequesterImpl::RequesterImpl(application::This * application, const std::string& url, int requesterPort, int responderPort, const std::string& name, int responderId, int requesterId) :
m_application(application),
m_requesterPort(requesterPort),
m_name(name),
......@@ -40,12 +41,15 @@ RequesterImpl::RequesterImpl(const application::This * application, const std::s
repEndpoint << url << ":" << responderPort;
m_responderEndpoint = repEndpoint.str();
// create a socket REP
m_requester.reset(new zmq::socket_t(m_application->m_impl->m_context, ZMQ_REP));
// Create the request socket.
m_requestSocket = m_application->createRequestSocket(m_responderEndpoint);
// Create a socket REP.
m_repSocket.reset(new zmq::socket_t(m_application->m_impl->m_context, ZMQ_REP));
stringstream reqEndpoint;
reqEndpoint << "tcp://*:" << m_requesterPort;
m_requester->bind(reqEndpoint.str().c_str());
m_repSocket->bind(reqEndpoint.str().c_str());
}
RequesterImpl::~RequesterImpl() {
......@@ -74,8 +78,8 @@ WaitingImpl * RequesterImpl::waiting() {
void RequesterImpl::sendBinary(const std::string& request) {
string strRequestType = m_application->m_impl->createRequestType(PROTO_REQUEST);
string strRequestData;
string requestTypePart = m_application->m_impl->createRequestType(PROTO_REQUEST);
string requestDataPart;
proto::Request requestCommand;
requestCommand.set_applicationname(m_application->getName());
......@@ -84,9 +88,9 @@ void RequesterImpl::sendBinary(const std::string& request) {
requestCommand.set_serverurl(m_application->getUrl());
requestCommand.set_serverport(m_application->getPort());
requestCommand.set_requesterport(m_requesterPort);
requestCommand.SerializeToString(&strRequestData);
requestCommand.SerializeToString(&requestDataPart);
unique_ptr<zmq::message_t> reply = m_application->m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_responderEndpoint);
unique_ptr<zmq::message_t> reply = m_requestSocket->request(requestTypePart, requestDataPart);
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
......@@ -102,8 +106,8 @@ void RequesterImpl::send(const std::string& request) {
void RequesterImpl::sendTwoBinaryParts(const std::string& request1, const std::string& request2) {
string strRequestType = m_application->m_impl->createRequestType(PROTO_REQUEST);
string strRequestData;
string requestTypePart = m_application->m_impl->createRequestType(PROTO_REQUEST);
string requestDataPart;
proto::Request requestCommand;
requestCommand.set_applicationname(m_application->getName());
......@@ -113,9 +117,9 @@ void RequesterImpl::sendTwoBinaryParts(const std::string& request1, const std::s
requestCommand.set_serverurl(m_application->getUrl());
requestCommand.set_serverport(m_application->getPort());
requestCommand.set_requesterport(m_requesterPort);
requestCommand.SerializeToString(&strRequestData);
requestCommand.SerializeToString(&requestDataPart);
unique_ptr<zmq::message_t> reply = m_application->m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_responderEndpoint);
unique_ptr<zmq::message_t> reply = m_requestSocket->request(requestTypePart, requestDataPart);
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
......@@ -124,7 +128,7 @@ void RequesterImpl::sendTwoBinaryParts(const std::string& request1, const std::s
bool RequesterImpl::receiveBinary(std::string& response) {
unique_ptr<zmq::message_t> message(new zmq::message_t);
m_requester->recv(message.get(), 0);
m_repSocket->recv(message.get(), 0);
// multi-part message, first part is the type
proto::MessageType messageType;
......@@ -132,7 +136,7 @@ bool RequesterImpl::receiveBinary(std::string& response) {
if (message->more()) {
message.reset(new zmq::message_t);
m_requester->recv(message.get(), 0);
m_repSocket->recv(message.get(), 0);
} else {
cerr << "unexpected number of frames, should be 2" << endl;
......@@ -152,7 +156,7 @@ bool RequesterImpl::receiveBinary(std::string& response) {
unique_ptr<zmq::message_t> reply(new zmq::message_t(size));
memcpy((void *) reply->data(), data.c_str(), size);
m_requester->send(*reply);
m_repSocket->send(*reply);
return !m_canceled;
}
......@@ -172,10 +176,9 @@ void RequesterImpl::cancel() {
stringstream requesterEndpoint;
requesterEndpoint << m_application->getUrl() << ":" << m_requesterPort;
string strRequestType = m_application->m_impl->createRequestType(PROTO_CANCEL);
string strRequestData = "cancel";
unique_ptr<zmq::message_t> reply = m_application->m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, requesterEndpoint.str());
// Create a request socket only for the request.
unique_ptr<RequestSocketImpl> requestSocket = m_application->createRequestSocket(requesterEndpoint.str());
unique_ptr<zmq::message_t> reply = requestSocket->request(m_application->m_impl->createRequestType(PROTO_CANCEL), "cancel");
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
......@@ -183,14 +186,16 @@ void RequesterImpl::cancel() {
void RequesterImpl::terminate() {
if (m_requester.get() != nullptr) {
m_requester.reset(nullptr);
if (m_repSocket.get() != nullptr) {
m_repSocket.reset(nullptr);
bool success = m_application->removePort(getRequesterPortName(m_name, m_responderId, m_requesterId));
if (!success) {
cerr << "server cannot destroy requester " << m_name << endl;
}
}
m_requestSocket.reset();
}
}
......@@ -30,10 +30,12 @@ namespace application {
class This;
}
class RequestSocketImpl;
class RequesterImpl {
public:
RequesterImpl(const application::This * application, const std::string& url, int requesterPort, int responderPort, const std::string& name, int responderId, int requesterId);
RequesterImpl(application::This * application, const std::string& url, int requesterPort, int responderPort, const std::string& name, int responderId, int requesterId);
~RequesterImpl();
static int newRequesterId();
......@@ -51,13 +53,14 @@ public:
void cancel();
void terminate();
const application::This * m_application;
application::This * m_application;
int m_requesterPort;
std::string m_responderEndpoint;
std::string m_name;
int m_responderId;
int m_requesterId;
std::unique_ptr<zmq::socket_t> m_requester;
std::string m_responderEndpoint;
std::unique_ptr<RequestSocketImpl> m_requestSocket;
std::unique_ptr<zmq::socket_t> m_repSocket;
bool m_canceled;
static const std::string REQUESTER_PREFIX;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment