Commit ab7ad18d authored by legoc's avatar legoc
Browse files

implemented connection handler

parent 801d229a
0.1.0
-----
* Corrected Server::isAvailable implementation when timeout is set.
* Implemented connection handler.
\ No newline at end of file
......@@ -4,8 +4,8 @@
#
# -----------------------------------------------------------------------------
AC_INIT(cameo-api-cpp, 0.0.1)
LIBRARY_VERSION=0:0:1
AC_INIT(cameo-api-cpp, 0.1.0)
LIBRARY_VERSION=0:1:0
AC_CONFIG_AUX_DIR(config)
AC_CONFIG_SRCDIR(src/cameo/Application.cpp)
......@@ -36,6 +36,7 @@ AC_SUBST(LIBRARY_VERSION)
#AX_CXX_COMPILE_STDCXX_11
AX_PTHREAD(,[AC_MSG_ERROR([Posix threads required])])
AX_BOOST_BASE([1.41],, [AC_MSG_ERROR([Boost 1.41 required])])
AX_BOOST_DATE_TIME
AX_BOOST_THREAD
AX_BOOST_SYSTEM
AC_ZMQ
......@@ -49,7 +50,8 @@ CAMEO_LDFLAGS="$BOOST_LDFLAGS \
$ZMQ_LDFLAGS \
$PROTOBUF_LDFLAGS"
CAMEO_LIBS="$BOOST_THREAD_LIB \
CAMEO_LIBS="$BOOST_DATE_TIME_LIB \
$BOOST_THREAD_LIB \
$BOOST_SYSTEM_LIB \
$ZMQ_LIB \
$PROTOBUF_LIB"
......
......@@ -11,6 +11,7 @@ lib_LTLIBRARIES = libcameo.la
# header files that must be in the dist package but not installed are sources
libcameo_la_SOURCES = \
cameo/impl/Serializer.cpp \
cameo/impl/TimeCondition.cpp \
cameo/Event.cpp \
cameo/StatusEvent.cpp \
cameo/ResultEvent.cpp \
......@@ -29,6 +30,7 @@ libcameo_la_SOURCES = \
cameo/impl/GenericWaitingImpl.cpp \
cameo/impl/WaitingImplSet.cpp \
cameo/impl/CancelIdGenerator.cpp \
cameo/impl/ConnectionHandlerSet.cpp \
cameo/EventStreamSocket.cpp \
cameo/Response.cpp \
cameo/impl/PublisherImpl.cpp \
......
......@@ -22,6 +22,7 @@
#include "Application.h"
#include "impl/ServicesImpl.h"
#include "impl/SocketImpl.h"
#include "impl/ConnectionHandlerSet.h"
#include "ProtoType.h"
using namespace std;
......@@ -29,7 +30,7 @@ using namespace std;
namespace cameo {
Server::Server(const std::string& endpoint) :
Services() {
Services(), m_connectionPollingTimeMs(10000) {
m_impl = new ServicesImpl();
Services::setImpl(m_impl);
......@@ -45,19 +46,41 @@ Server::Server(const std::string& endpoint) :
istringstream is(port);
is >> m_port;
m_serverEndpoint = m_url + ":" + port;
// Create the connection handler set.
m_connectionHandlerSet = auto_ptr<ConnectionHandlerSet>(new ConnectionHandlerSet(this));
}
Server::~Server() {
}
void Server::setTimeout(int timeout) {
Services::setTimeout(timeout);
void Server::setTimeout(int timeoutMs, int connectionPollingTimeMs) {
Services::setTimeout(timeoutMs);
m_connectionPollingTimeMs = connectionPollingTimeMs;
// Start the connection thread if timeout is positive.
if (timeoutMs > 0) {
m_connectionHandlerSet->startThread(timeoutMs, m_connectionPollingTimeMs);
}
else {
m_connectionHandlerSet->stopThread();
}
}
void Server::setTimeout(int timeoutMs) {
setTimeout(timeoutMs, m_connectionPollingTimeMs);
}
int Server::getTimeout() const {
return Services::getTimeout();
}
int Server::getConnectionPollingTime() const {
return m_connectionPollingTimeMs;
}
const std::string& Server::getEndpoint() const {
return Services::getEndpoint();
}
......@@ -74,6 +97,16 @@ bool Server::isAvailable(int timeout) const {
return Services::isAvailable(timeout);
}
bool Server::isAvailable() const {
int timeout = getTimeout();
if (timeout > 0) {
return isAvailable(timeout);
}
// Default timeout value is 10000ms.
return isAvailable(10000);
}
std::auto_ptr<application::Instance> Server::makeInstance() {
auto_ptr<EventStreamSocket> socket = Services::openEventStream();
return auto_ptr<application::Instance>(new application::Instance(this, socket));
......@@ -343,6 +376,14 @@ std::auto_ptr<application::Subscriber> Server::createSubscriber(int id, const st
return subscriber;
}
void Server::addConnectionHandler(std::string const & name, ConnectionHandlerType handler) {
m_connectionHandlerSet->add(name, handler);
}
bool Server::removeConnectionHandler(std::string const & name) {
m_connectionHandlerSet->remove(name);
}
std::ostream& operator<<(std::ostream& os, const cameo::Server& server) {
os << "server@" << server.m_serverEndpoint;
......
......@@ -27,6 +27,8 @@
namespace cameo {
class ConnectionHandlerSet;
namespace application {
class This;
}
......@@ -41,15 +43,25 @@ class Server : private Services {
friend std::ostream& operator<<(std::ostream&, const Server&);
public:
typedef boost::function<void (bool)> ConnectionHandlerType;
Server(const std::string& endpoint);
~Server();
void setTimeout(int timeout);
void setTimeout(int timeoutMs, int connectionPollingTimeMs);
void setTimeout(int timeoutMs);
int getTimeout() const;
int getConnectionPollingTime() const;
const std::string& getEndpoint() const;
const std::string& getUrl() const;
int getPort() const;
bool isAvailable(int timeout = 10000) const;
bool isAvailable(int timeoutMs) const;
/**
* Returns true if is available. Uses the timeout if set or 10000ms.
*/
bool isAvailable() const;
std::auto_ptr<application::Instance> start(const std::string& name, const std::vector<std::string> &args, Option options = NONE);
std::auto_ptr<application::Instance> start(const std::string& name, Option options = NONE);
......@@ -81,13 +93,26 @@ public:
*/
std::auto_ptr<EventStreamSocket> openEventStream();
/**
* Adds a connection handler.
*/
void addConnectionHandler(std::string const & name, ConnectionHandlerType handler);
/**
* Removes a connection handler. Returns true if the handler is found and removed.
*/
bool removeConnectionHandler(std::string const & name);
private:
ServicesImpl * m_impl;
std::auto_ptr<application::Instance> makeInstance();
bool isAlive(int id) const;
Response stopApplicationAsynchronously(int id, bool immediately) const;
std::auto_ptr<application::Instance> stop(int id, bool immediately);
std::auto_ptr<application::Subscriber> createSubscriber(int id, const std::string& publisherName, const std::string& instanceName) const;
ServicesImpl * m_impl;
int m_connectionPollingTimeMs;
std::auto_ptr<ConnectionHandlerSet> m_connectionHandlerSet;
};
std::ostream& operator<<(std::ostream&, const Server&);
......
......@@ -17,7 +17,11 @@
#ifndef CAMEO_H_
#define CAMEO_H_
#define CAMEO_API_VERSION_MAJOR 0
#define CAMEO_API_VERSION_MINOR 1
#define CAMEO_API_VERSION_REVISION 0
#include "Application.h"
#include "Server.h"
#endif
\ No newline at end of file
#endif
......@@ -13,7 +13,8 @@ bin_PROGRAMS = \
testpublisherloop \
testresponder \
testrequesterandresponder \
testcancel
testcancel \
teststarttimeout
testcameo_SOURCES = \
......@@ -119,3 +120,10 @@ testcancel_CPPFLAGS = $(CAMEO_CXXFLAGS)
testcancel_LDFLAGS = $(CAMEO_LDFLAGS)
testcancel_LDADD = ../libcameo.la $(CAMEO_LIBS)
teststarttimeout_SOURCES = \
TestStartTimeoutApplication.cpp
teststarttimeout_CPPFLAGS = $(CAMEO_CXXFLAGS)
teststarttimeout_LDFLAGS = $(CAMEO_LDFLAGS)
teststarttimeout_LDADD = ../libcameo.la $(CAMEO_LIBS)
......@@ -20,19 +20,27 @@
#include <vector>
#include <sstream>
#include "../cameo/cameo.h"
#include <boost/shared_ptr.hpp>
using namespace std;
using namespace boost;
using namespace cameo;
bool stopping = false;
struct StopData {
bool stopping;
StopData() : stopping(false) {}
};
struct Stop {
void operator()() {
stopping = true;
}
static void handle() {
stopping = true;
shared_ptr<StopData> data;
Stop(shared_ptr<StopData> sharedData) : data(sharedData) {}
void operator()() {
cout << "stop requested" << endl;
data->stopping = true;
}
};
......@@ -48,11 +56,12 @@ int main(int argc, char *argv[]) {
application::This::setRunning();
//application::This::handleStop(Stop()); // Works too
application::This::handleStop(&Stop::handle);
// Define an object StopData that is shared with the handler.
shared_ptr<StopData> data(new StopData());
application::This::handleStop(Stop(data));
int i = 0;
while (!stopping) {
while (!data->stopping) {
cout << "waiting " << i << "..." << endl;
usleep(100000);
i++;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment