diff --git a/.gitignore b/.gitignore index 6cb957a33880c164f4122f55e5a490e2255e4532..77ba40de11ffbccf79b58d945f5ddfbfe546ec4d 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,4 @@ /INSTALL /Makefile.in /.settings/ +/Debug/ diff --git a/src/Makefile.am b/src/Makefile.am index 1b3f28f7193d14c02e9acf8f53732c5d4df50e20..c1c671b3f9b6c1bc5a8b73c44c2802d7089e0757 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -71,6 +71,8 @@ nobase_include_HEADERS = \ cameo/ProtoType.h \ cameo/ConnectionTimeout.h \ cameo/Event.h \ + cameo/ConcurrentQueue.h \ + cameo/EventListener.h \ cameo/EventStreamSocket.h \ cameo/OutputStreamSocket.h \ cameo/PublisherCreationException.h \ diff --git a/src/cameo/Application.cpp b/src/cameo/Application.cpp index 79616d60279276ab220edc346033aa8570c59c94..dbdedc72bb339d4a33b274f8783ff3d0c19260ec 100644 --- a/src/cameo/Application.cpp +++ b/src/cameo/Application.cpp @@ -447,31 +447,26 @@ void This::handleStopImpl(StopFunctionType function) { /////////////////////////////////////////////////////////////////////////////// // Instance -Instance::Instance(const Server * server, std::unique_ptr<EventStreamSocket>& socket) : +Instance::Instance(Server * server) : m_server(server), - m_eventSocket(std::move(socket)), m_id(-1), m_pastStates(0), m_initialState(UNKNOWN), m_lastState(UNKNOWN), m_hasResult(false) { - - // Create the waiting. - m_waiting.reset(m_eventSocket->waiting()); } Instance::~Instance() { - // the destructor has been added to avoid blocking ZeroMQ, because the inner objects destructors were not called. + // Unregister the instance. + m_server->unregisterEventListener(this); + + // The destructor has been added to avoid blocking ZeroMQ, because the inner objects destructors were not called. } void Instance::setId(int id) { m_id = id; } -void Instance::setName(const std::string& name) { - m_name = name; -} - void Instance::setErrorMessage(const std::string& message) { m_errorMessage = message; } @@ -491,10 +486,6 @@ void Instance::setInitialState(State state) { m_lastState = state; } -const std::string& Instance::getName() const { - return m_name; -} - int Instance::getId() const { return m_id; } @@ -574,7 +565,7 @@ State Instance::waitFor(int states, const std::string& eventName, StateHandlerTy while (true) { // Waits for a new incoming status - unique_ptr<Event> event = m_eventSocket->receive(blocking); + unique_ptr<Event> event = popEvent(blocking); // The socket is canceled or the non-blocking call returns a null message. if (event.get() == nullptr) { @@ -644,7 +635,7 @@ State Instance::waitFor(StateHandlerType handler) { } void Instance::cancelWaitFor() { - m_waiting->cancel(); + cancel(m_id); } State Instance::now() { diff --git a/src/cameo/Application.h b/src/cameo/Application.h index 6110bac4278509c2c81feff440eb57bce19996a8..2fc9782e216670314bfa20676c1bbca86d07cad0 100644 --- a/src/cameo/Application.h +++ b/src/cameo/Application.h @@ -32,6 +32,7 @@ #include "Serializer.h" #include "Services.h" #include "TimeCondition.h" +#include "EventListener.h" namespace cameo { @@ -181,7 +182,7 @@ private: }; -class Instance { +class Instance : public EventListener { friend class cameo::Server; friend class cameo::application::Subscriber; @@ -192,7 +193,6 @@ public: ~Instance(); - const std::string& getName() const; int getId() const; const std::string& getUrl() const; const std::string& getEndpoint() const; @@ -235,21 +235,17 @@ public: std::shared_ptr<OutputStreamSocket> getOutputStreamSocket(); private: - Instance(const Server * server, std::unique_ptr<EventStreamSocket>& socket); + Instance(Server * server); void setId(int id); - void setName(const std::string& name); void setErrorMessage(const std::string& message); void setOutputStreamSocket(std::unique_ptr<OutputStreamSocket>& socket); void setPastStates(State pastStates); void setInitialState(State state); State waitFor(int states, const std::string& eventName, StateHandlerType handler, bool blocking); - const Server * m_server; - std::unique_ptr<EventStreamSocket> m_eventSocket; + Server * m_server; std::shared_ptr<OutputStreamSocket> m_outputStreamSocket; - std::unique_ptr<WaitingImpl> m_waiting; - std::string m_name; int m_id; std::string m_errorMessage; int m_pastStates; diff --git a/src/cameo/EventListener.cpp b/src/cameo/EventListener.cpp index ed4a77b964797be2a1b37799f90756d7e83a7f24..974bb2aa5e83d5dc3470da4e33389e80b55fcfc3 100644 --- a/src/cameo/EventListener.cpp +++ b/src/cameo/EventListener.cpp @@ -39,6 +39,7 @@ void EventListener::pushEvent(std::unique_ptr<Event>& event) { } std::unique_ptr<Event> EventListener::popEvent(bool blocking) { + if (blocking) { return m_eventQueue.pop(); } diff --git a/src/cameo/EventThread.cpp b/src/cameo/EventThread.cpp index 06edc9887ff687bc111755af7f7d8c750ebd04e4..36aaed389da24ba91ba6a3607e10e42712a1061e 100644 --- a/src/cameo/EventThread.cpp +++ b/src/cameo/EventThread.cpp @@ -17,10 +17,9 @@ #include "EventThread.h" #include "Server.h" #include "EventStreamSocket.h" -#include "StatusEvent.h" -#include "ResultEvent.h" -#include "PublisherEvent.h" -#include "PortEvent.h" +#include "EventListener.h" + +using namespace std; namespace cameo { @@ -30,29 +29,40 @@ EventThread::EventThread(Server * server, std::unique_ptr<EventStreamSocket>& so } EventThread::~EventThread() { -} -void EventThread::start() { -} - -void EventThread::cancel() { - m_socket->cancel(); + if (m_thread != nullptr) { + m_thread->join(); + } } -void EventThread::processStatusEvent(StatusEvent * status) { +void EventThread::start() { -} + m_thread.reset(new thread([this] { -void EventThread::processResultEvent(ResultEvent * result) { + while (true) { + unique_ptr<Event> event = m_socket->receive(); -} + if (event.get() == nullptr) { + // The stream is canceled. + return; + } -void EventThread::processPublisherEvent(PublisherEvent * publisher) { + // Forward the event to the listeners. + auto eventListeners = m_server->getEventListeners(); + for (EventListener * listener : eventListeners) { + // If the application name is null, all the status are pushed, otherwise, filter on the name. + if (listener->getName() == "" + || listener->getName() == event->getName()) { + listener->pushEvent(event); + } + } + } + })); } -void EventThread::processPortEvent(PortEvent * port) { - +void EventThread::cancel() { + m_socket->cancel(); } } diff --git a/src/cameo/EventThread.h b/src/cameo/EventThread.h index fb28a5fd2fd5332f1845561e14570cf7b535512d..ab6bf6e1da62a3ff4743b66d1ccf2117baee7c38 100644 --- a/src/cameo/EventThread.h +++ b/src/cameo/EventThread.h @@ -25,10 +25,6 @@ namespace cameo { class Server; class EventStreamSocket; -class StatusEvent; -class ResultEvent; -class PublisherEvent; -class PortEvent; class EventThread { @@ -40,15 +36,9 @@ public: void cancel(); private: - void processStatusEvent(StatusEvent * status); - void processResultEvent(ResultEvent * result); - void processPublisherEvent(PublisherEvent * publisher); - void processPortEvent(PortEvent * port); - Server * m_server; std::unique_ptr<EventStreamSocket> m_socket; std::unique_ptr<std::thread> m_thread; - }; } diff --git a/src/cameo/Server.cpp b/src/cameo/Server.cpp index 05e8f8939a15ea3de96658d49203afd5db5cdcb8..6f143e3b5c536626603b223cbadab025b7edf367 100644 --- a/src/cameo/Server.cpp +++ b/src/cameo/Server.cpp @@ -23,6 +23,7 @@ #include "impl/ServicesImpl.h" #include "impl/SocketImpl.h" #include "ProtoType.h" +#include "EventThread.h" using namespace std; @@ -44,9 +45,18 @@ Server::Server(const std::string& endpoint) : istringstream is(port); is >> m_port; m_serverEndpoint = m_url + ":" + port; + + // Start the event thread. + unique_ptr<EventStreamSocket> socket = openEventStream(); + m_eventThread.reset(new EventThread(this, socket)); + m_eventThread->start(); } Server::~Server() { + // Stop the event thread. + if (m_eventThread.get() != nullptr) { + m_eventThread->cancel(); + } } void Server::setTimeout(int timeoutMs) { @@ -88,8 +98,7 @@ int Server::getAvailableTimeout() const { } std::unique_ptr<application::Instance> Server::makeInstance() { - unique_ptr<EventStreamSocket> socket = Services::openEventStream(); - return unique_ptr<application::Instance>(new application::Instance(this, socket)); + return unique_ptr<application::Instance>(new application::Instance(this)); } std::unique_ptr<application::Instance> Server::start(const std::string& name, Option options) { @@ -115,7 +124,9 @@ std::unique_ptr<application::Instance> Server::start(const std::string& name, co unique_ptr<application::Instance> instance = makeInstance(); + // Set the name and register the instance as event listener. instance->setName(name); + registerEventListener(instance.get()); try { if (outputStream) { @@ -167,29 +178,6 @@ Response Server::stopApplicationAsynchronously(int id, bool immediately) const { return Response(requestResponse.value(), requestResponse.message()); } -std::unique_ptr<application::Instance> Server::stop(int id, bool immediately) { - - unique_ptr<application::Instance> instance = makeInstance(); - - try { - Response response = stopApplicationAsynchronously(id, immediately); - - if (response.getValue() != -1) { - // we get the name in the message attribute - instance->setName(response.getMessage()); - instance->setId(id); - - } else { - instance->setErrorMessage(response.getMessage()); - } - - } catch (const ConnectionTimeout& e) { - instance->setErrorMessage(e.what()); - } - - return instance; -} - application::InstanceArray Server::connectAll(const std::string& name, Option options) { bool outputStream = ((options & OUTPUTSTREAM) != 0); @@ -213,7 +201,10 @@ application::InstanceArray Server::connectAll(const std::string& name, Option op unique_ptr<application::Instance> instance = makeInstance(); + // Set the name and register the instance as event listener. instance->setName(info.name()); + registerEventListener(instance.get()); + int applicationId = info.id(); // test if the application is still alive otherwise we could have missed a status message diff --git a/src/cameo/Server.h b/src/cameo/Server.h index 0aa634246b781dc37b87f667ea026eab7457521e..a5f347392525b49d5d182809a5bef8402e91b332 100644 --- a/src/cameo/Server.h +++ b/src/cameo/Server.h @@ -34,6 +34,7 @@ namespace application { } class EventListener; +class EventThread; class Server : private Services { @@ -117,13 +118,13 @@ private: std::unique_ptr<application::Instance> makeInstance(); bool isAlive(int id) const; Response stopApplicationAsynchronously(int id, bool immediately) const; - std::unique_ptr<application::Instance> stop(int id, bool immediately); std::unique_ptr<application::Subscriber> createSubscriber(int id, const std::string& publisherName, const std::string& instanceName) const; int getAvailableTimeout() const; int getStreamPort(const std::string& name); std::mutex m_eventListenersMutex; std::vector<EventListener *> m_eventListeners; + std::unique_ptr<EventThread> m_eventThread; }; std::ostream& operator<<(std::ostream&, const Server&);