Commit edf172e4 authored by legoc's avatar legoc
Browse files

Implemented C++ Instance with EventListener to avoid creating a new...

Implemented C++ Instance with EventListener to avoid creating a new EventStreamSocket for each instance
parent 2c614eef
...@@ -6,3 +6,4 @@ ...@@ -6,3 +6,4 @@
/INSTALL /INSTALL
/Makefile.in /Makefile.in
/.settings/ /.settings/
/Debug/
...@@ -71,6 +71,8 @@ nobase_include_HEADERS = \ ...@@ -71,6 +71,8 @@ nobase_include_HEADERS = \
cameo/ProtoType.h \ cameo/ProtoType.h \
cameo/ConnectionTimeout.h \ cameo/ConnectionTimeout.h \
cameo/Event.h \ cameo/Event.h \
cameo/ConcurrentQueue.h \
cameo/EventListener.h \
cameo/EventStreamSocket.h \ cameo/EventStreamSocket.h \
cameo/OutputStreamSocket.h \ cameo/OutputStreamSocket.h \
cameo/PublisherCreationException.h \ cameo/PublisherCreationException.h \
......
...@@ -447,31 +447,26 @@ void This::handleStopImpl(StopFunctionType function) { ...@@ -447,31 +447,26 @@ void This::handleStopImpl(StopFunctionType function) {
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
// Instance // Instance
Instance::Instance(const Server * server, std::unique_ptr<EventStreamSocket>& socket) : Instance::Instance(Server * server) :
m_server(server), m_server(server),
m_eventSocket(std::move(socket)),
m_id(-1), m_id(-1),
m_pastStates(0), m_pastStates(0),
m_initialState(UNKNOWN), m_initialState(UNKNOWN),
m_lastState(UNKNOWN), m_lastState(UNKNOWN),
m_hasResult(false) { m_hasResult(false) {
// Create the waiting.
m_waiting.reset(m_eventSocket->waiting());
} }
Instance::~Instance() { Instance::~Instance() {
// the destructor has been added to avoid blocking ZeroMQ, because the inner objects destructors were not called. // Unregister the instance.
m_server->unregisterEventListener(this);
// The destructor has been added to avoid blocking ZeroMQ, because the inner objects destructors were not called.
} }
void Instance::setId(int id) { void Instance::setId(int id) {
m_id = id; m_id = id;
} }
void Instance::setName(const std::string& name) {
m_name = name;
}
void Instance::setErrorMessage(const std::string& message) { void Instance::setErrorMessage(const std::string& message) {
m_errorMessage = message; m_errorMessage = message;
} }
...@@ -491,10 +486,6 @@ void Instance::setInitialState(State state) { ...@@ -491,10 +486,6 @@ void Instance::setInitialState(State state) {
m_lastState = state; m_lastState = state;
} }
const std::string& Instance::getName() const {
return m_name;
}
int Instance::getId() const { int Instance::getId() const {
return m_id; return m_id;
} }
...@@ -574,7 +565,7 @@ State Instance::waitFor(int states, const std::string& eventName, StateHandlerTy ...@@ -574,7 +565,7 @@ State Instance::waitFor(int states, const std::string& eventName, StateHandlerTy
while (true) { while (true) {
// Waits for a new incoming status // Waits for a new incoming status
unique_ptr<Event> event = m_eventSocket->receive(blocking); unique_ptr<Event> event = popEvent(blocking);
// The socket is canceled or the non-blocking call returns a null message. // The socket is canceled or the non-blocking call returns a null message.
if (event.get() == nullptr) { if (event.get() == nullptr) {
...@@ -644,7 +635,7 @@ State Instance::waitFor(StateHandlerType handler) { ...@@ -644,7 +635,7 @@ State Instance::waitFor(StateHandlerType handler) {
} }
void Instance::cancelWaitFor() { void Instance::cancelWaitFor() {
m_waiting->cancel(); cancel(m_id);
} }
State Instance::now() { State Instance::now() {
......
...@@ -32,6 +32,7 @@ ...@@ -32,6 +32,7 @@
#include "Serializer.h" #include "Serializer.h"
#include "Services.h" #include "Services.h"
#include "TimeCondition.h" #include "TimeCondition.h"
#include "EventListener.h"
namespace cameo { namespace cameo {
...@@ -181,7 +182,7 @@ private: ...@@ -181,7 +182,7 @@ private:
}; };
class Instance { class Instance : public EventListener {
friend class cameo::Server; friend class cameo::Server;
friend class cameo::application::Subscriber; friend class cameo::application::Subscriber;
...@@ -192,7 +193,6 @@ public: ...@@ -192,7 +193,6 @@ public:
~Instance(); ~Instance();
const std::string& getName() const;
int getId() const; int getId() const;
const std::string& getUrl() const; const std::string& getUrl() const;
const std::string& getEndpoint() const; const std::string& getEndpoint() const;
...@@ -235,21 +235,17 @@ public: ...@@ -235,21 +235,17 @@ public:
std::shared_ptr<OutputStreamSocket> getOutputStreamSocket(); std::shared_ptr<OutputStreamSocket> getOutputStreamSocket();
private: private:
Instance(const Server * server, std::unique_ptr<EventStreamSocket>& socket); Instance(Server * server);
void setId(int id); void setId(int id);
void setName(const std::string& name);
void setErrorMessage(const std::string& message); void setErrorMessage(const std::string& message);
void setOutputStreamSocket(std::unique_ptr<OutputStreamSocket>& socket); void setOutputStreamSocket(std::unique_ptr<OutputStreamSocket>& socket);
void setPastStates(State pastStates); void setPastStates(State pastStates);
void setInitialState(State state); void setInitialState(State state);
State waitFor(int states, const std::string& eventName, StateHandlerType handler, bool blocking); State waitFor(int states, const std::string& eventName, StateHandlerType handler, bool blocking);
const Server * m_server; Server * m_server;
std::unique_ptr<EventStreamSocket> m_eventSocket;
std::shared_ptr<OutputStreamSocket> m_outputStreamSocket; std::shared_ptr<OutputStreamSocket> m_outputStreamSocket;
std::unique_ptr<WaitingImpl> m_waiting;
std::string m_name;
int m_id; int m_id;
std::string m_errorMessage; std::string m_errorMessage;
int m_pastStates; int m_pastStates;
......
...@@ -39,6 +39,7 @@ void EventListener::pushEvent(std::unique_ptr<Event>& event) { ...@@ -39,6 +39,7 @@ void EventListener::pushEvent(std::unique_ptr<Event>& event) {
} }
std::unique_ptr<Event> EventListener::popEvent(bool blocking) { std::unique_ptr<Event> EventListener::popEvent(bool blocking) {
if (blocking) { if (blocking) {
return m_eventQueue.pop(); return m_eventQueue.pop();
} }
......
...@@ -17,10 +17,9 @@ ...@@ -17,10 +17,9 @@
#include "EventThread.h" #include "EventThread.h"
#include "Server.h" #include "Server.h"
#include "EventStreamSocket.h" #include "EventStreamSocket.h"
#include "StatusEvent.h" #include "EventListener.h"
#include "ResultEvent.h"
#include "PublisherEvent.h" using namespace std;
#include "PortEvent.h"
namespace cameo { namespace cameo {
...@@ -30,29 +29,40 @@ EventThread::EventThread(Server * server, std::unique_ptr<EventStreamSocket>& so ...@@ -30,29 +29,40 @@ EventThread::EventThread(Server * server, std::unique_ptr<EventStreamSocket>& so
} }
EventThread::~EventThread() { EventThread::~EventThread() {
}
void EventThread::start() { if (m_thread != nullptr) {
} m_thread->join();
}
void EventThread::cancel() {
m_socket->cancel();
} }
void EventThread::processStatusEvent(StatusEvent * status) { void EventThread::start() {
} m_thread.reset(new thread([this] {
void EventThread::processResultEvent(ResultEvent * result) { while (true) {
unique_ptr<Event> event = m_socket->receive();
} if (event.get() == nullptr) {
// The stream is canceled.
return;
}
void EventThread::processPublisherEvent(PublisherEvent * publisher) { // Forward the event to the listeners.
auto eventListeners = m_server->getEventListeners();
for (EventListener * listener : eventListeners) {
// If the application name is null, all the status are pushed, otherwise, filter on the name.
if (listener->getName() == ""
|| listener->getName() == event->getName()) {
listener->pushEvent(event);
}
}
}
}));
} }
void EventThread::processPortEvent(PortEvent * port) { void EventThread::cancel() {
m_socket->cancel();
} }
} }
...@@ -25,10 +25,6 @@ namespace cameo { ...@@ -25,10 +25,6 @@ namespace cameo {
class Server; class Server;
class EventStreamSocket; class EventStreamSocket;
class StatusEvent;
class ResultEvent;
class PublisherEvent;
class PortEvent;
class EventThread { class EventThread {
...@@ -40,15 +36,9 @@ public: ...@@ -40,15 +36,9 @@ public:
void cancel(); void cancel();
private: private:
void processStatusEvent(StatusEvent * status);
void processResultEvent(ResultEvent * result);
void processPublisherEvent(PublisherEvent * publisher);
void processPortEvent(PortEvent * port);
Server * m_server; Server * m_server;
std::unique_ptr<EventStreamSocket> m_socket; std::unique_ptr<EventStreamSocket> m_socket;
std::unique_ptr<std::thread> m_thread; std::unique_ptr<std::thread> m_thread;
}; };
} }
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#include "impl/ServicesImpl.h" #include "impl/ServicesImpl.h"
#include "impl/SocketImpl.h" #include "impl/SocketImpl.h"
#include "ProtoType.h" #include "ProtoType.h"
#include "EventThread.h"
using namespace std; using namespace std;
...@@ -44,9 +45,18 @@ Server::Server(const std::string& endpoint) : ...@@ -44,9 +45,18 @@ Server::Server(const std::string& endpoint) :
istringstream is(port); istringstream is(port);
is >> m_port; is >> m_port;
m_serverEndpoint = m_url + ":" + port; m_serverEndpoint = m_url + ":" + port;
// Start the event thread.
unique_ptr<EventStreamSocket> socket = openEventStream();
m_eventThread.reset(new EventThread(this, socket));
m_eventThread->start();
} }
Server::~Server() { Server::~Server() {
// Stop the event thread.
if (m_eventThread.get() != nullptr) {
m_eventThread->cancel();
}
} }
void Server::setTimeout(int timeoutMs) { void Server::setTimeout(int timeoutMs) {
...@@ -88,8 +98,7 @@ int Server::getAvailableTimeout() const { ...@@ -88,8 +98,7 @@ int Server::getAvailableTimeout() const {
} }
std::unique_ptr<application::Instance> Server::makeInstance() { std::unique_ptr<application::Instance> Server::makeInstance() {
unique_ptr<EventStreamSocket> socket = Services::openEventStream(); return unique_ptr<application::Instance>(new application::Instance(this));
return unique_ptr<application::Instance>(new application::Instance(this, socket));
} }
std::unique_ptr<application::Instance> Server::start(const std::string& name, Option options) { std::unique_ptr<application::Instance> Server::start(const std::string& name, Option options) {
...@@ -115,7 +124,9 @@ std::unique_ptr<application::Instance> Server::start(const std::string& name, co ...@@ -115,7 +124,9 @@ std::unique_ptr<application::Instance> Server::start(const std::string& name, co
unique_ptr<application::Instance> instance = makeInstance(); unique_ptr<application::Instance> instance = makeInstance();
// Set the name and register the instance as event listener.
instance->setName(name); instance->setName(name);
registerEventListener(instance.get());
try { try {
if (outputStream) { if (outputStream) {
...@@ -167,29 +178,6 @@ Response Server::stopApplicationAsynchronously(int id, bool immediately) const { ...@@ -167,29 +178,6 @@ Response Server::stopApplicationAsynchronously(int id, bool immediately) const {
return Response(requestResponse.value(), requestResponse.message()); return Response(requestResponse.value(), requestResponse.message());
} }
std::unique_ptr<application::Instance> Server::stop(int id, bool immediately) {
unique_ptr<application::Instance> instance = makeInstance();
try {
Response response = stopApplicationAsynchronously(id, immediately);
if (response.getValue() != -1) {
// we get the name in the message attribute
instance->setName(response.getMessage());
instance->setId(id);
} else {
instance->setErrorMessage(response.getMessage());
}
} catch (const ConnectionTimeout& e) {
instance->setErrorMessage(e.what());
}
return instance;
}
application::InstanceArray Server::connectAll(const std::string& name, Option options) { application::InstanceArray Server::connectAll(const std::string& name, Option options) {
bool outputStream = ((options & OUTPUTSTREAM) != 0); bool outputStream = ((options & OUTPUTSTREAM) != 0);
...@@ -213,7 +201,10 @@ application::InstanceArray Server::connectAll(const std::string& name, Option op ...@@ -213,7 +201,10 @@ application::InstanceArray Server::connectAll(const std::string& name, Option op
unique_ptr<application::Instance> instance = makeInstance(); unique_ptr<application::Instance> instance = makeInstance();
// Set the name and register the instance as event listener.
instance->setName(info.name()); instance->setName(info.name());
registerEventListener(instance.get());
int applicationId = info.id(); int applicationId = info.id();
// test if the application is still alive otherwise we could have missed a status message // test if the application is still alive otherwise we could have missed a status message
......
...@@ -34,6 +34,7 @@ namespace application { ...@@ -34,6 +34,7 @@ namespace application {
} }
class EventListener; class EventListener;
class EventThread;
class Server : private Services { class Server : private Services {
...@@ -117,13 +118,13 @@ private: ...@@ -117,13 +118,13 @@ private:
std::unique_ptr<application::Instance> makeInstance(); std::unique_ptr<application::Instance> makeInstance();
bool isAlive(int id) const; bool isAlive(int id) const;
Response stopApplicationAsynchronously(int id, bool immediately) const; Response stopApplicationAsynchronously(int id, bool immediately) const;
std::unique_ptr<application::Instance> stop(int id, bool immediately);
std::unique_ptr<application::Subscriber> createSubscriber(int id, const std::string& publisherName, const std::string& instanceName) const; std::unique_ptr<application::Subscriber> createSubscriber(int id, const std::string& publisherName, const std::string& instanceName) const;
int getAvailableTimeout() const; int getAvailableTimeout() const;
int getStreamPort(const std::string& name); int getStreamPort(const std::string& name);
std::mutex m_eventListenersMutex; std::mutex m_eventListenersMutex;
std::vector<EventListener *> m_eventListeners; std::vector<EventListener *> m_eventListeners;
std::unique_ptr<EventThread> m_eventThread;
}; };
std::ostream& operator<<(std::ostream&, const Server&); std::ostream& operator<<(std::ostream&, const Server&);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment