Commit 41ae26d8 authored by legoc's avatar legoc
Browse files

Use request socket for Server

parent c2fa6de2
......@@ -24,6 +24,7 @@
#include "ProtoType.h"
#include "EventThread.h"
#include "impl/StreamSocketImpl.h"
#include "impl/RequestSocketImpl.h"
using namespace std;
......@@ -110,10 +111,7 @@ std::unique_ptr<application::Instance> Server::start(const std::string& name, Op
int Server::getStreamPort(const std::string& name) {
string strRequestType = m_impl->createRequestType(PROTO_OUTPUT);
string strRequestData = m_impl->createOutputRequest(name);
unique_ptr<zmq::message_t> reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRequestType(PROTO_OUTPUT), m_impl->createOutputRequest(name));
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
......@@ -133,16 +131,13 @@ std::unique_ptr<application::Instance> Server::start(const std::string& name, co
try {
if (outputStream) {
// we connect to the stream port before starting the application
// so that we are sure that the ENDSTREAM message will be received even if the application terminates rapidly
// We connect to the stream port before starting the application
// so that we are sure that the ENDSTREAM message will be received even if the application terminates rapidly.
unique_ptr<OutputStreamSocket> socket = createOutputStreamSocket(getStreamPort(name));
instance->setOutputStreamSocket(socket);
}
string strRequestType = m_impl->createRequestType(PROTO_START);
string strRequestData = m_impl->createStartRequest(name, args, application::This::getReference());
unique_ptr<zmq::message_t> reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRequestType(PROTO_START), m_impl->createStartRequest(name, args, application::This::getReference()));
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
......@@ -162,18 +157,18 @@ std::unique_ptr<application::Instance> Server::start(const std::string& name, co
Response Server::stopApplicationAsynchronously(int id, bool immediately) const {
string strRequestType;
string strRequestData;
string requestTypePart;
string requestDataPart;
if (immediately) {
strRequestType = m_impl->createRequestType(PROTO_KILL);
strRequestData = m_impl->createKillRequest(id);
requestTypePart = m_impl->createRequestType(PROTO_KILL);
requestDataPart = m_impl->createKillRequest(id);
} else {
strRequestType = m_impl->createRequestType(PROTO_STOP);
strRequestData = m_impl->createStopRequest(id);
requestTypePart = m_impl->createRequestType(PROTO_STOP);
requestDataPart = m_impl->createStopRequest(id);
}
unique_ptr<zmq::message_t> reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
unique_ptr<zmq::message_t> reply = m_requestSocket->request(requestTypePart, requestDataPart);
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
......@@ -187,9 +182,7 @@ application::InstanceArray Server::connectAll(const std::string& name, Option op
application::InstanceArray instances;
string strRequestType = m_impl->createRequestType(PROTO_CONNECT);
string strRequestData = m_impl->createConnectRequest(name);
unique_ptr<zmq::message_t> reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRequestType(PROTO_CONNECT), m_impl->createConnectRequest(name));
proto::ApplicationInfoListResponse response;
response.ParseFromArray((*reply).data(), (*reply).size());
......@@ -269,9 +262,7 @@ void Server::killAllAndWaitFor(const std::string& name) {
bool Server::isAlive(int id) const {
string strRequestType = m_impl->createRequestType(PROTO_ISALIVE);
string strRequestData = m_impl->createIsAliveRequest(id);
unique_ptr<zmq::message_t> reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRequestType(PROTO_ISALIVE), m_impl->createIsAliveRequest(id));
proto::IsAliveResponse isAliveResponse;
isAliveResponse.ParseFromArray((*reply).data(), (*reply).size());
......@@ -283,9 +274,7 @@ std::vector<application::Configuration> Server::getApplicationConfigurations() c
vector<application::Configuration> configVector;
string strRequestType = m_impl->createRequestType(PROTO_ALLAVAILABLE);
string strRequestData = m_impl->createAllAvailableRequest();
unique_ptr<zmq::message_t> reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRequestType(PROTO_ALLAVAILABLE), m_impl->createAllAvailableRequest());
proto::AllAvailableResponse allAvailableResponse;
allAvailableResponse.ParseFromArray((*reply).data(), (*reply).size());
......@@ -309,11 +298,9 @@ std::vector<application::Configuration> Server::getApplicationConfigurations() c
std::vector<application::Info> Server::getApplicationInfos() const {
vector<application::Info> infoVector;
vector<application::Info> infos;
string strRequestType = m_impl->createRequestType(PROTO_SHOWALL);
string strRequestData = m_impl->createShowAllRequest();
unique_ptr<zmq::message_t> reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRequestType(PROTO_SHOWALL), m_impl->createShowAllRequest());
proto::ApplicationInfoListResponse response;
response.ParseFromArray((*reply).data(), (*reply).size());
......@@ -328,10 +315,10 @@ std::vector<application::Info> Server::getApplicationInfos() const {
info.pastapplicationstates(),
info.args());
infoVector.push_back(applicationInfo);
infos.push_back(applicationInfo);
}
return infoVector;
return infos;
}
std::vector<application::Info> Server::getApplicationInfos(const std::string& name) const {
......@@ -355,10 +342,8 @@ std::unique_ptr<EventStreamSocket> Server::openEventStream() {
std::unique_ptr<application::Subscriber> Server::createSubscriber(int id, const std::string& publisherName, const std::string& instanceName) const {
string strRequestType = m_impl->createRequestType(PROTO_CONNECTPUBLISHER);
string strRequestData = m_impl->createConnectPublisherRequest(id, publisherName);
unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRequestType(PROTO_CONNECTPUBLISHER), m_impl->createConnectPublisherRequest(id, publisherName));
unique_ptr<zmq::message_t> reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
proto::PublisherResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment