Commit 17aa1e49 authored by legoc's avatar legoc
Browse files

Implemented C++ Instance with EventListener to avoid creating a new...

Implemented C++ Instance with EventListener to avoid creating a new EventStreamSocket for each instance
parent dd232fca
......@@ -6,3 +6,4 @@
/INSTALL
/Makefile.in
/.settings/
/Debug/
......@@ -71,6 +71,8 @@ nobase_include_HEADERS = \
cameo/ProtoType.h \
cameo/ConnectionTimeout.h \
cameo/Event.h \
cameo/ConcurrentQueue.h \
cameo/EventListener.h \
cameo/EventStreamSocket.h \
cameo/OutputStreamSocket.h \
cameo/PublisherCreationException.h \
......
......@@ -447,31 +447,26 @@ void This::handleStopImpl(StopFunctionType function) {
///////////////////////////////////////////////////////////////////////////////
// Instance
Instance::Instance(const Server * server, std::unique_ptr<EventStreamSocket>& socket) :
Instance::Instance(Server * server) :
m_server(server),
m_eventSocket(std::move(socket)),
m_id(-1),
m_pastStates(0),
m_initialState(UNKNOWN),
m_lastState(UNKNOWN),
m_hasResult(false) {
// Create the waiting.
m_waiting.reset(m_eventSocket->waiting());
}
Instance::~Instance() {
// the destructor has been added to avoid blocking ZeroMQ, because the inner objects destructors were not called.
// Unregister the instance.
m_server->unregisterEventListener(this);
// The destructor has been added to avoid blocking ZeroMQ, because the inner objects destructors were not called.
}
void Instance::setId(int id) {
m_id = id;
}
void Instance::setName(const std::string& name) {
m_name = name;
}
void Instance::setErrorMessage(const std::string& message) {
m_errorMessage = message;
}
......@@ -491,10 +486,6 @@ void Instance::setInitialState(State state) {
m_lastState = state;
}
const std::string& Instance::getName() const {
return m_name;
}
int Instance::getId() const {
return m_id;
}
......@@ -574,7 +565,7 @@ State Instance::waitFor(int states, const std::string& eventName, StateHandlerTy
while (true) {
// Waits for a new incoming status
unique_ptr<Event> event = m_eventSocket->receive(blocking);
unique_ptr<Event> event = popEvent(blocking);
// The socket is canceled or the non-blocking call returns a null message.
if (event.get() == nullptr) {
......@@ -644,7 +635,7 @@ State Instance::waitFor(StateHandlerType handler) {
}
void Instance::cancelWaitFor() {
m_waiting->cancel();
cancel(m_id);
}
State Instance::now() {
......
......@@ -32,6 +32,7 @@
#include "Serializer.h"
#include "Services.h"
#include "TimeCondition.h"
#include "EventListener.h"
namespace cameo {
......@@ -181,7 +182,7 @@ private:
};
class Instance {
class Instance : public EventListener {
friend class cameo::Server;
friend class cameo::application::Subscriber;
......@@ -192,7 +193,6 @@ public:
~Instance();
const std::string& getName() const;
int getId() const;
const std::string& getUrl() const;
const std::string& getEndpoint() const;
......@@ -235,21 +235,17 @@ public:
std::shared_ptr<OutputStreamSocket> getOutputStreamSocket();
private:
Instance(const Server * server, std::unique_ptr<EventStreamSocket>& socket);
Instance(Server * server);
void setId(int id);
void setName(const std::string& name);
void setErrorMessage(const std::string& message);
void setOutputStreamSocket(std::unique_ptr<OutputStreamSocket>& socket);
void setPastStates(State pastStates);
void setInitialState(State state);
State waitFor(int states, const std::string& eventName, StateHandlerType handler, bool blocking);
const Server * m_server;
std::unique_ptr<EventStreamSocket> m_eventSocket;
Server * m_server;
std::shared_ptr<OutputStreamSocket> m_outputStreamSocket;
std::unique_ptr<WaitingImpl> m_waiting;
std::string m_name;
int m_id;
std::string m_errorMessage;
int m_pastStates;
......
......@@ -39,6 +39,7 @@ void EventListener::pushEvent(std::unique_ptr<Event>& event) {
}
std::unique_ptr<Event> EventListener::popEvent(bool blocking) {
if (blocking) {
return m_eventQueue.pop();
}
......
......@@ -17,10 +17,9 @@
#include "EventThread.h"
#include "Server.h"
#include "EventStreamSocket.h"
#include "StatusEvent.h"
#include "ResultEvent.h"
#include "PublisherEvent.h"
#include "PortEvent.h"
#include "EventListener.h"
using namespace std;
namespace cameo {
......@@ -30,29 +29,40 @@ EventThread::EventThread(Server * server, std::unique_ptr<EventStreamSocket>& so
}
EventThread::~EventThread() {
}
void EventThread::start() {
}
void EventThread::cancel() {
m_socket->cancel();
if (m_thread != nullptr) {
m_thread->join();
}
}
void EventThread::processStatusEvent(StatusEvent * status) {
void EventThread::start() {
}
m_thread.reset(new thread([this] {
void EventThread::processResultEvent(ResultEvent * result) {
while (true) {
unique_ptr<Event> event = m_socket->receive();
}
if (event.get() == nullptr) {
// The stream is canceled.
return;
}
void EventThread::processPublisherEvent(PublisherEvent * publisher) {
// Forward the event to the listeners.
auto eventListeners = m_server->getEventListeners();
for (EventListener * listener : eventListeners) {
// If the application name is null, all the status are pushed, otherwise, filter on the name.
if (listener->getName() == ""
|| listener->getName() == event->getName()) {
listener->pushEvent(event);
}
}
}
}));
}
void EventThread::processPortEvent(PortEvent * port) {
void EventThread::cancel() {
m_socket->cancel();
}
}
......@@ -25,10 +25,6 @@ namespace cameo {
class Server;
class EventStreamSocket;
class StatusEvent;
class ResultEvent;
class PublisherEvent;
class PortEvent;
class EventThread {
......@@ -40,15 +36,9 @@ public:
void cancel();
private:
void processStatusEvent(StatusEvent * status);
void processResultEvent(ResultEvent * result);
void processPublisherEvent(PublisherEvent * publisher);
void processPortEvent(PortEvent * port);
Server * m_server;
std::unique_ptr<EventStreamSocket> m_socket;
std::unique_ptr<std::thread> m_thread;
};
}
......
......@@ -23,6 +23,7 @@
#include "impl/ServicesImpl.h"
#include "impl/SocketImpl.h"
#include "ProtoType.h"
#include "EventThread.h"
using namespace std;
......@@ -44,9 +45,18 @@ Server::Server(const std::string& endpoint) :
istringstream is(port);
is >> m_port;
m_serverEndpoint = m_url + ":" + port;
// Start the event thread.
unique_ptr<EventStreamSocket> socket = openEventStream();
m_eventThread.reset(new EventThread(this, socket));
m_eventThread->start();
}
Server::~Server() {
// Stop the event thread.
if (m_eventThread.get() != nullptr) {
m_eventThread->cancel();
}
}
void Server::setTimeout(int timeoutMs) {
......@@ -88,8 +98,7 @@ int Server::getAvailableTimeout() const {
}
std::unique_ptr<application::Instance> Server::makeInstance() {
unique_ptr<EventStreamSocket> socket = Services::openEventStream();
return unique_ptr<application::Instance>(new application::Instance(this, socket));
return unique_ptr<application::Instance>(new application::Instance(this));
}
std::unique_ptr<application::Instance> Server::start(const std::string& name, Option options) {
......@@ -115,7 +124,9 @@ std::unique_ptr<application::Instance> Server::start(const std::string& name, co
unique_ptr<application::Instance> instance = makeInstance();
// Set the name and register the instance as event listener.
instance->setName(name);
registerEventListener(instance.get());
try {
if (outputStream) {
......@@ -167,29 +178,6 @@ Response Server::stopApplicationAsynchronously(int id, bool immediately) const {
return Response(requestResponse.value(), requestResponse.message());
}
std::unique_ptr<application::Instance> Server::stop(int id, bool immediately) {
unique_ptr<application::Instance> instance = makeInstance();
try {
Response response = stopApplicationAsynchronously(id, immediately);
if (response.getValue() != -1) {
// we get the name in the message attribute
instance->setName(response.getMessage());
instance->setId(id);
} else {
instance->setErrorMessage(response.getMessage());
}
} catch (const ConnectionTimeout& e) {
instance->setErrorMessage(e.what());
}
return instance;
}
application::InstanceArray Server::connectAll(const std::string& name, Option options) {
bool outputStream = ((options & OUTPUTSTREAM) != 0);
......@@ -213,7 +201,10 @@ application::InstanceArray Server::connectAll(const std::string& name, Option op
unique_ptr<application::Instance> instance = makeInstance();
// Set the name and register the instance as event listener.
instance->setName(info.name());
registerEventListener(instance.get());
int applicationId = info.id();
// test if the application is still alive otherwise we could have missed a status message
......
......@@ -34,6 +34,7 @@ namespace application {
}
class EventListener;
class EventThread;
class Server : private Services {
......@@ -117,13 +118,13 @@ private:
std::unique_ptr<application::Instance> makeInstance();
bool isAlive(int id) const;
Response stopApplicationAsynchronously(int id, bool immediately) const;
std::unique_ptr<application::Instance> stop(int id, bool immediately);
std::unique_ptr<application::Subscriber> createSubscriber(int id, const std::string& publisherName, const std::string& instanceName) const;
int getAvailableTimeout() const;
int getStreamPort(const std::string& name);
std::mutex m_eventListenersMutex;
std::vector<EventListener *> m_eventListeners;
std::unique_ptr<EventThread> m_eventThread;
};
std::ostream& operator<<(std::ostream&, const Server&);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment