Commit e9998aea authored by legoc's avatar legoc
Browse files

Server absorbed Services that is no longer needed

parent 6d27f71d
......@@ -17,11 +17,6 @@
#ifndef CAMEO_APPLICATION_H_
#define CAMEO_APPLICATION_H_
#include <functional>
#include <vector>
#include <set>
#include <memory>
#include <optional>
#include "InvalidArgumentException.h"
#include "UnmanagedApplicationException.h"
#include "SocketException.h"
......@@ -30,12 +25,17 @@
#include "UndefinedKeyException.h"
#include "Response.h"
#include "Serializer.h"
#include "Services.h"
#include "Context.h"
#include "TimeCondition.h"
#include "EventListener.h"
#include "JSON.h"
#include "KeyValue.h"
#include "Strings.h"
#include <functional>
#include <vector>
#include <set>
#include <memory>
#include <optional>
namespace cameo {
......@@ -52,7 +52,7 @@ class SocketWaitingImpl;
class GenericWaitingImpl;
class WaitingImplSet;
class HandlerImpl;
class RequestSocketImpl;
namespace coms {
......
......@@ -33,7 +33,7 @@ class Instance;
class EventStreamSocket {
friend class Services;
friend class Server;
friend class application::Instance;
public:
......
......@@ -52,7 +52,7 @@ private:
class OutputStreamSocket {
friend class Services;
friend class Server;
friend class application::Instance;
void setApplicationId(int id);
......
......@@ -17,14 +17,17 @@
#ifndef CAMEO_SERVER_H_
#define CAMEO_SERVER_H_
#include <vector>
#include <memory>
#include <mutex>
#include "Application.h"
#include "ConnectionChecker.h"
#include "ConnectionTimeout.h"
#include "Response.h"
#include "Services.h"
#include "Strings.h"
#include "EventStreamSocket.h"
#include "OutputStreamSocket.h"
#include <vector>
#include <memory>
#include <mutex>
#include <array>
namespace cameo {
......@@ -34,8 +37,10 @@ namespace application {
class EventListener;
class EventThread;
class ContextImpl;
class RequestSocketImpl;
class Server : private Services {
class Server {
friend class application::Instance;
friend class application::This;
......@@ -150,6 +155,25 @@ private:
json::Object request(const std::string& request, int overrideTimeout = -1);
json::Object request(const std::string& requestPart1, const std::string& requestPart2, int overrideTimeout = -1);
void initContext();
void initRequestSocket();
Endpoint getStatusEndpoint() const;
void retrieveServerVersion();
void initStatus();
int getStreamPort(const std::string& name);
std::unique_ptr<OutputStreamSocket> createOutputStreamSocket(const std::string& name);
std::unique_ptr<RequestSocketImpl> createRequestSocket(const std::string& endpoint);
std::unique_ptr<RequestSocketImpl> createRequestSocket(const std::string& endpoint, int timeout);
Endpoint m_serverEndpoint;
std::array<int, 3> m_serverVersion;
int m_statusPort;
std::unique_ptr<ContextImpl> m_contextImpl;
std::unique_ptr<RequestSocketImpl> m_requestSocket;
std::mutex m_eventListenersMutex;
std::vector<EventListener *> m_eventListeners;
std::unique_ptr<EventThread> m_eventThread;
......
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
#ifndef CAMEO_SERVICES_H_
#define CAMEO_SERVICES_H_
#include <string>
#include <vector>
#include <array>
#include "Strings.h"
#include "EventStreamSocket.h"
#include "OutputStreamSocket.h"
namespace cameo {
class ContextImpl;
class RequestSocketImpl;
class Services {
public:
Services();
~Services();
void terminate();
void init();
void initRequestSocket();
std::vector<std::string> split(const std::string& info);
void setTimeout(int timeout);
int getTimeout() const;
const Endpoint& getEndpoint() const;
std::array<int, 3> getVersion() const;
Endpoint getStatusEndpoint() const;
bool isAvailable(int timeout) const;
void retrieveServerVersion();
void initStatus();
std::unique_ptr<EventStreamSocket> openEventStream();
int getStreamPort(const std::string& name);
std::unique_ptr<OutputStreamSocket> createOutputStreamSocket(const std::string& name);
std::unique_ptr<RequestSocketImpl> createRequestSocket(const std::string& endpoint);
std::unique_ptr<RequestSocketImpl> createRequestSocket(const std::string& endpoint, int timeout);
Endpoint m_serverEndpoint;
std::array<int, 3> m_serverVersion;
int m_statusPort;
std::unique_ptr<ContextImpl> m_impl;
std::unique_ptr<RequestSocketImpl> m_requestSocket;
};
}
#endif
......@@ -62,7 +62,7 @@ This::Com::Com(Server * server, int applicationId) :
}
Context* This::Com::getContext() const {
return m_server->m_impl.get();
return m_server->m_contextImpl.get();
}
void This::Com::storeKeyValue(const std::string& key, const std::string& value) const {
......
......@@ -21,6 +21,7 @@
#include "UndefinedApplicationException.h"
#include "UndefinedKeyException.h"
#include "EventThread.h"
#include "impl/CancelIdGenerator.h"
#include "impl/RequestSocketImpl.h"
#include "impl/StreamSocketImpl.h"
#include "impl/ContextImpl.h"
......@@ -28,26 +29,27 @@
#include "Messages.h"
#include <iostream>
#include <sstream>
#include <stdexcept>
namespace cameo {
constexpr int defaultTimeout = 10000;
void Server::initServer(const Endpoint& endpoint, int timeoutMs) {
Services::init();
initContext();
m_serverEndpoint = endpoint;
// Set the timeout.
Services::setTimeout(timeoutMs);
setTimeout(timeoutMs);
// Create the request socket. The server endpoint has been defined.
Services::initRequestSocket();
initRequestSocket();
// Manage the ConnectionTimeout exception that can occur.
try {
// Retrieve the server version.
Services::retrieveServerVersion();
retrieveServerVersion();
// Start the event thread.
std::unique_ptr<EventStreamSocket> socket = openEventStream();
......@@ -60,17 +62,28 @@ void Server::initServer(const Endpoint& endpoint, int timeoutMs) {
}
Server::Server(const Endpoint& endpoint, int timeoutMs) :
Services() {
m_statusPort(0),
m_contextImpl(nullptr) {
Services::init();
m_serverVersion[0] = 0;
m_serverVersion[1] = 0;
m_serverVersion[2] = 0;
initContext();
initServer(endpoint, timeoutMs);
}
Server::Server(const std::string& endpoint, int timeoutMs) :
Services() {
m_statusPort(0),
m_contextImpl(nullptr) {
m_serverVersion[0] = 0;
m_serverVersion[1] = 0;
m_serverVersion[2] = 0;
Services::init();
initContext();
try {
initServer(Endpoint::parse(endpoint), timeoutMs);
......@@ -81,30 +94,43 @@ Server::Server(const std::string& endpoint, int timeoutMs) :
}
Server::~Server() {
// Stop the event thread.
if (m_eventThread.get() != nullptr) {
m_eventThread->cancel();
}
m_eventThread.reset();
// Reset the request socket before the impl, otherwise reset context will block.
m_requestSocket.reset();
// Reset the context.
m_contextImpl.reset();
}
void Server::setTimeout(int value) {
Services::setTimeout(value);
void Server::setTimeout(int timeout) {
m_contextImpl->setTimeout(timeout);
if (m_requestSocket.get() != nullptr) {
m_requestSocket->setTimeout(timeout);
}
}
int Server::getTimeout() const {
return Services::getTimeout();
return m_contextImpl->getTimeout();
}
const Endpoint& Server::getEndpoint() const {
return Services::getEndpoint();
return m_serverEndpoint;
}
std::array<int, 3> Server::getVersion() const {
return Services::getVersion();
return m_serverVersion;
}
bool Server::isAvailable(int timeout) const {
return Services::isAvailable(timeout);
return m_contextImpl->isAvailable(m_requestSocket.get(), timeout);
}
bool Server::isAvailable() const {
......@@ -520,7 +546,26 @@ std::set<application::State> Server::getPastStates(int id) const {
}
std::unique_ptr<EventStreamSocket> Server::openEventStream() {
return Services::openEventStream();
// Init the status port if necessary.
if (m_statusPort == 0) {
initStatus();
}
std::stringstream cancelEndpoint;
// We define a unique name that depends on the event stream socket object because there can be many (instances).
cancelEndpoint << "inproc://cancel." << CancelIdGenerator::newId();
// Create the sockets.
zmq::socket_t * cancelPublisher = m_contextImpl->createCancelPublisher(cancelEndpoint.str());
zmq::socket_t * subscriber = m_contextImpl->createEventSubscriber(getStatusEndpoint().toString(), cancelEndpoint.str());
// Wait for the connection to be ready.
m_contextImpl->waitForSubscriber(subscriber, m_requestSocket.get());
// Create the event stream socket.
return std::unique_ptr<EventStreamSocket>(new EventStreamSocket(new StreamSocketImpl(subscriber, cancelPublisher)));
}
std::unique_ptr<ConnectionChecker> Server::createConnectionChecker(ConnectionCheckerType handler, int pollingTimeMs) {
......@@ -667,6 +712,95 @@ void Server::unregisterEventListener(EventListener * listener) {
}
}
void Server::initContext() {
// Set the impl.
m_contextImpl.reset(new ContextImpl());
}
void Server::initRequestSocket() {
// Create the request socket. The server endpoint must have been initialized.
m_requestSocket = std::move(createRequestSocket(m_serverEndpoint.toString(), m_contextImpl->getTimeout()));
}
Endpoint Server::getStatusEndpoint() const {
return m_serverEndpoint.withPort(m_statusPort);
}
void Server::retrieveServerVersion() {
// Get the version.
std::unique_ptr<zmq::message_t> reply = m_requestSocket->request(createVersionRequest());
// Get the JSON response.
json::Object response;
json::parse(response, reply.get());
m_serverVersion[0] = response[message::VersionResponse::MAJOR].GetInt();
m_serverVersion[1] = response[message::VersionResponse::MINOR].GetInt();
m_serverVersion[2] = response[message::VersionResponse::REVISION].GetInt();
}
void Server::initStatus() {
// Get the status port.
std::unique_ptr<zmq::message_t> reply = m_requestSocket->request(createStreamStatusRequest());
// Get the JSON response.
json::Object response;
json::parse(response, reply.get());
int value = response[message::RequestResponse::VALUE].GetInt();
// Check response.
if (value == -1) {
return;
}
// Get the status port.
m_statusPort = value;
}
int Server::getStreamPort(const std::string& name) {
std::unique_ptr<zmq::message_t> reply = m_requestSocket->request(createOutputPortRequest(name));
// Get the JSON response.
json::Object response;
json::parse(response, reply.get());
return response[message::RequestResponse::VALUE].GetInt();
}
std::unique_ptr<OutputStreamSocket> Server::createOutputStreamSocket(const std::string& name) {
int port = getStreamPort(name);
if (port == -1) {
return nullptr;
}
// We define a unique name that depends on the event stream socket object because there can be many (instances).
std::string cancelEndpoint = "inproc://cancel." + std::to_string(CancelIdGenerator::newId());
// Create the sockets.
zmq::socket_t * cancelPublisher = m_contextImpl->createCancelPublisher(cancelEndpoint);
zmq::socket_t * subscriber = m_contextImpl->createOutputStreamSubscriber(m_serverEndpoint.withPort(port).toString(), cancelEndpoint);
// Wait for the connection to be ready.
m_contextImpl->waitForStreamSubscriber(subscriber, m_requestSocket.get(), name);
// Create the output stream socket.
return std::unique_ptr<OutputStreamSocket>(new OutputStreamSocket(new StreamSocketImpl(subscriber, cancelPublisher)));
}
std::unique_ptr<RequestSocketImpl> Server::createRequestSocket(const std::string& endpoint) {
return std::unique_ptr<RequestSocketImpl>(new RequestSocketImpl(m_contextImpl.get(), endpoint, m_contextImpl->getTimeout()));
}
std::unique_ptr<RequestSocketImpl> Server::createRequestSocket(const std::string& endpoint, int timeout) {
return std::unique_ptr<RequestSocketImpl>(new RequestSocketImpl(m_contextImpl.get(), endpoint, timeout));
}
std::ostream& operator<<(std::ostream& os, const cameo::Server& server) {
os << "server@" << server.m_serverEndpoint.toString();
......
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
#include "Services.h"
#include "impl/CancelIdGenerator.h"
#include "impl/RequestSocketImpl.h"
#include "impl/StreamSocketImpl.h"
#include "impl/ContextImpl.h"
#include "JSON.h"
#include "Messages.h"
#include <iostream>
#include <sstream>
#include <stdexcept>
namespace cameo {
Services::Services() :
m_statusPort(0),
m_impl(nullptr) {
m_serverVersion[0] = 0;
m_serverVersion[1] = 0;
m_serverVersion[2] = 0;
}
Services::~Services() {
// Delete impl here to avoid order troubles.
terminate();
}
void Services::terminate() {
// Reset the request socket before the impl, otherwise reset impl will block.
m_requestSocket.reset();
// Reset the impl.
m_impl.reset();
}
void Services::init() {
// Set the impl.
m_impl.reset(new ContextImpl());
}
void Services::initRequestSocket() {
// Create the request socket. The server endpoint must have been initialized.
m_requestSocket = std::move(createRequestSocket(m_serverEndpoint.toString(), m_impl->getTimeout()));
}
std::vector<std::string> Services::split(const std::string& info) {
std::vector<std::string> result;
int lastIndex = 0;
int index = info.find(':');
while (index != std::string::npos) {
result.push_back(info.substr(lastIndex, index - lastIndex));
lastIndex = index + 1;
index = info.find(':', lastIndex);
}
result.push_back(info.substr(lastIndex));
return result;
}
void Services::setTimeout(int timeout) {
m_impl->setTimeout(timeout);
if (m_requestSocket.get() != nullptr) {
m_requestSocket->setTimeout(timeout);
}
}
int Services::getTimeout() const {
return m_impl->getTimeout();
}
const Endpoint& Services::getEndpoint() const {
return m_serverEndpoint;
}
std::array<int, 3> Services::getVersion() const {
return m_serverVersion;
}
Endpoint Services::getStatusEndpoint() const {
return m_serverEndpoint.withPort(m_statusPort);
}
bool Services::isAvailable(int timeout) const {
return m_impl->isAvailable(m_requestSocket.get(), timeout);
}
void Services::retrieveServerVersion() {
// Get the version.
std::unique_ptr<zmq::message_t> reply = m_requestSocket->request(createVersionRequest());
// Get the JSON response.
json::Object response;
json::parse(response, reply.get());
m_serverVersion[0] = response[message::VersionResponse::MAJOR].GetInt();
m_serverVersion[1] = response[message::VersionResponse::MINOR].GetInt();
m_serverVersion[2] = response[message::VersionResponse::REVISION].GetInt();
}
void Services::initStatus() {
// Get the status port.
std::unique_ptr<zmq::message_t> reply = m_requestSocket->request(createStreamStatusRequest());
// Get the JSON response.
json::Object response;
json::parse(response, reply.get());
int value = response[message::RequestResponse::VALUE].GetInt();
// Check response.
if (value == -1) {
return;
}
// Get the status port.
m_statusPort = value;
}
std::unique_ptr<EventStreamSocket> Services::openEventStream() {
// Init the status port if necessary.
if (m_statusPort == 0) {
initStatus();
}
std::stringstream cancelEndpoint;
// We define a unique name that depends on the event stream socket object because there can be many (instances).
cancelEndpoint << "inproc://cancel." << CancelIdGenerator::newId();
// Create the sockets.
zmq::socket_t * cancelPublisher = m_impl->createCancelPublisher(cancelEndpoint.str());
zmq::socket_t * subscriber = m_impl->createEventSubscriber(getStatusEndpoint().toString(), cancelEndpoint.str());
// Wait for the connection to be ready.
m_impl->waitForSubscriber(subscriber, m_requestSocket.get());
// Create the event stream socket.
return std::unique_ptr<EventStreamSocket>(new EventStreamSocket(new StreamSocketImpl(subscriber, cancelPublisher)));
}
int Services::getStreamPort(const std::string& name) {
std::unique_ptr<zmq::message_t> reply = m_requestSocket->request(createOutputPortRequest(name));
// Get the JSON response.
json::Object response;
json::parse(response, reply.get());
return response[message::RequestResponse::VALUE].GetInt();
}