Commit ec09c8f5 authored by legoc's avatar legoc
Browse files

Replaced EventStreamSocket of This with an EventListener.

Corrected bug in EventThread: now events are cloned.
parent 17aa1e49
...@@ -44,7 +44,6 @@ libcameo_la_SOURCES = \ ...@@ -44,7 +44,6 @@ libcameo_la_SOURCES = \
cameo/impl/ServicesImpl.cpp \ cameo/impl/ServicesImpl.cpp \
cameo/Server.cpp \ cameo/Server.cpp \
cameo/impl/HandlerImpl.cpp \ cameo/impl/HandlerImpl.cpp \
cameo/impl/ApplicationImpl.cpp \
cameo/Application.cpp \ cameo/Application.cpp \
cameo/impl/SocketImpl.h \ cameo/impl/SocketImpl.h \
cameo/impl/WaitingImpl.h \ cameo/impl/WaitingImpl.h \
......
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
#include <stdexcept> #include <stdexcept>
#include <vector> #include <vector>
#include "EventStreamSocket.h" #include "EventStreamSocket.h"
#include "impl/ApplicationImpl.h" #include "impl/ServicesImpl.h"
#include "impl/PublisherImpl.h" #include "impl/PublisherImpl.h"
#include "impl/RequesterImpl.h" #include "impl/RequesterImpl.h"
#include "impl/RequestImpl.h" #include "impl/RequestImpl.h"
...@@ -31,6 +31,7 @@ ...@@ -31,6 +31,7 @@
#include "impl/SubscriberImpl.h" #include "impl/SubscriberImpl.h"
#include "impl/WaitingImpl.h" #include "impl/WaitingImpl.h"
#include "impl/WaitingImplSet.h" #include "impl/WaitingImplSet.h"
#include "impl/HandlerImpl.h"
#include "PortEvent.h" #include "PortEvent.h"
#include "ProtoType.h" #include "ProtoType.h"
#include "PublisherEvent.h" #include "PublisherEvent.h"
...@@ -119,7 +120,7 @@ This::This() : ...@@ -119,7 +120,7 @@ This::This() :
void This::initApplication(int argc, char *argv[]) { void This::initApplication(int argc, char *argv[]) {
m_impl = new ApplicationImpl(); m_impl = new ServicesImpl();
Services::setImpl(m_impl); Services::setImpl(m_impl);
if (argc == 0) { if (argc == 0) {
...@@ -193,6 +194,10 @@ void This::initApplication(int argc, char *argv[]) { ...@@ -193,6 +194,10 @@ void This::initApplication(int argc, char *argv[]) {
} }
m_waitingSet = unique_ptr<WaitingImplSet>(new WaitingImplSet()); m_waitingSet = unique_ptr<WaitingImplSet>(new WaitingImplSet());
// Init listener.
setName(m_name);
m_server->registerEventListener(this);
} }
This::~This() { This::~This() {
...@@ -385,10 +390,6 @@ bool This::removePort(const std::string& name) const { ...@@ -385,10 +390,6 @@ bool This::removePort(const std::string& name) const {
State This::waitForStop() { State This::waitForStop() {
// open the event stream
unique_ptr<EventStreamSocket> socket = openEventStream();
m_impl->setEventSocket(socket);
// test if stop was requested elsewhere // test if stop was requested elsewhere
State state = getState(m_id); State state = getState(m_id);
if (state == STOPPING if (state == STOPPING
...@@ -398,7 +399,7 @@ State This::waitForStop() { ...@@ -398,7 +399,7 @@ State This::waitForStop() {
while (true) { while (true) {
// waits for a new incoming status // waits for a new incoming status
unique_ptr<Event> event = m_impl->m_eventSocket->receive(); unique_ptr<Event> event = popEvent();
// The socket is canceled. // The socket is canceled.
if (event.get() == nullptr) { if (event.get() == nullptr) {
...@@ -440,8 +441,18 @@ std::unique_ptr<Instance> This::connectToStarter() { ...@@ -440,8 +441,18 @@ std::unique_ptr<Instance> This::connectToStarter() {
return unique_ptr<Instance>(nullptr); return unique_ptr<Instance>(nullptr);
} }
void This::stoppingFunction(StopFunctionType stop) {
application::State state = waitForStop();
// Only stop in case of STOPPING.
if (state == application::STOPPING) {
stop();
}
}
void This::handleStopImpl(StopFunctionType function) { void This::handleStopImpl(StopFunctionType function) {
m_impl->handleStop(&m_instance, function); m_stopHandler = unique_ptr<HandlerImpl>(new HandlerImpl(bind(&This::stoppingFunction, this, function)));
} }
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
......
...@@ -44,7 +44,6 @@ enum Option { ...@@ -44,7 +44,6 @@ enum Option {
class Server; class Server;
class EventStreamSocket; class EventStreamSocket;
class OutputStreamSocket; class OutputStreamSocket;
class ApplicationImpl;
class PublisherImpl; class PublisherImpl;
class SubscriberImpl; class SubscriberImpl;
class RequestImpl; class RequestImpl;
...@@ -54,6 +53,7 @@ class WaitingImpl; ...@@ -54,6 +53,7 @@ class WaitingImpl;
class SocketWaitingImpl; class SocketWaitingImpl;
class GenericWaitingImpl; class GenericWaitingImpl;
class WaitingImplSet; class WaitingImplSet;
class HandlerImpl;
namespace application { namespace application {
...@@ -81,7 +81,7 @@ const State STOPPED = 128; ...@@ -81,7 +81,7 @@ const State STOPPED = 128;
const State KILLED = 256; const State KILLED = 256;
class This : private Services { class This : private Services, private EventListener {
friend class cameo::application::Publisher; friend class cameo::application::Publisher;
friend class cameo::application::Responder; friend class cameo::application::Responder;
...@@ -90,7 +90,6 @@ class This : private Services { ...@@ -90,7 +90,6 @@ class This : private Services {
friend class cameo::RequestImpl; friend class cameo::RequestImpl;
friend class cameo::ResponderImpl; friend class cameo::ResponderImpl;
friend class cameo::RequesterImpl; friend class cameo::RequesterImpl;
friend class cameo::ApplicationImpl;
friend class cameo::SocketWaitingImpl; friend class cameo::SocketWaitingImpl;
friend class cameo::GenericWaitingImpl; friend class cameo::GenericWaitingImpl;
friend class cameo::Server; friend class cameo::Server;
...@@ -161,9 +160,11 @@ private: ...@@ -161,9 +160,11 @@ private:
bool destroyPublisher(const std::string& name) const; bool destroyPublisher(const std::string& name) const;
bool removePort(const std::string& name) const; bool removePort(const std::string& name) const;
State waitForStop(); State waitForStop();
void stoppingFunction(StopFunctionType stop);
void handleStopImpl(StopFunctionType function); void handleStopImpl(StopFunctionType function);
ApplicationImpl * m_impl; ServicesImpl * m_impl;
std::string m_name; std::string m_name;
int m_id; int m_id;
bool m_managed; bool m_managed;
...@@ -176,13 +177,14 @@ private: ...@@ -176,13 +177,14 @@ private:
std::unique_ptr<Server> m_starterServer; std::unique_ptr<Server> m_starterServer;
std::unique_ptr<WaitingImplSet> m_waitingSet; std::unique_ptr<WaitingImplSet> m_waitingSet;
std::unique_ptr<HandlerImpl> m_stopHandler;
static This m_instance; static This m_instance;
static const std::string RUNNING_STATE; static const std::string RUNNING_STATE;
}; };
class Instance : public EventListener { class Instance : private EventListener {
friend class cameo::Server; friend class cameo::Server;
friend class cameo::application::Subscriber; friend class cameo::application::Subscriber;
......
...@@ -24,6 +24,14 @@ CancelEvent::CancelEvent(int id, const std::string& name) : ...@@ -24,6 +24,14 @@ CancelEvent::CancelEvent(int id, const std::string& name) :
Event(id, name) { Event(id, name) {
} }
CancelEvent::CancelEvent(const CancelEvent& event) :
Event(event) {
}
CancelEvent* CancelEvent::clone() {
return new CancelEvent(*this);
}
std::ostream& operator<<(std::ostream& os, const cameo::CancelEvent& event) { std::ostream& operator<<(std::ostream& os, const cameo::CancelEvent& event) {
os << "name=" << event.m_name os << "name=" << event.m_name
<< "\nid=" << event.m_id; << "\nid=" << event.m_id;
......
...@@ -28,6 +28,9 @@ class CancelEvent : public Event { ...@@ -28,6 +28,9 @@ class CancelEvent : public Event {
public: public:
CancelEvent(int id, const std::string& name); CancelEvent(int id, const std::string& name);
CancelEvent(const CancelEvent& event);
virtual CancelEvent* clone();
}; };
std::ostream& operator<<(std::ostream&, const CancelEvent&); std::ostream& operator<<(std::ostream&, const CancelEvent&);
......
...@@ -25,6 +25,11 @@ Event::Event(int id, const std::string& name) : ...@@ -25,6 +25,11 @@ Event::Event(int id, const std::string& name) :
m_name(name) { m_name(name) {
} }
Event::Event(const Event& event) :
m_id(event.m_id),
m_name(event.m_name) {
}
Event::~Event() { Event::~Event() {
} }
...@@ -36,4 +41,4 @@ const std::string& Event::getName() const { ...@@ -36,4 +41,4 @@ const std::string& Event::getName() const {
return m_name; return m_name;
} }
} }
\ No newline at end of file
...@@ -25,8 +25,11 @@ class Event { ...@@ -25,8 +25,11 @@ class Event {
public: public:
Event(int id, const std::string& name); Event(int id, const std::string& name);
Event(const Event& event);
virtual ~Event(); virtual ~Event();
virtual Event* clone() = 0;
int getId() const; int getId() const;
const std::string& getName() const; const std::string& getName() const;
...@@ -37,4 +40,4 @@ protected: ...@@ -37,4 +40,4 @@ protected:
} }
#endif #endif
\ No newline at end of file
...@@ -54,7 +54,12 @@ void EventThread::start() { ...@@ -54,7 +54,12 @@ void EventThread::start() {
// If the application name is null, all the status are pushed, otherwise, filter on the name. // If the application name is null, all the status are pushed, otherwise, filter on the name.
if (listener->getName() == "" if (listener->getName() == ""
|| listener->getName() == event->getName()) { || listener->getName() == event->getName()) {
listener->pushEvent(event);
// Clone the event is necessary because the event is passed to different listeners working in different threads.
unique_ptr<Event> clonedEvent(event->clone());
// Push the cloned event.
listener->pushEvent(clonedEvent);
} }
} }
} }
......
...@@ -25,6 +25,14 @@ PortEvent::PortEvent(int id, const std::string& name, const std::string& portNam ...@@ -25,6 +25,14 @@ PortEvent::PortEvent(int id, const std::string& name, const std::string& portNam
m_portName(portName) { m_portName(portName) {
} }
PortEvent::PortEvent(const PortEvent& event) :
Event(event), m_portName(event.m_portName) {
}
PortEvent* PortEvent::clone() {
return new PortEvent(*this);
}
const std::string& PortEvent::getPortName() const { const std::string& PortEvent::getPortName() const {
return m_portName; return m_portName;
} }
......
...@@ -28,6 +28,9 @@ class PortEvent : public Event { ...@@ -28,6 +28,9 @@ class PortEvent : public Event {
public: public:
PortEvent(int id, const std::string& name, const std::string& portName); PortEvent(int id, const std::string& name, const std::string& portName);
PortEvent(const PortEvent& event);
virtual PortEvent* clone();
const std::string& getPortName() const; const std::string& getPortName() const;
......
...@@ -25,6 +25,14 @@ PublisherEvent::PublisherEvent(int id, const std::string& name, const std::strin ...@@ -25,6 +25,14 @@ PublisherEvent::PublisherEvent(int id, const std::string& name, const std::strin
m_publisherName(publisherName) { m_publisherName(publisherName) {
} }
PublisherEvent::PublisherEvent(const PublisherEvent& event) :
Event(event), m_publisherName(event.m_publisherName) {
}
PublisherEvent* PublisherEvent::clone() {
return new PublisherEvent(*this);
}
const std::string& PublisherEvent::getPublisherName() const { const std::string& PublisherEvent::getPublisherName() const {
return m_publisherName; return m_publisherName;
} }
......
...@@ -28,6 +28,9 @@ class PublisherEvent : public Event { ...@@ -28,6 +28,9 @@ class PublisherEvent : public Event {
public: public:
PublisherEvent(int id, const std::string& name, const std::string& publisherName); PublisherEvent(int id, const std::string& name, const std::string& publisherName);
PublisherEvent(const PublisherEvent& event);
virtual PublisherEvent* clone();
const std::string& getPublisherName() const; const std::string& getPublisherName() const;
......
...@@ -25,6 +25,14 @@ ResultEvent::ResultEvent(int id, const std::string& name, const std::string& dat ...@@ -25,6 +25,14 @@ ResultEvent::ResultEvent(int id, const std::string& name, const std::string& dat
m_data(data) { m_data(data) {
} }
ResultEvent::ResultEvent(const ResultEvent& event) :
Event(event), m_data(event.m_data) {
}
ResultEvent* ResultEvent::clone() {
return new ResultEvent(*this);
}
const std::string& ResultEvent::getData() const { const std::string& ResultEvent::getData() const {
return m_data; return m_data;
} }
...@@ -36,4 +44,4 @@ std::ostream& operator<<(std::ostream& os, const cameo::ResultEvent& status) { ...@@ -36,4 +44,4 @@ std::ostream& operator<<(std::ostream& os, const cameo::ResultEvent& status) {
return os; return os;
} }
} }
\ No newline at end of file
...@@ -29,6 +29,9 @@ class ResultEvent : public Event { ...@@ -29,6 +29,9 @@ class ResultEvent : public Event {
public: public:
ResultEvent(int id, const std::string& name, const std::string& data); ResultEvent(int id, const std::string& name, const std::string& data);
ResultEvent(const ResultEvent& event);
virtual ResultEvent* clone();
const std::string& getData() const; const std::string& getData() const;
...@@ -40,4 +43,4 @@ std::ostream& operator<<(std::ostream&, const ResultEvent&); ...@@ -40,4 +43,4 @@ std::ostream& operator<<(std::ostream&, const ResultEvent&);
} }
#endif #endif
\ No newline at end of file
...@@ -26,6 +26,14 @@ StatusEvent::StatusEvent(int id, const std::string& name, application::State sta ...@@ -26,6 +26,14 @@ StatusEvent::StatusEvent(int id, const std::string& name, application::State sta
m_pastStates(pastStates) { m_pastStates(pastStates) {
} }
StatusEvent::StatusEvent(const StatusEvent& event) :
Event(event), m_state(event.m_state), m_pastStates(event.m_pastStates) {
}
StatusEvent* StatusEvent::clone() {
return new StatusEvent(*this);
}
application::State StatusEvent::getState() const { application::State StatusEvent::getState() const {
return m_state; return m_state;
} }
......
...@@ -29,6 +29,9 @@ class StatusEvent : public Event { ...@@ -29,6 +29,9 @@ class StatusEvent : public Event {
public: public:
StatusEvent(int id, const std::string& name, application::State state, application::State pastStates); StatusEvent(int id, const std::string& name, application::State state, application::State pastStates);
StatusEvent(const StatusEvent& event);
virtual StatusEvent* clone();
application::State getState() const; application::State getState() const;
application::State getPastStates() const; application::State getPastStates() const;
......
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
#include "ApplicationImpl.h"
#include <memory>
#include <iostream>
#include "../EventStreamSocket.h"
#include "../Application.h"
using namespace std;
namespace cameo {
ApplicationImpl::ApplicationImpl() :
ServicesImpl() {
}
ApplicationImpl::~ApplicationImpl() {
// Cancel the event socket in case it was started with a stop handler.
if (m_eventSocket.get() != nullptr) {
m_eventSocket->cancel();
}
}
void ApplicationImpl::setEventSocket(std::unique_ptr<EventStreamSocket>& eventSocket) {
m_eventSocket = std::move(eventSocket);
}
void ApplicationImpl::handleStop(application::This * application, HandlerImpl::FunctionType stop) {
m_stopHandler = unique_ptr<HandlerImpl>(new HandlerImpl(bind(&ApplicationImpl::stoppingFunction, application, stop)));
}
void ApplicationImpl::stoppingFunction(application::This * application, HandlerImpl::FunctionType stop) {
application::State state = application->waitForStop();
// Only stop in case of STOPPING.
if (state == application::STOPPING) {
stop();
}
}
}
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
#ifndef CAMEO_APPLICATIONIMPL_H_
#define CAMEO_APPLICATIONIMPL_H_
#include "HandlerImpl.h"
#include "ServicesImpl.h"
namespace cameo {
class EventStreamSocket;
namespace application {
class This;
}
class ApplicationImpl : public ServicesImpl {
public:
ApplicationImpl();
virtual ~ApplicationImpl();
void setEventSocket(std::unique_ptr<EventStreamSocket>& eventSocket);
void handleStop(application::This * application, HandlerImpl::FunctionType stop);
static void stoppingFunction(application::This * application, HandlerImpl::FunctionType stop);
std::unique_ptr<EventStreamSocket> m_eventSocket;
std::unique_ptr<HandlerImpl> m_stopHandler;
};
}
#endif
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
#include "PublisherImpl.h" #include "PublisherImpl.h"
#include "../Application.h" #include "../Application.h"
#include "../Serializer.h" #include "../Serializer.h"
#include "ApplicationImpl.h" #include "ServicesImpl.h"
#include <sstream> #include <sstream>
using namespace std; using namespace std;
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
#include "../Application.h" #include "../Application.h"
#include "../Serializer.h" #include "../Serializer.h"
#include "ApplicationImpl.h" #include "ServicesImpl.h"
#include <sstream> #include <sstream>
using namespace std; using namespace std;
......