Skip to content
Snippets Groups Projects
Commit 88324b5f authored by legoc's avatar legoc
Browse files

Use request socket for Server

parent 1b041459
No related branches found
No related tags found
No related merge requests found
......@@ -24,6 +24,7 @@
#include "ProtoType.h"
#include "EventThread.h"
#include "impl/StreamSocketImpl.h"
#include "impl/RequestSocketImpl.h"
using namespace std;
......@@ -110,10 +111,7 @@ std::unique_ptr<application::Instance> Server::start(const std::string& name, Op
int Server::getStreamPort(const std::string& name) {
string strRequestType = m_impl->createRequestType(PROTO_OUTPUT);
string strRequestData = m_impl->createOutputRequest(name);
unique_ptr<zmq::message_t> reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRequestType(PROTO_OUTPUT), m_impl->createOutputRequest(name));
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
......@@ -133,16 +131,13 @@ std::unique_ptr<application::Instance> Server::start(const std::string& name, co
try {
if (outputStream) {
// we connect to the stream port before starting the application
// so that we are sure that the ENDSTREAM message will be received even if the application terminates rapidly
// We connect to the stream port before starting the application
// so that we are sure that the ENDSTREAM message will be received even if the application terminates rapidly.
unique_ptr<OutputStreamSocket> socket = createOutputStreamSocket(getStreamPort(name));
instance->setOutputStreamSocket(socket);
}
string strRequestType = m_impl->createRequestType(PROTO_START);
string strRequestData = m_impl->createStartRequest(name, args, application::This::getReference());
unique_ptr<zmq::message_t> reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRequestType(PROTO_START), m_impl->createStartRequest(name, args, application::This::getReference()));
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
......@@ -162,18 +157,18 @@ std::unique_ptr<application::Instance> Server::start(const std::string& name, co
Response Server::stopApplicationAsynchronously(int id, bool immediately) const {
string strRequestType;
string strRequestData;
string requestTypePart;
string requestDataPart;
if (immediately) {
strRequestType = m_impl->createRequestType(PROTO_KILL);
strRequestData = m_impl->createKillRequest(id);
requestTypePart = m_impl->createRequestType(PROTO_KILL);
requestDataPart = m_impl->createKillRequest(id);
} else {
strRequestType = m_impl->createRequestType(PROTO_STOP);
strRequestData = m_impl->createStopRequest(id);
requestTypePart = m_impl->createRequestType(PROTO_STOP);
requestDataPart = m_impl->createStopRequest(id);
}
unique_ptr<zmq::message_t> reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
unique_ptr<zmq::message_t> reply = m_requestSocket->request(requestTypePart, requestDataPart);
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
......@@ -187,9 +182,7 @@ application::InstanceArray Server::connectAll(const std::string& name, Option op
application::InstanceArray instances;
string strRequestType = m_impl->createRequestType(PROTO_CONNECT);
string strRequestData = m_impl->createConnectRequest(name);
unique_ptr<zmq::message_t> reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRequestType(PROTO_CONNECT), m_impl->createConnectRequest(name));
proto::ApplicationInfoListResponse response;
response.ParseFromArray((*reply).data(), (*reply).size());
......@@ -269,9 +262,7 @@ void Server::killAllAndWaitFor(const std::string& name) {
bool Server::isAlive(int id) const {
string strRequestType = m_impl->createRequestType(PROTO_ISALIVE);
string strRequestData = m_impl->createIsAliveRequest(id);
unique_ptr<zmq::message_t> reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRequestType(PROTO_ISALIVE), m_impl->createIsAliveRequest(id));
proto::IsAliveResponse isAliveResponse;
isAliveResponse.ParseFromArray((*reply).data(), (*reply).size());
......@@ -283,9 +274,7 @@ std::vector<application::Configuration> Server::getApplicationConfigurations() c
vector<application::Configuration> configVector;
string strRequestType = m_impl->createRequestType(PROTO_ALLAVAILABLE);
string strRequestData = m_impl->createAllAvailableRequest();
unique_ptr<zmq::message_t> reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRequestType(PROTO_ALLAVAILABLE), m_impl->createAllAvailableRequest());
proto::AllAvailableResponse allAvailableResponse;
allAvailableResponse.ParseFromArray((*reply).data(), (*reply).size());
......@@ -309,11 +298,9 @@ std::vector<application::Configuration> Server::getApplicationConfigurations() c
std::vector<application::Info> Server::getApplicationInfos() const {
vector<application::Info> infoVector;
vector<application::Info> infos;
string strRequestType = m_impl->createRequestType(PROTO_SHOWALL);
string strRequestData = m_impl->createShowAllRequest();
unique_ptr<zmq::message_t> reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRequestType(PROTO_SHOWALL), m_impl->createShowAllRequest());
proto::ApplicationInfoListResponse response;
response.ParseFromArray((*reply).data(), (*reply).size());
......@@ -328,10 +315,10 @@ std::vector<application::Info> Server::getApplicationInfos() const {
info.pastapplicationstates(),
info.args());
infoVector.push_back(applicationInfo);
infos.push_back(applicationInfo);
}
return infoVector;
return infos;
}
std::vector<application::Info> Server::getApplicationInfos(const std::string& name) const {
......@@ -355,10 +342,8 @@ std::unique_ptr<EventStreamSocket> Server::openEventStream() {
std::unique_ptr<application::Subscriber> Server::createSubscriber(int id, const std::string& publisherName, const std::string& instanceName) const {
string strRequestType = m_impl->createRequestType(PROTO_CONNECTPUBLISHER);
string strRequestData = m_impl->createConnectPublisherRequest(id, publisherName);
unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRequestType(PROTO_CONNECTPUBLISHER), m_impl->createConnectPublisherRequest(id, publisherName));
unique_ptr<zmq::message_t> reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
proto::PublisherResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment