/*
 * Copyright 2015 Institut Laue-Langevin
 *
 * Licensed under the EUPL, Version 1.1 only (the "License");
 * You may not use this work except in compliance with the Licence.
 * You may obtain a copy of the Licence at:
 *
 * http://joinup.ec.europa.eu/software/page/eupl
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the Licence is distributed on an "AS IS" basis,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the Licence for the specific language governing permissions and
 * limitations under the Licence.
 */

#include "Server.h"

#include "Application.h"
#include "ConnectionChecker.h"
#include "impl/ServicesImpl.h"
#include "EventThread.h"
#include "impl/StreamSocketImpl.h"
#include "impl/RequestSocketImpl.h"
#include "message/Message.h"
#include "UndefinedApplicationException.h"
#include "UndefinedKeyException.h"
#include <algorithm>
#include <iostream>
#include <sstream>
#include "JSON.h"

using namespace std;

namespace cameo {

/// Timeout in milliseconds used by getAvailableTimeout() when the configured timeout is not positive.
constexpr int defaultTimeout = 10000;

/**
 * Common initialisation used by both constructors.
 *
 * Stores the endpoint, applies the timeout, creates the request socket and then
 * tries to retrieve the server version and start the event thread.
 *
 * @param endpoint  endpoint of the server to talk to.
 * @param timeoutMs timeout forwarded to Services::setTimeout().
 */
void Server::initServer(const Endpoint& endpoint, int timeoutMs) {

    // NOTE(review): both constructors already call Services::init() before calling this
    // function; the call is kept here because its idempotence cannot be checked from this file.
    Services::init();

    m_serverEndpoint = endpoint;

    // Set the timeout.
    Services::setTimeout(timeoutMs);

    // Create the request socket. The server endpoint has been defined.
    Services::initRequestSocket();

    // Manage the ConnectionTimeout exception that can occur.
    try {
        // Retrieve the server version.
        Services::retrieveServerVersion();

        // Start the event thread.
        auto socket = openEventStream();
        m_eventThread.reset(new EventThread(this, socket));
        m_eventThread->start();
    }
    catch (...) {
        // Deliberate best effort: a failure here leaves the object constructed but without
        // an event thread (m_eventThread stays empty, which the destructor tolerates).
        // NOTE(review): this also hides errors other than ConnectionTimeout.
    }
}

/**
 * Builds a server proxy from an already parsed endpoint.
 *
 * @param endpoint  endpoint of the server.
 * @param timeoutMs timeout forwarded to initServer().
 */
Server::Server(const Endpoint& endpoint, int timeoutMs) :
    Services() {

    Services::init();

    initServer(endpoint, timeoutMs);
}

/**
 * Builds a server proxy from the textual form of an endpoint.
 *
 * @param endpoint  string parsed with Endpoint::parse().
 * @param timeoutMs timeout forwarded to initServer().
 * @throws InvalidArgumentException if parsing or initialisation throws.
 */
Server::Server(const std::string& endpoint, int timeoutMs) :
    Services() {

    Services::init();

    try {
        initServer(Endpoint::parse(endpoint), timeoutMs);
    }
    catch (...) {
        // NOTE(review): any exception escaping initServer() is also reported as an invalid
        // endpoint. Kept as is so that callers keep catching the same exception type.
        throw InvalidArgumentException(endpoint + " is not a valid endpoint");
    }
}

/**
 * Cancels the event thread if one was started.
 */
Server::~Server() {

    // Stop the event thread.
    if (m_eventThread) {
        m_eventThread->cancel();
    }
}

/// Forwards the timeout to Services::setTimeout().
void Server::setTimeout(int value) {
    Services::setTimeout(value);
}

/// Returns the timeout reported by Services::getTimeout().
int Server::getTimeout() const {
    return Services::getTimeout();
}

/// Returns the endpoint reported by Services::getEndpoint().
const Endpoint& Server::getEndpoint() const {
    return Services::getEndpoint();
}

/// Returns the version reported by Services::getVersion().
std::array<int, 3> Server::getVersion() const {
    return Services::getVersion();
}

/// Returns the result of Services::isAvailable() for the given timeout.
bool Server::isAvailable(int timeout) const {
    return Services::isAvailable(timeout);
}

/// Same as isAvailable(int) using getAvailableTimeout() as timeout.
bool Server::isAvailable() const {
    return isAvailable(getAvailableTimeout());
}

/**
 * Returns the configured timeout when it is strictly positive, defaultTimeout otherwise.
 */
int Server::getAvailableTimeout() const {

    const int timeout = getTimeout();

    return (timeout > 0) ? timeout : defaultTimeout;
}

/// Creates a new application::Instance bound to this server.
std::unique_ptr<application::Instance> Server::makeInstance() {
    return unique_ptr<application::Instance>(new application::Instance(this));
}

/// Starts the application \p name without arguments; see the overload taking \p args.
std::unique_ptr<application::Instance> Server::start(const std::string& name, int options) {
    return start(name, vector<string>(), options);
}

/**
 * Sends a start request for the application \p name.
 *
 * @param name    name of the application to start.
 * @param args    arguments passed in the start request.
 * @param options bit field; when OUTPUTSTREAM is set an output stream socket is attached.
 * @return an instance that carries either the id returned by the server, or an error
 *         message when the server answers -1 or a ConnectionTimeout occurs.
 */
std::unique_ptr<application::Instance> Server::start(const std::string& name, const std::vector<std::string> & args, int options) {

    const bool outputStream = ((options & OUTPUTSTREAM) != 0);

    auto instance = makeInstance();

    // Set the name and register the instance as event listener.
    instance->setName(name);
    registerEventListener(instance.get());

    try {
        // Stays empty unless the output stream has been requested.
        unique_ptr<OutputStreamSocket> streamSocket;

        if (outputStream) {
            // Connect to the stream port. A sync is made to ensure that the subscriber is connected.
            streamSocket = createOutputStreamSocket(name);
        }

        auto reply = m_requestSocket->request(m_impl->createStartRequest(name, args, application::This::getName(), application::This::getId(), application::This::getEndpoint().toString()));

        // Get the JSON response.
        json::Object response;
        json::parse(response, reply.get());

        const int value = response[message::RequestResponse::VALUE].GetInt();
        if (value == -1) {
            instance->setErrorMessage(response[message::RequestResponse::MESSAGE].GetString());
        }
        else {
            instance->setId(value);
            if (outputStream) {
                instance->setOutputStreamSocket(streamSocket);
            }
        }
    }
    catch (const ConnectionTimeout& e) {
        instance->setErrorMessage(e.what());
    }

    return instance;
}

/**
 * Sends a kill request (when \p immediately is true) or a stop request for application \p id.
 *
 * @return the value and message of the server response.
 */
Response Server::stopApplicationAsynchronously(int id, bool immediately) const {

    const string request = immediately ? m_impl->createKillRequest(id) : m_impl->createStopRequest(id);

    auto reply = m_requestSocket->request(request);

    // Get the JSON response.
    json::Object response;
    json::parse(response, reply.get());

    const int value = response[message::RequestResponse::VALUE].GetInt();
    const string message = response[message::RequestResponse::MESSAGE].GetString();

    return Response(value, message);
}

/**
 * Connects to every application returned by the connect request for \p name.
 *
 * @param name    name sent in the connect request.
 * @param options bit field; when OUTPUTSTREAM is set an output stream socket is attached
 *                to each returned instance.
 * @return the instances for which isAlive() answered true; possibly empty.
 */
application::InstanceArray Server::connectAll(const std::string& name, int options) {

    const bool outputStream = ((options & OUTPUTSTREAM) != 0);

    auto reply = m_requestSocket->request(m_impl->createConnectRequest(name));

    // Get the JSON response.
    json::Object response;
    json::parse(response, reply.get());

    json::Value& applicationInfo = response[message::ApplicationInfoListResponse::APPLICATION_INFO];
    json::Value::Array array = applicationInfo.GetArray();
    const size_t size = array.Size();

    // Only alive instances are appended, so the vector holds at most `size` elements.
    // reserve() only pre-allocates: the result must never be indexed up to `size`.
    application::InstanceArray instances;
    instances.reserve(size);

    for (size_t i = 0; i < size; ++i) {
        json::Value::Object info = array[i].GetObject();

        auto instance = makeInstance();

        // Set the name and register the instance as event listener.
        const string applicationName = info[message::ApplicationInfo::NAME].GetString();
        instance->setName(applicationName);
        registerEventListener(instance.get());

        const int applicationId = info[message::ApplicationInfo::ID].GetInt();

        // Test if the application is still alive otherwise we could have missed a status message.
        if (!isAlive(applicationId)) {
            continue;
        }

        instance->setId(applicationId);
        instance->setInitialState(info[message::ApplicationInfo::APPLICATION_STATE].GetInt());
        instance->setPastStates(info[message::ApplicationInfo::PAST_APPLICATION_STATES].GetInt());

        if (outputStream) {
            auto streamSocket = createOutputStreamSocket(applicationName);
            instance->setOutputStreamSocket(streamSocket);
        }

        instances.push_back(std::move(instance));
    }

    return instances;
}

/**
 * Connects to the first instance returned by connectAll().
 *
 * @return that instance, or a fresh instance from makeInstance() when none is alive.
 */
std::unique_ptr<application::Instance> Server::connect(const std::string& name, int options) {

    application::InstanceArray instances = connectAll(name, options);

    if (instances.empty()) {
        return makeInstance();
    }

    return std::move(instances[0]);
}

/**
 * Connects to the application with the given id.
 *
 * @param id      id sent in the connect request.
 * @param options bit field; when OUTPUTSTREAM is set an output stream socket is attached.
 * @return the connected instance, or a fresh instance from makeInstance() when the server
 *         returns no application or the application is not alive.
 */
std::unique_ptr<application::Instance> Server::connect(int id, int options) {

    const bool outputStream = ((options & OUTPUTSTREAM) != 0);

    auto reply = m_requestSocket->request(m_impl->createConnectWithIdRequest(id));

    // Get the JSON response.
    json::Object response;
    json::parse(response, reply.get());

    json::Value& applicationInfo = response[message::ApplicationInfoListResponse::APPLICATION_INFO];
    json::Value::Array array = applicationInfo.GetArray();

    if (array.Size() > 0) {
        json::Value::Object info = array[0].GetObject();

        auto instance = makeInstance();

        // Set the name and register the instance as event listener.
        const string name = info[message::ApplicationInfo::NAME].GetString();
        instance->setName(name);
        registerEventListener(instance.get());

        const int applicationId = info[message::ApplicationInfo::ID].GetInt();

        // Test if the application is still alive otherwise we could have missed a status message.
        if (isAlive(applicationId)) {
            instance->setId(applicationId);
            instance->setInitialState(info[message::ApplicationInfo::APPLICATION_STATE].GetInt());
            instance->setPastStates(info[message::ApplicationInfo::PAST_APPLICATION_STATES].GetInt());

            if (outputStream) {
                auto streamSocket = createOutputStreamSocket(name);
                instance->setOutputStreamSocket(streamSocket);
            }

            return instance;
        }
    }

    return makeInstance();
}

/**
 * Kills every instance returned by connectAll(name), waiting for each one in turn.
 */
void Server::killAllAndWaitFor(const std::string& name) {

    application::InstanceArray instances = connectAll(name);

    for (auto& instance : instances) {
        instance->kill();
        instance->waitFor();
    }
}

/// Sends an is-alive request for application \p id and returns the boolean of the response.
bool Server::isAlive(int id) const {

    auto reply = m_requestSocket->request(m_impl->createIsAliveRequest(id));

    // Get the JSON response.
    json::Object response;
    json::parse(response, reply.get());

    return response[message::IsAliveResponse::IS_ALIVE].GetBool();
}

/**
 * Sends a list request and converts each entry of the response into a Configuration.
 */
std::vector<application::Configuration> Server::getApplicationConfigurations() const {

    vector<application::Configuration> configs;

    auto reply = m_requestSocket->request(m_impl->createListRequest());

    // Get the JSON response.
    json::Object response;
    json::parse(response, reply.get());

    json::Value& applicationConfigs = response[message::ApplicationConfigListResponse::APPLICATION_CONFIG];
    json::Value::Array array = applicationConfigs.GetArray();
    const size_t size = array.Size();

    for (size_t i = 0; i < size; ++i) {
        json::Value::Object config = array[i].GetObject();

        const string name = config[message::ApplicationConfig::NAME].GetString();
        const string description = config[message::ApplicationConfig::DESCRIPTION].GetString();
        const bool runsSingle = config[message::ApplicationConfig::RUNS_SINGLE].GetBool();
        const bool restart = config[message::ApplicationConfig::RESTART].GetBool();
        const int startingTime = config[message::ApplicationConfig::STARTING_TIME].GetInt();
        const int stoppingTime = config[message::ApplicationConfig::STOPPING_TIME].GetInt();

        application::Configuration applicationConfig(name, description, runsSingle, restart, startingTime, stoppingTime);
        configs.push_back(applicationConfig);
    }

    return configs;
}

/**
 * Sends an apps request and converts each entry of the response into an Info.
 */
std::vector<application::Info> Server::getApplicationInfos() const {

    vector<application::Info> infos;

    auto reply = m_requestSocket->request(m_impl->createAppsRequest());

    // Get the JSON response.
    json::Object response;
    json::parse(response, reply.get());

    json::Value& applicationInfos = response[message::ApplicationInfoListResponse::APPLICATION_INFO];
    json::Value::Array array = applicationInfos.GetArray();
    const size_t size = array.Size();

    for (size_t i = 0; i < size; ++i) {
        json::Value::Object info = array[i].GetObject();

        const string name = info[message::ApplicationInfo::NAME].GetString();
        const int id = info[message::ApplicationInfo::ID].GetInt();
        const int pid = info[message::ApplicationInfo::PID].GetInt();
        const application::State state = info[message::ApplicationInfo::APPLICATION_STATE].GetInt();
        const application::State pastStates = info[message::ApplicationInfo::PAST_APPLICATION_STATES].GetInt();
        const string args = info[message::ApplicationInfo::ARGS].GetString();

        application::Info applicationInfo(name, id, pid, state, pastStates, args);
        infos.push_back(applicationInfo);
    }

    return infos;
}

/**
 * Returns the entries of getApplicationInfos() whose name equals \p name.
 */
std::vector<application::Info> Server::getApplicationInfos(const std::string& name) const {

    vector<application::Info> infos;

    for (const application::Info& info : getApplicationInfos()) {
        if (info.getName() == name) {
            infos.push_back(info);
        }
    }

    return infos;
}

/**
 * Sends a ports request and converts each entry of the response into a Port.
 */
std::vector<application::Port> Server::getPorts() const {

    vector<application::Port> ports;

    auto reply = m_requestSocket->request(m_impl->createPortsRequest());

    // Get the JSON response.
    json::Object response;
    json::parse(response, reply.get());

    json::Value& portInfos = response[message::PortInfoListResponse::PORT_INFO];
    json::Value::Array array = portInfos.GetArray();
    const size_t size = array.Size();

    for (size_t i = 0; i < size; ++i) {
        json::Value::Object info = array[i].GetObject();

        const int port = info[message::PortInfo::PORT].GetInt();
        const string status = info[message::PortInfo::STATUS].GetString();
        const string owner = info[message::PortInfo::OWNER].GetString();

        application::Port portInfo(port, status, owner);
        ports.push_back(portInfo);
    }

    return ports;
}

/// Sends a status request for application \p id and returns the state of the response.
application::State Server::getActualState(int id) const {

    auto reply = m_requestSocket->request(m_impl->createGetStatusRequest(id));

    // Get the JSON response.
    json::Object response;
    json::parse(response, reply.get());

    return response[message::StatusEvent::APPLICATION_STATE].GetInt();
}

/**
 * Sends a status request for application \p id and splits the past-states value of the
 * response into the set of individual states whose bit is set.
 */
std::set<application::State> Server::getPastStates(int id) const {

    auto reply = m_requestSocket->request(m_impl->createGetStatusRequest(id));

    // Get the JSON response.
    json::Object response;
    json::parse(response, reply.get());

    const application::State applicationStates = response[message::StatusEvent::PAST_APPLICATION_STATES].GetInt();

    set<application::State> result;

    // Each state is a flag tested independently in the combined value.
    for (application::State state : {
            application::STARTING,
            application::RUNNING,
            application::STOPPING,
            application::KILLING,
            application::PROCESSING_ERROR,
            application::FAILURE,
            application::SUCCESS,
            application::STOPPED,
            application::KILLED }) {

        if ((applicationStates & state) != 0) {
            result.insert(state);
        }
    }

    return result;
}

/// Returns the socket created by Services::openEventStream().
std::unique_ptr<EventStreamSocket> Server::openEventStream() {
    return Services::openEventStream();
}

/**
 * Asks the server for the publisher \p publisherName of application \p id and builds an
 * initialised subscriber from the ports of the response.
 *
 * @throws SubscriberCreationException when the publisher port of the response is -1.
 */
std::unique_ptr<application::Subscriber> Server::createSubscriber(int id, const std::string& publisherName, const std::string& instanceName) {

    auto reply = m_requestSocket->request(m_impl->createConnectPublisherRequest(id, publisherName));

    // Get the JSON response.
    json::Object response;
    json::parse(response, reply.get());

    const int publisherPort = response[message::PublisherResponse::PUBLISHER_PORT].GetInt();
    if (publisherPort == -1) {
        throw SubscriberCreationException(response[message::PublisherResponse::MESSAGE].GetString());
    }

    const int synchronizerPort = response[message::PublisherResponse::SYNCHRONIZER_PORT].GetInt();
    const int numberOfSubscribers = response[message::PublisherResponse::NUMBER_OF_SUBSCRIBERS].GetInt();

    // TODO simplify the use of some variables: e.g. m_serverEndpoint accessible from this.
    unique_ptr<application::Subscriber> subscriber(new application::Subscriber(this, publisherPort, synchronizerPort, publisherName, numberOfSubscribers, instanceName, id, m_serverEndpoint.toString(), m_serverEndpoint.withPort(m_statusPort).toString()));
    subscriber->init();

    return subscriber;
}

/**
 * Creates a connection checker for this server and starts its thread with
 * getAvailableTimeout() and \p pollingTimeMs.
 */
std::unique_ptr<ConnectionChecker> Server::createConnectionChecker(ConnectionCheckerType handler, int pollingTimeMs) {

    unique_ptr<ConnectionChecker> connectionChecker(new ConnectionChecker(this, handler));
    connectionChecker->startThread(getAvailableTimeout(), pollingTimeMs);

    return connectionChecker;
}

/// Sends a store-key-value request for application \p id; the response content is not inspected.
void Server::storeKeyValue(int id, const std::string& key, const std::string& value) {

    auto reply = m_requestSocket->request(m_impl->createStoreKeyValueRequest(id, key, value));

    // Get the JSON response.
    json::Object response;
    json::parse(response, reply.get());
}

/**
 * Sends a get-key-value request for application \p id.
 *
 * @return the message of the response when its value is 0, an empty string for any value
 *         other than 0, -1 and -2.
 * @throws UndefinedApplicationException when the response value is -1.
 * @throws UndefinedKeyException when the response value is -2.
 */
std::string Server::getKeyValue(int id, const std::string& key) {

    auto reply = m_requestSocket->request(m_impl->createGetKeyValueRequest(id, key));

    // Get the JSON response.
    json::Object response;
    json::parse(response, reply.get());

    const int value = response[message::RequestResponse::VALUE].GetInt();

    if (value == 0) {
        return response[message::RequestResponse::MESSAGE].GetString();
    }
    if (value == -1) {
        throw UndefinedApplicationException(response[message::RequestResponse::MESSAGE].GetString());
    }
    if (value == -2) {
        throw UndefinedKeyException(response[message::RequestResponse::MESSAGE].GetString());
    }

    return "";
}

/**
 * Sends a remove-key request for application \p id.
 *
 * @throws UndefinedApplicationException when the response value is -1.
 * @throws UndefinedKeyException when the response value is -2.
 */
void Server::removeKey(int id, const std::string& key) {

    auto reply = m_requestSocket->request(m_impl->createRemoveKeyRequest(id, key));

    // Get the JSON response.
    json::Object response;
    json::parse(response, reply.get());

    const int value = response[message::RequestResponse::VALUE].GetInt();

    if (value == -1) {
        throw UndefinedApplicationException(response[message::RequestResponse::MESSAGE].GetString());
    }
    if (value == -2) {
        throw UndefinedKeyException(response[message::RequestResponse::MESSAGE].GetString());
    }
}

/**
 * Sends a request-port request for application \p id.
 *
 * @return the value of the response.
 * @throws UndefinedApplicationException when the response value is -1.
 */
int Server::requestPort(int id) {

    auto reply = m_requestSocket->request(m_impl->createRequestPortRequest(id));

    // Get the JSON response.
    json::Object response;
    json::parse(response, reply.get());

    const int value = response[message::RequestResponse::VALUE].GetInt();
    if (value == -1) {
        throw UndefinedApplicationException(response[message::RequestResponse::MESSAGE].GetString());
    }

    return value;
}

/**
 * Sends a port-unavailable request for \p port of application \p id.
 *
 * @throws UndefinedApplicationException when the response value is -1.
 */
void Server::setPortUnavailable(int id, int port) {

    auto reply = m_requestSocket->request(m_impl->createPortUnavailableRequest(id, port));

    // Get the JSON response.
    json::Object response;
    json::parse(response, reply.get());

    const int value = response[message::RequestResponse::VALUE].GetInt();
    if (value == -1) {
        throw UndefinedApplicationException(response[message::RequestResponse::MESSAGE].GetString());
    }
}

/**
 * Sends a release-port request for \p port of application \p id.
 *
 * @throws UndefinedApplicationException when the response value is -1.
 */
void Server::releasePort(int id, int port) {

    auto reply = m_requestSocket->request(m_impl->createReleasePortRequest(id, port));

    // Get the JSON response.
    json::Object response;
    json::parse(response, reply.get());

    const int value = response[message::RequestResponse::VALUE].GetInt();
    if (value == -1) {
        throw UndefinedApplicationException(response[message::RequestResponse::MESSAGE].GetString());
    }
}

/// Returns a copy of the listener list, taken while holding the listeners mutex.
std::vector<EventListener *> Server::getEventListeners() {

    std::unique_lock<std::mutex> lock(m_eventListenersMutex);

    return m_eventListeners;
}

/// Appends \p listener to the listener list while holding the listeners mutex.
void Server::registerEventListener(EventListener * listener) {

    std::unique_lock<std::mutex> lock(m_eventListenersMutex);

    m_eventListeners.push_back(listener);
}

/// Removes the first occurrence of \p listener, if any, while holding the listeners mutex.
void Server::unregisterEventListener(EventListener * listener) {

    std::unique_lock<std::mutex> lock(m_eventListenersMutex);

    const auto it = std::find(m_eventListeners.begin(), m_eventListeners.end(), listener);
    if (it != m_eventListeners.end()) {
        m_eventListeners.erase(it);
    }
}

/// Writes "server@" followed by the textual form of the server endpoint.
std::ostream& operator<<(std::ostream& os, const cameo::Server& server) {

    os << "server@" << server.m_serverEndpoint.toString();

    return os;
}

}