Commit ff80a52d authored by legoc's avatar legoc
Browse files

Creation of the request socket and first use in Services::initStatus()

parent 308898f5
......@@ -180,6 +180,9 @@ void This::initApplication(int argc, char *argv[]) {
}
}
// Create the request socket. The server endpoint has been defined.
Services::initRequestSocket();
// Must be here because the server endpoint is required.
initStatus();
......
......@@ -46,6 +46,9 @@ Server::Server(const std::string& endpoint) :
is >> m_port;
m_serverEndpoint = m_url + ":" + port;
// Create the request socket. The server endpoint has been defined.
Services::initRequestSocket();
// Start the event thread.
unique_ptr<EventStreamSocket> socket = openEventStream();
m_eventThread.reset(new EventThread(this, socket));
......
......@@ -41,6 +41,11 @@ Services::~Services() {
}
void Services::terminate() {
// Reset the request socket before the impl, otherwise reset impl will block.
m_requestSocket.reset();
// Reset the impl.
m_impl.reset();
}
......@@ -49,6 +54,11 @@ void Services::init() {
m_impl.reset(new ServicesImpl());
}
void Services::initRequestSocket() {
// Create the request socket. The server endpoint must have been initialized.
m_requestSocket = std::move(createRequestSocket(m_serverEndpoint));
}
std::vector<std::string> Services::split(const std::string& info) {
vector<string> result;
......@@ -97,10 +107,11 @@ bool Services::isAvailable(int timeout) const {
void Services::initStatus() {
// get the status port
string strRequestType = m_impl->createRequestType(PROTO_STATUS);
string strRequestData = m_impl->createShowStatusRequest();
unique_ptr<zmq::message_t> reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
// Get the status port.
string requestTypePart = m_impl->createRequestType(PROTO_STATUS);
string requestDataPart = m_impl->createShowStatusRequest();
unique_ptr<zmq::message_t> reply = m_requestSocket->request(requestTypePart, requestDataPart);
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
......
......@@ -36,6 +36,7 @@ public:
void terminate();
void init();
void initRequestSocket();
std::vector<std::string> split(const std::string& info);
void setTimeout(int timeout);
......
......@@ -59,6 +59,71 @@ int ServicesImpl::getTimeout() const {
return m_timeout;
}
/**
* convert enum type into Proto type
*/
proto::MessageType_Type ServicesImpl::convertToProtoType(ProtoType type) const {
if (type == PROTO_INIT) {
return proto::MessageType_Type_INIT;
} else if (type == PROTO_ISALIVE) {
return proto::MessageType_Type_ISALIVE;
} else if (type == PROTO_SENDPARAMETERS) {
return proto::MessageType_Type_SENDPARAMETERS;
} else if (type == PROTO_SHOW) {
return proto::MessageType_Type_SHOW;
} else if (type == PROTO_STATUS) {
return proto::MessageType_Type_STATUS;
} else if (type == PROTO_SHOWALL) {
return proto::MessageType_Type_SHOWALL;
} else if (type == PROTO_START) {
return proto::MessageType_Type_START;
} else if (type == PROTO_STOP) {
return proto::MessageType_Type_STOP;
} else if (type == PROTO_KILL) {
return proto::MessageType_Type_KILL;
} else if (type == PROTO_CONNECT) {
return proto::MessageType_Type_CONNECT;
} else if (type == PROTO_ALLAVAILABLE) {
return proto::MessageType_Type_ALLAVAILABLE;
} else if (type == PROTO_SETSTATUS) {
return proto::MessageType_Type_SETSTATUS;
} else if (type == PROTO_GETSTATUS) {
return proto::MessageType_Type_GETSTATUS;
} else if (type == PROTO_SETRESULT) {
return proto::MessageType_Type_SETRESULT;
} else if (type == PROTO_CREATEPUBLISHER) {
return proto::MessageType_Type_CREATEPUBLISHER;
} else if (type == PROTO_CONNECTPUBLISHER) {
return proto::MessageType_Type_CONNECTPUBLISHER;
} else if (type == PROTO_SUBSCRIBEPUBLISHER) {
return proto::MessageType_Type_SUBSCRIBEPUBLISHER;
} else if (type == PROTO_TERMINATEPUBLISHER) {
return proto::MessageType_Type_TERMINATEPUBLISHER;
} else if (type == PROTO_REQUESTPORT) {
return proto::MessageType_Type_REQUESTPORT;
} else if (type == PROTO_CONNECTPORT) {
return proto::MessageType_Type_CONNECTPORT;
} else if (type == PROTO_REMOVEPORT) {
return proto::MessageType_Type_REMOVEPORT;
} else if (type == PROTO_REQUEST) {
return proto::MessageType_Type_REQUEST;
} else if (type == PROTO_RESPONSE) {
return proto::MessageType_Type_RESPONSE;
} else if (type == PROTO_CANCEL) {
return proto::MessageType_Type_CANCEL;
} else if (type == PROTO_STARTEDUNMANAGED) {
return proto::MessageType_Type_STARTEDUNMANAGED;
} else if (type == PROTO_TERMINATEDUNMANAGED) {
return proto::MessageType_Type_TERMINATEDUNMANAGED;
} else if (type == PROTO_OUTPUT) {
return proto::MessageType_Type_OUTPUT;
} else {
cerr << "unsupported proto type" << endl;
return proto::MessageType_Type(0);
}
}
std::string ServicesImpl::createIsAliveRequest(int id) const {
proto::IsAliveCommand isAliveCommand;
isAliveCommand.set_id(id);
......@@ -440,69 +505,4 @@ void ServicesImpl::subscribeToPublisher(const std::string& endpoint) {
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
}
/**
* convert enum type into Proto type
*/
proto::MessageType_Type ServicesImpl::convertToProtoType(ProtoType type) const {
if (type == PROTO_INIT) {
return proto::MessageType_Type_INIT;
} else if (type == PROTO_ISALIVE) {
return proto::MessageType_Type_ISALIVE;
} else if (type == PROTO_SENDPARAMETERS) {
return proto::MessageType_Type_SENDPARAMETERS;
} else if (type == PROTO_SHOW) {
return proto::MessageType_Type_SHOW;
} else if (type == PROTO_STATUS) {
return proto::MessageType_Type_STATUS;
} else if (type == PROTO_SHOWALL) {
return proto::MessageType_Type_SHOWALL;
} else if (type == PROTO_START) {
return proto::MessageType_Type_START;
} else if (type == PROTO_STOP) {
return proto::MessageType_Type_STOP;
} else if (type == PROTO_KILL) {
return proto::MessageType_Type_KILL;
} else if (type == PROTO_CONNECT) {
return proto::MessageType_Type_CONNECT;
} else if (type == PROTO_ALLAVAILABLE) {
return proto::MessageType_Type_ALLAVAILABLE;
} else if (type == PROTO_SETSTATUS) {
return proto::MessageType_Type_SETSTATUS;
} else if (type == PROTO_GETSTATUS) {
return proto::MessageType_Type_GETSTATUS;
} else if (type == PROTO_SETRESULT) {
return proto::MessageType_Type_SETRESULT;
} else if (type == PROTO_CREATEPUBLISHER) {
return proto::MessageType_Type_CREATEPUBLISHER;
} else if (type == PROTO_CONNECTPUBLISHER) {
return proto::MessageType_Type_CONNECTPUBLISHER;
} else if (type == PROTO_SUBSCRIBEPUBLISHER) {
return proto::MessageType_Type_SUBSCRIBEPUBLISHER;
} else if (type == PROTO_TERMINATEPUBLISHER) {
return proto::MessageType_Type_TERMINATEPUBLISHER;
} else if (type == PROTO_REQUESTPORT) {
return proto::MessageType_Type_REQUESTPORT;
} else if (type == PROTO_CONNECTPORT) {
return proto::MessageType_Type_CONNECTPORT;
} else if (type == PROTO_REMOVEPORT) {
return proto::MessageType_Type_REMOVEPORT;
} else if (type == PROTO_REQUEST) {
return proto::MessageType_Type_REQUEST;
} else if (type == PROTO_RESPONSE) {
return proto::MessageType_Type_RESPONSE;
} else if (type == PROTO_CANCEL) {
return proto::MessageType_Type_CANCEL;
} else if (type == PROTO_STARTEDUNMANAGED) {
return proto::MessageType_Type_STARTEDUNMANAGED;
} else if (type == PROTO_TERMINATEDUNMANAGED) {
return proto::MessageType_Type_TERMINATEDUNMANAGED;
} else if (type == PROTO_OUTPUT) {
return proto::MessageType_Type_OUTPUT;
} else {
cerr << "unsupported proto type" << endl;
return proto::MessageType_Type(0);
}
}
}
......@@ -34,9 +34,7 @@ public:
void setTimeout(int timeout);
int getTimeout() const;
bool isAvailable(const std::string& strRequestType, const std::string& strRequestData, const std::string& endpoint, int timeout);
void waitForSubscriber(zmq::socket_t * subscriber, const std::string& strRequestType, const std::string& strRequestData, const std::string& endpoint);
void subscribeToPublisher(const std::string& endpoint);
proto::MessageType_Type convertToProtoType(ProtoType type) const;
std::string createRequestType(ProtoType type) const;
std::string createInitRequest() const;
......@@ -70,11 +68,12 @@ public:
std::unique_ptr<zmq::message_t> tryRequestWithOnePartReply(const std::string& strRequestType, const std::string& strRequestData, const std::string& endpoint, int overrideTimeout = -1);
std::string createShowStreamRequest(int id) const;
bool isAvailable(const std::string& strRequestType, const std::string& strRequestData, const std::string& endpoint, int timeout);
void waitForSubscriber(zmq::socket_t * subscriber, const std::string& strRequestType, const std::string& strRequestData, const std::string& endpoint);
void subscribeToPublisher(const std::string& endpoint);
proto::MessageType_Type convertToProtoType(ProtoType type) const;
int m_timeout;
zmq::context_t m_context;
int m_timeout;
static const std::string STATUS;
static const std::string RESULT;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment