Commit d7aee1c2 authored by legoc's avatar legoc
Browse files

Use request socket in Responder

parent f0e93273
......@@ -1010,7 +1010,7 @@ std::unique_ptr<Server> Request::getServer() {
///////////////////////////////////////////////////////////////////////////
// Responder
Responder::Responder(const application::This * application, int responderPort, const std::string& name) :
Responder::Responder(application::This * application, int responderPort, const std::string& name) :
m_impl(new ResponderImpl(application, responderPort, name)) {
// Create the waiting here.
......
......@@ -435,7 +435,7 @@ public:
bool isCanceled() const;
private:
Responder(const application::This * application, int responderPort, const std::string& name);
Responder(application::This * application, int responderPort, const std::string& name);
std::unique_ptr<ResponderImpl> m_impl;
std::unique_ptr<WaitingImpl> m_waiting;
......
......@@ -19,13 +19,14 @@
#include "../Application.h"
#include "../Serializer.h"
#include "ServicesImpl.h"
#include "RequestSocketImpl.h"
#include <sstream>
using namespace std;
namespace cameo {
RequestImpl::RequestImpl(const application::This * application, const std::string & requesterApplicationName, int requesterApplicationId, const std::string& message, const std::string& serverUrl, int serverPort, int requesterPort) :
RequestImpl::RequestImpl(application::This * application, const std::string & requesterApplicationName, int requesterApplicationId, const std::string& message, const std::string& serverUrl, int serverPort, int requesterPort) :
m_application(application),
m_message(message),
m_requesterApplicationName(requesterApplicationName),
......@@ -44,13 +45,15 @@ RequestImpl::~RequestImpl() {
}
void RequestImpl::replyBinary(const std::string& response) {
string strRequestType = m_application->m_impl->createRequestType(PROTO_RESPONSE);
m_application->m_impl->tryRequestWithOnePartReply(strRequestType, response, m_requesterEndpoint);
// Create a request socket. It is created for each request that could be optimized.
unique_ptr<RequestSocketImpl> requestSocket = m_application->createRequestSocket(m_requesterEndpoint);
requestSocket->request(m_application->m_impl->createRequestType(PROTO_RESPONSE), response);
}
void RequestImpl::reply(const std::string& response) {
// encode the data
// Encode the data.
string result;
serialize(response, result);
......
......@@ -30,13 +30,13 @@ namespace application {
class RequestImpl {
public:
RequestImpl(const application::This * application, const std::string & requesterApplicationName, int requesterApplicationId, const std::string& message, const std::string& serverUrl, int serverPort, int requesterPort);
RequestImpl(application::This * application, const std::string & requesterApplicationName, int requesterApplicationId, const std::string& message, const std::string& serverUrl, int serverPort, int requesterPort);
~RequestImpl();
void replyBinary(const std::string& response);
void reply(const std::string& response);
const application::This * m_application;
application::This * m_application;
std::string m_requesterEndpoint;
std::string m_message;
std::string m_message2;
......
......@@ -19,6 +19,7 @@
#include "../Serializer.h"
#include "ServicesImpl.h"
#include "RequestImpl.h"
#include "RequestSocketImpl.h"
#include <sstream>
using namespace std;
......@@ -27,7 +28,7 @@ namespace cameo {
const std::string ResponderImpl::RESPONDER_PREFIX = "rep.";
ResponderImpl::ResponderImpl(const application::This * application, int responderPort, const std::string& name) :
ResponderImpl::ResponderImpl(application::This * application, int responderPort, const std::string& name) :
m_application(application),
m_responderPort(responderPort),
m_name(name),
......@@ -50,10 +51,9 @@ void ResponderImpl::cancel() {
stringstream endpoint;
endpoint << m_application->getUrl() << ":" << m_responderPort;
string strRequestType = m_application->m_impl->createRequestType(PROTO_CANCEL);
string strRequestData = "cancel";
unique_ptr<zmq::message_t> reply = m_application->m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, endpoint.str());
// Create a request socket.
unique_ptr<RequestSocketImpl> requestSocket = m_application->createRequestSocket(endpoint.str());
unique_ptr<zmq::message_t> reply = requestSocket->request(m_application->m_impl->createRequestType(PROTO_CANCEL), "cancel");
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
......
......@@ -34,7 +34,7 @@ class RequestImpl;
class ResponderImpl {
public:
ResponderImpl(const application::This * application, int responderPort, const std::string& name);
ResponderImpl(application::This * application, int responderPort, const std::string& name);
~ResponderImpl();
void cancel();
......@@ -43,7 +43,7 @@ public:
std::unique_ptr<RequestImpl> receive();
void terminate();
const application::This * m_application;
application::This * m_application;
int m_responderPort;
std::string m_name;
std::unique_ptr<zmq::socket_t> m_responder;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment