Commit c1e104a6 authored by legoc's avatar legoc
Browse files

(split) Added port requests to the Com classes

parent 895e4d18
......@@ -113,6 +113,10 @@ public:
std::string getKeyValue(const std::string& key) const;
void removeKey(const std::string& key) const;
int requestPort() const;
void setPortUnavailable(int port) const;
void releasePort(int port) const;
private:
Com(Server * server, int applicationId);
......
......@@ -133,10 +133,15 @@ private:
Response stopApplicationAsynchronously(int id, bool immediately) const;
std::unique_ptr<application::Subscriber> createSubscriber(int id, const std::string& publisherName, const std::string& instanceName);
int getAvailableTimeout() const;
void storeKeyValue(int id, const std::string& key, const std::string& value);
std::string getKeyValue(int id, const std::string& key);
void removeKey(int id, const std::string& key);
int requestPort(int id);
void setPortUnavailable(int id, int port);
void releasePort(int id, int port);
std::mutex m_eventListenersMutex;
std::vector<EventListener *> m_eventListeners;
std::unique_ptr<EventThread> m_eventThread;
......
......@@ -69,6 +69,18 @@ void This::Com::removeKey(const std::string& key) const {
m_server->removeKey(m_applicationId, key);
}
int This::Com::requestPort() const {
return m_server->requestPort(m_applicationId);
}
void This::Com::setPortUnavailable(int port) const {
m_server->setPortUnavailable(m_applicationId, port);
}
void This::Com::releasePort(int port) const {
m_server->releasePort(m_applicationId, port);
}
State This::parseState(const std::string& value) {
if (value == "UNKNOWN") {
......
......@@ -570,6 +570,50 @@ void Server::removeKey(int id, const std::string& key) {
}
}
int Server::requestPort(int id) {
unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRequestPortRequest(id));
// Get the JSON response.
json::Object response;
json::parse(response, reply.get());
int value = response[message::RequestResponse::VALUE].GetInt();
if (value == -1) {
throw UndefinedApplicationException(response[message::RequestResponse::MESSAGE].GetString());
}
return value;
}
void Server::setPortUnavailable(int id, int port) {
unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createPortUnavailableRequest(id, port));
// Get the JSON response.
json::Object response;
json::parse(response, reply.get());
int value = response[message::RequestResponse::VALUE].GetInt();
if (value == -1) {
throw UndefinedApplicationException(response[message::RequestResponse::MESSAGE].GetString());
}
}
void Server::releasePort(int id, int port) {
unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createReleasePortRequest(id, port));
// Get the JSON response.
json::Object response;
json::parse(response, reply.get());
int value = response[message::RequestResponse::VALUE].GetInt();
if (value == -1) {
throw UndefinedApplicationException(response[message::RequestResponse::MESSAGE].GetString());
}
}
std::vector<EventListener *> Server::getEventListeners() {
std::unique_lock<std::mutex> lock(m_eventListenersMutex);
return m_eventListeners;
......
......@@ -466,6 +466,48 @@ std::string ServicesImpl::createRemoveKeyRequest(int id, const std::string& key)
return request.toString();
}
std::string ServicesImpl::createRequestPortRequest(int id) {
json::StringObject request;
request.pushKey(message::TYPE);
request.pushInt(message::REQUEST_PORT);
request.pushKey(message::RequestPortRequest::ID);
request.pushInt(id);
return request.toString();
}
std::string ServicesImpl::createPortUnavailableRequest(int id, int port) {
json::StringObject request;
request.pushKey(message::TYPE);
request.pushInt(message::PORT_UNAVAILABLE);
request.pushKey(message::PortUnavailableRequest::ID);
request.pushInt(id);
request.pushKey(message::PortUnavailableRequest::PORT);
request.pushInt(port);
return request.toString();
}
std::string ServicesImpl::createReleasePortRequest(int id, int port) {
json::StringObject request;
request.pushKey(message::TYPE);
request.pushInt(message::RELEASE_PORT);
request.pushKey(message::ReleasePortRequest::ID);
request.pushInt(id);
request.pushKey(message::ReleasePortRequest::PORT);
request.pushInt(port);
return request.toString();
}
zmq::socket_t * ServicesImpl::createEventSubscriber(const std::string& endpoint, const std::string& cancelEndpoint) {
zmq::socket_t * subscriber = new zmq::socket_t(m_context, ZMQ_SUB);
......
......@@ -65,6 +65,9 @@ public:
std::string createStoreKeyValueRequest(int id, const std::string& key, const std::string& value);
std::string createGetKeyValueRequest(int id, const std::string& key);
std::string createRemoveKeyRequest(int id, const std::string& key);
std::string createRequestPortRequest(int id);
std::string createPortUnavailableRequest(int id, int port);
std::string createReleasePortRequest(int id, int port);
zmq::socket_t * createEventSubscriber(const std::string& endpoint, const std::string& cancelEndpoint);
zmq::socket_t * createOutputStreamSubscriber(const std::string& endpoint, const std::string& cancelEndpoint);
......
......@@ -59,7 +59,7 @@ namespace message {
const int GET_KEY_VALUE = 32;
const int REMOVE_KEY = 33;
const int REQUEST_PORT = 34;
const int UNAVAILABLE_PORT = 35;
const int PORT_UNAVAILABLE = 35;
const int RELEASE_PORT = 36;
const int PORTS = 37;
......@@ -289,7 +289,7 @@ namespace message {
constexpr const char* ID = "id"; // int32
}
namespace UnavailablePortRequest {
namespace PortUnavailableRequest {
constexpr const char* ID = "id"; // int32
constexpr const char* PORT = "port"; // int32
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment