Commit 51f37ae9 authored by legoc's avatar legoc
Browse files

(split) Added connect with id request in cameo server and added the...

(split) Added connect with id request in cameo server and added the corresponding connect() in Server class
parent 48f37a3c
...@@ -9,6 +9,7 @@ ...@@ -9,6 +9,7 @@
* Added Instance::getPastStates(). * Added Instance::getPastStates().
* Added Instance.getExitCode(). * Added Instance.getExitCode().
* Filter OutputStreamSocket on application id. * Filter OutputStreamSocket on application id.
* Added Server::connect() with id.
0.3.3 0.3.3
----- -----
......
...@@ -69,6 +69,7 @@ public: ...@@ -69,6 +69,7 @@ public:
std::unique_ptr<application::Instance> start(const std::string& name, Option options = NONE); std::unique_ptr<application::Instance> start(const std::string& name, Option options = NONE);
application::InstanceArray connectAll(const std::string& name, Option options = NONE); application::InstanceArray connectAll(const std::string& name, Option options = NONE);
std::unique_ptr<application::Instance> connect(const std::string& name, Option options = NONE); std::unique_ptr<application::Instance> connect(const std::string& name, Option options = NONE);
std::unique_ptr<application::Instance> connect(int id, Option options = NONE);
/** /**
* throws ConnectionTimeout * throws ConnectionTimeout
...@@ -128,6 +129,7 @@ public: ...@@ -128,6 +129,7 @@ public:
private: private:
std::unique_ptr<application::Instance> makeInstance(); std::unique_ptr<application::Instance> makeInstance();
bool isAlive(int id) const; bool isAlive(int id) const;
Response stopApplicationAsynchronously(int id, bool immediately) const; Response stopApplicationAsynchronously(int id, bool immediately) const;
std::unique_ptr<application::Subscriber> createSubscriber(int id, const std::string& publisherName, const std::string& instanceName); std::unique_ptr<application::Subscriber> createSubscriber(int id, const std::string& publisherName, const std::string& instanceName);
int getAvailableTimeout() const; int getAvailableTimeout() const;
......
...@@ -209,14 +209,14 @@ application::InstanceArray Server::connectAll(const std::string& name, Option op ...@@ -209,14 +209,14 @@ application::InstanceArray Server::connectAll(const std::string& name, Option op
bool outputStream = ((options & OUTPUTSTREAM) != 0); bool outputStream = ((options & OUTPUTSTREAM) != 0);
application::InstanceArray instances;
unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createConnectRequest(name)); unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createConnectRequest(name));
// Get the JSON response. // Get the JSON response.
json::Object response; json::Object response;
json::parse(response, reply.get()); json::parse(response, reply.get());
application::InstanceArray instances;
json::Value& applicationInfo = response[message::ApplicationInfoListResponse::APPLICATION_INFO]; json::Value& applicationInfo = response[message::ApplicationInfoListResponse::APPLICATION_INFO];
json::Value::Array array = applicationInfo.GetArray(); json::Value::Array array = applicationInfo.GetArray();
size_t size = array.Size(); size_t size = array.Size();
...@@ -285,6 +285,53 @@ std::unique_ptr<application::Instance> Server::connect(const std::string& name, ...@@ -285,6 +285,53 @@ std::unique_ptr<application::Instance> Server::connect(const std::string& name,
return std::move(instances[0]); return std::move(instances[0]);
} }
std::unique_ptr<application::Instance> Server::connect(int id, Option options) {
bool outputStream = ((options & OUTPUTSTREAM) != 0);
unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createConnectWithIdRequest(id));
// Get the JSON response.
json::Object response;
json::parse(response, reply.get());
json::Value& applicationInfo = response[message::ApplicationInfoListResponse::APPLICATION_INFO];
json::Value::Array array = applicationInfo.GetArray();
size_t size = array.Size();
if (size > 0) {
json::Value::Object info = array[0].GetObject();
unique_ptr<application::Instance> instance = makeInstance();
// Set the name and register the instance as event listener.
string name = info[message::ApplicationInfo::NAME].GetString();
instance->setName(name);
registerEventListener(instance.get());
int applicationId = info[message::ApplicationInfo::ID].GetInt();
// test if the application is still alive otherwise we could have missed a status message
if (isAlive(applicationId)) {
// We connect to the stream port before starting the application.
// However that does NOT guarantee that the stream will be connected before the ENDSTREAM arrives in case of an application that terminates rapidly.
instance->setId(applicationId);
instance->setInitialState(info[message::ApplicationInfo::APPLICATION_STATE].GetInt());
instance->setPastStates(info[message::ApplicationInfo::PAST_APPLICATION_STATES].GetInt());
if (outputStream) {
unique_ptr<OutputStreamSocket> streamSocket = createOutputStreamSocket(getStreamPort(name));
instance->setOutputStreamSocket(streamSocket);
}
return instance;
}
}
return makeInstance();
}
void Server::killAllAndWaitFor(const std::string& name) { void Server::killAllAndWaitFor(const std::string& name) {
application::InstanceArray instances = connectAll(name); application::InstanceArray instances = connectAll(name);
......
...@@ -141,6 +141,18 @@ std::string ServicesImpl::createConnectRequest(const std::string& name) const { ...@@ -141,6 +141,18 @@ std::string ServicesImpl::createConnectRequest(const std::string& name) const {
return request.toString(); return request.toString();
} }
std::string ServicesImpl::createConnectWithIdRequest(int id) const {
json::StringObject request;
request.pushKey(message::TYPE);
request.pushInt(message::CONNECT_WITH_ID);
request.pushKey(message::ConnectWithIdRequest::ID);
request.pushInt(id);
return request.toString();
}
std::string ServicesImpl::createListRequest() const { std::string ServicesImpl::createListRequest() const {
json::StringObject request; json::StringObject request;
......
...@@ -40,6 +40,7 @@ public: ...@@ -40,6 +40,7 @@ public:
std::string createStopRequest(int id) const; std::string createStopRequest(int id) const;
std::string createKillRequest(int id) const; std::string createKillRequest(int id) const;
std::string createConnectRequest(const std::string& name) const; std::string createConnectRequest(const std::string& name) const;
std::string createConnectWithIdRequest(int id) const;
std::string createIsAliveRequest(int id) const; std::string createIsAliveRequest(int id) const;
std::string createListRequest() const; std::string createListRequest() const;
std::string createAppsRequest() const; std::string createAppsRequest() const;
......
...@@ -29,10 +29,10 @@ namespace message { ...@@ -29,10 +29,10 @@ namespace message {
const int START = 2; const int START = 2;
const int STOP = 3; const int STOP = 3;
const int CONNECT = 4; const int CONNECT = 4;
const int APPS = 5; const int CONNECT_WITH_ID = 5;
const int OUTPUT_PORT_WITH_ID = 6; const int APPS = 6;
const int OUTPUT_PORT = 7; const int OUTPUT_PORT = 7;
const int ENABLE_STREAM = 8; const int OUTPUT_PORT_WITH_ID = 8;
const int IS_ALIVE = 9; const int IS_ALIVE = 9;
const int WRITE_INPUT = 10; const int WRITE_INPUT = 10;
const int KILL = 11; const int KILL = 11;
...@@ -88,6 +88,10 @@ namespace message { ...@@ -88,6 +88,10 @@ namespace message {
constexpr const char* NAME = "name"; // required string name = 1; constexpr const char* NAME = "name"; // required string name = 1;
} }
namespace ConnectWithIdRequest {
constexpr const char* ID = "id"; // int32
}
namespace ApplicationConfig { namespace ApplicationConfig {
constexpr const char* NAME = "name"; // required string name = 1; constexpr const char* NAME = "name"; // required string name = 1;
constexpr const char* DESCRIPTION = "description"; // optional string description = 2; constexpr const char* DESCRIPTION = "description"; // optional string description = 2;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment