Commit 84cdc19b authored by legoc's avatar legoc
Browse files

Starting with the implementation of the event thread in C++

parent f028f1f6
......@@ -4,7 +4,7 @@
#
# -----------------------------------------------------------------------------
AC_INIT(cameo-api-cpp, 0.2.1)
AC_INIT(cameo-api-cpp, 0.2.2)
LIBRARY_VERSION=0:2:1
AC_CONFIG_AUX_DIR(config)
......
......@@ -13,6 +13,8 @@ libcameo_la_SOURCES = \
cameo/ResultEvent.cpp \
cameo/PublisherEvent.cpp \
cameo/PortEvent.cpp \
cameo/CancelEvent.cpp \
cameo/EventListener.cpp \
cameo/RemoteException.cpp \
cameo/InvalidArgumentException.cpp \
cameo/UnmanagedApplicationException.cpp \
......@@ -37,6 +39,7 @@ libcameo_la_SOURCES = \
cameo/impl/RequestImpl.cpp \
cameo/impl/ResponderImpl.cpp \
cameo/impl/RequesterImpl.cpp \
cameo/EventThread.cpp \
cameo/Services.cpp \
cameo/impl/ServicesImpl.cpp \
cameo/Server.cpp \
......
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
#include "CancelEvent.h"
#include <iostream>
namespace cameo {
CancelEvent::CancelEvent(int id, const std::string& name) :
Event(id, name) {
}
std::ostream& operator<<(std::ostream& os, const cameo::CancelEvent& event) {
os << "name=" << event.m_name
<< "\nid=" << event.m_id;
return os;
}
}
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
#ifndef CAMEO_CANCELEVENT_H_
#define CAMEO_CANCELEVENT_H_
#include <iostream>
#include "Event.h"
namespace cameo {
class CancelEvent : public Event {
friend std::ostream& operator<<(std::ostream&, const CancelEvent&);
public:
CancelEvent(int id, const std::string& name);
};
std::ostream& operator<<(std::ostream&, const CancelEvent&);
}
#endif
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
#ifndef CAMEO_CONCURRENTQUEUE_H_
#define CAMEO_CONCURRENTQUEUE_H_
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <memory>
namespace cameo {
/**
* Class implementing a concurrent queue. This is a modified version of the implementation:
* https://juanchopanzacpp.wordpress.com/2013/02/26/concurrent-queue-c11/
* Supports only pointer types.
*/
template<typename Type>
class ConcurrentQueue {
public:
/**
* Destructor. Deletes all the items.
*/
~ConcurrentQueue() {
std::unique_lock<std::mutex> lock(m_mutex);
while (!m_queue.empty()) {
delete m_queue.front();
m_queue.pop();
}
}
/**
* Gets the front item if the queue is not empty. Returns a null pointer otherwise.
*/
std::unique_ptr<Type> poll() {
std::unique_lock<std::mutex> lock(m_mutex);
if (m_queue.empty()) {
return std::unique_ptr<Type>();
}
auto item = m_queue.front();
m_queue.pop();
return std::unique_ptr<Type>(item);
}
/**
* Gets the front item. Blocking call until there is an item.
*/
std::unique_ptr<Type> pop() {
std::unique_lock<std::mutex> lock(m_mutex);
while (m_queue.empty()) {
m_condition.wait(lock);
}
auto item = m_queue.front();
m_queue.pop();
return std::unique_ptr<Type>(item);
}
/**
* Pushes an item.
*/
void push(std::unique_ptr<Type> & item) {
std::unique_lock<std::mutex> lock(m_mutex);
m_queue.push(item.release());
lock.unlock();
m_condition.notify_one();
}
/**
* Returns the size of the queue.
*/
typename std::queue<Type *>::size_type size() {
std::unique_lock<std::mutex> lock(m_mutex);
return m_queue.size();
}
private:
std::queue<Type *> m_queue;
std::mutex m_mutex;
std::condition_variable m_condition;
};
}
#endif
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
#include "EventListener.h"
#include "CancelEvent.h"
using namespace std;
namespace cameo {
EventListener::EventListener() {
}
EventListener::~EventListener() {
}
void EventListener::setName(const std::string& name) {
m_name = name;
}
const std::string& EventListener::getName() const {
return m_name;
}
void EventListener::pushEvent(std::unique_ptr<Event>& event) {
m_eventQueue.push(event);
}
std::unique_ptr<Event> EventListener::popEvent(bool blocking) {
if (blocking) {
return m_eventQueue.pop();
}
return m_eventQueue.poll();
}
std::unique_ptr<Event> EventListener::popEvent() {
return popEvent(true);
}
void EventListener::cancel(int id) {
unique_ptr<Event> event(new CancelEvent(id, m_name));
m_eventQueue.push(event);
}
}
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
#ifndef CAMEO_EVENTLISTENER_H_
#define CAMEO_EVENTLISTENER_H_
#include "ConcurrentQueue.h"
#include <string>
namespace cameo {
class Event;
class EventListener {
public:
EventListener();
virtual ~EventListener();
void setName(const std::string& name);
const std::string& getName() const;
void pushEvent(std::unique_ptr<Event>& event);
std::unique_ptr<Event> popEvent(bool blocking);
std::unique_ptr<Event> popEvent();
void cancel(int id);
protected:
std::string m_name;
ConcurrentQueue<Event> m_eventQueue;
};
}
#endif
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
#include "EventThread.h"
#include "Server.h"
#include "EventStreamSocket.h"
#include "StatusEvent.h"
#include "ResultEvent.h"
#include "PublisherEvent.h"
#include "PortEvent.h"
namespace cameo {
EventThread::EventThread(Server * server, std::unique_ptr<EventStreamSocket>& socket) :
m_server(server) {
m_socket = std::move(socket);
}
EventThread::~EventThread() {
}
void EventThread::start() {
}
void EventThread::cancel() {
m_socket->cancel();
}
void EventThread::processStatusEvent(StatusEvent * status) {
}
void EventThread::processResultEvent(ResultEvent * result) {
}
void EventThread::processPublisherEvent(PublisherEvent * publisher) {
}
void EventThread::processPortEvent(PortEvent * port) {
}
}
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
#ifndef CAMEO_EVENTTHREAD_H_
#define CAMEO_EVENTTHREAD_H_
#include <string>
#include <memory>
#include <thread>
namespace cameo {
class Server;
class EventStreamSocket;
class StatusEvent;
class ResultEvent;
class PublisherEvent;
class PortEvent;
class EventThread {
public:
EventThread(Server * server, std::unique_ptr<EventStreamSocket>& socket);
~EventThread();
void start();
void cancel();
private:
void processStatusEvent(StatusEvent * status);
void processResultEvent(ResultEvent * result);
void processPublisherEvent(PublisherEvent * publisher);
void processPortEvent(PortEvent * port);
Server * m_server;
std::unique_ptr<EventStreamSocket> m_socket;
std::unique_ptr<std::thread> m_thread;
};
}
#endif
......@@ -390,6 +390,28 @@ std::unique_ptr<ConnectionChecker> Server::createConnectionChecker(ConnectionChe
return connectionChecker;
}
std::vector<EventListener *> Server::getEventListeners() {
std::unique_lock<std::mutex> lock(m_eventListenersMutex);
return m_eventListeners;
}
void Server::registerEventListener(EventListener * listener) {
std::unique_lock<std::mutex> lock(m_eventListenersMutex);
m_eventListeners.push_back(listener);
}
void Server::unregisterEventListener(EventListener * listener) {
std::unique_lock<std::mutex> lock(m_eventListenersMutex);
// Iterate to find the listener.
for (auto it = m_eventListeners.begin(); it != m_eventListeners.end(); ++it) {
if (*it == listener) {
m_eventListeners.erase(it);
break;
}
}
}
std::ostream& operator<<(std::ostream& os, const cameo::Server& server) {
os << "server@" << server.m_serverEndpoint;
......
......@@ -19,6 +19,7 @@
#include <vector>
#include <memory>
#include <mutex>
#include "Application.h"
#include "ConnectionChecker.h"
#include "ConnectionTimeout.h"
......@@ -32,6 +33,8 @@ namespace application {
class This;
}
class EventListener;
class Server : private Services {
friend class SubscriberImpl;
......@@ -95,6 +98,21 @@ public:
*/
std::unique_ptr<ConnectionChecker> createConnectionChecker(ConnectionCheckerType handler, int pollingTimeMs = 10000);
/**
* Gets the event listeners. Copies the list.
*/
std::vector<EventListener *> getEventListeners();
/**
* Registers an event listener.
*/
void registerEventListener(EventListener * listener);
/**
* Unregisters an event listener.
*/
void unregisterEventListener(EventListener * listener);
private:
std::unique_ptr<application::Instance> makeInstance();
bool isAlive(int id) const;
......@@ -103,6 +121,9 @@ private:
std::unique_ptr<application::Subscriber> createSubscriber(int id, const std::string& publisherName, const std::string& instanceName) const;
int getAvailableTimeout() const;
int getStreamPort(const std::string& name);
std::mutex m_eventListenersMutex;
std::vector<EventListener *> m_eventListeners;
};
std::ostream& operator<<(std::ostream&, const Server&);
......
0.1.7
-----
* Optimized socket creation by keeping alive all the sockets.
0.1.6
-----
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment