Commit 75d6ed62 authored by legoc's avatar legoc
Browse files

- removed StateException, setRunning returns a boolean

- added InvalidArgumentException to replace invalid_argument exception
- added SocketException thrown when the Server connect fails
- Added ConnectionChecker to periodically check the server availability
parent e56e4499
......@@ -18,8 +18,9 @@ libcameo_la_SOURCES = \
cameo/PublisherEvent.cpp \
cameo/PortEvent.cpp \
cameo/RemoteException.cpp \
cameo/InvalidArgumentException.cpp \
cameo/ConnectionTimeout.cpp \
cameo/StateException.cpp \
cameo/SocketException.cpp \
cameo/PublisherCreationException.cpp \
cameo/SubscriberCreationException.cpp \
cameo/StarterServerException.cpp \
......@@ -30,7 +31,7 @@ libcameo_la_SOURCES = \
cameo/impl/GenericWaitingImpl.cpp \
cameo/impl/WaitingImplSet.cpp \
cameo/impl/CancelIdGenerator.cpp \
cameo/impl/ConnectionHandlerSet.cpp \
cameo/ConnectionChecker.cpp \
cameo/EventStreamSocket.cpp \
cameo/Response.cpp \
cameo/impl/PublisherImpl.cpp \
......@@ -77,9 +78,11 @@ nobase_include_HEADERS = \
cameo/PortEvent.h \
cameo/RemoteException.h \
cameo/Response.h \
cameo/ConnectionChecker.h \
cameo/Server.h \
cameo/Services.h \
cameo/StateException.h \
cameo/InvalidArgumentException.h \
cameo/SocketException.h \
cameo/StatusEvent.h \
cameo/ResultEvent.h \
cameo/SubscriberCreationException.h \
......
......@@ -103,14 +103,14 @@ This::This(int argc, char *argv[]) :
Services::setImpl(m_impl);
if (argc == 0) {
throw invalid_argument("missing info argument");
throw InvalidArgumentException("missing info argument");
}
string info(argv[argc - 1]);
vector<string> tokens = split(info);
if (tokens.size() < 4) {
throw invalid_argument(info + " is not a valid argument");
throw InvalidArgumentException(info + " is not a valid argument");
}
m_url = tokens[0] + ":" + tokens[1];
......@@ -224,7 +224,7 @@ void This::init() {
initStatus();
}
void This::setRunning() {
bool This::setRunning() {
string strRequestType = m_instance->m_impl->createRequest(PROTO_SETSTATUS);
string strRequestData = m_instance->m_impl->createSetStatusRequest(m_instance->m_id, RUNNING);
......@@ -235,8 +235,10 @@ void This::setRunning() {
delete reply;
if (requestResponse.value() == -1) {
throw StateException(requestResponse.message());
return false;
}
return true;
}
void This::setBinaryResult(const std::string& data) {
......
......@@ -23,13 +23,14 @@
#include <set>
#include <memory>
#include <stdint.h>
#include "InvalidArgumentException.h"
#include "SocketException.h"
#include "ConnectionTimeout.h"
#include "PublisherCreationException.h"
#include "RequesterCreationException.h"
#include "ResponderCreationException.h"
#include "Response.h"
#include "Services.h"
#include "StateException.h"
namespace cameo {
......@@ -119,10 +120,7 @@ public:
static void cancelWaitings();
/**
* throws StateException.
*/
static void setRunning();
static bool setRunning();
/**
* Sets the result.
......
......@@ -14,51 +14,21 @@
* limitations under the Licence.
*/
#include "ConnectionHandlerSet.h"
#include "../Server.h"
#include "ConnectionChecker.h"
#include "Server.h"
using namespace std;
namespace cameo {
ConnectionHandlerSet::ConnectionHandlerSet(Server * server) : m_server(server) {
ConnectionChecker::ConnectionChecker(Server * server, ConnectionChecker::FunctionType handler) : m_server(server), m_function(handler) {
}
ConnectionHandlerSet::~ConnectionHandlerSet() {
ConnectionChecker::~ConnectionChecker() {
stopThread();
}
void ConnectionHandlerSet::add(std::string const & name, ConnectionHandlerSet::FunctionType handler) {
boost::mutex::scoped_lock lock(m_mutex);
m_set[name] = handler;
}
bool ConnectionHandlerSet::remove(std::string const & name) {
boost::mutex::scoped_lock lock(m_mutex);
map<string, FunctionType>::iterator h = m_set.find(name);
if (h != m_set.end()) {
m_set.erase(h);
return true;
}
return false;
}
void ConnectionHandlerSet::apply(bool available) {
boost::mutex::scoped_lock lock(m_mutex);
for (map<string, FunctionType>::const_iterator h = m_set.begin(); h != m_set.end(); ++h) {
h->second(available);
}
}
void ConnectionHandlerSet::loop(int timeoutMs, int pollingTimeMs) {
void ConnectionChecker::loop(int timeoutMs, int pollingTimeMs) {
// Loop until the condition is notified.
while (true) {
......@@ -70,21 +40,21 @@ void ConnectionHandlerSet::loop(int timeoutMs, int pollingTimeMs) {
// Check the server.
bool available = (m_server->isAvailable(timeoutMs));
// Apply the handlers.
apply(available);
// Apply the handler.
m_function(available);
}
}
void ConnectionHandlerSet::startThread(int timeoutMs, int pollingTimeMs) {
void ConnectionChecker::startThread(int timeoutMs, int pollingTimeMs) {
// Stop the thread if it exists.
stopThread();
// Start the thread.
m_thread = auto_ptr<boost::thread>(new boost::thread(boost::bind(&ConnectionHandlerSet::loop, this, timeoutMs, pollingTimeMs)));
m_thread = auto_ptr<boost::thread>(new boost::thread(boost::bind(&ConnectionChecker::loop, this, timeoutMs, pollingTimeMs)));
}
void ConnectionHandlerSet::stopThread() {
void ConnectionChecker::stopThread() {
if (m_thread.get() != 0) {
m_waitCondition.notify();
......
......@@ -14,10 +14,10 @@
* limitations under the Licence.
*/
#ifndef CAMEO_CONNECTIONHANDLERSET_H_
#define CAMEO_CONNECTIONHANDLERSET_H_
#ifndef CAMEO_CONNECTIONCHECKER_H_
#define CAMEO_CONNECTIONCHECKER_H_
#include "TimeCondition.h"
#include "impl/TimeCondition.h"
#include <boost/function.hpp>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
......@@ -30,31 +30,27 @@ namespace cameo {
class Server;
/**
* Class containing a set of connection handler objects.
* It is protected with a mutex because the class must be thread-safe.
* Class providing a simple connection checker.
*/
class ConnectionHandlerSet {
class ConnectionChecker {
friend class Server;
public:
typedef boost::function<void (bool)> FunctionType;
ConnectionHandlerSet(Server * server);
~ConnectionHandlerSet();
void add(std::string const & name, FunctionType handler);
bool remove(std::string const & name);
ConnectionChecker(Server * server, FunctionType handler);
~ConnectionChecker();
private:
void startThread(int timeoutMs, int pollingTimeMs);
void stopThread();
private:
void apply(bool available);
void loop(int timeoutMs, int pollingTimeMs);
Server * m_server;
TimeCondition m_waitCondition;
boost::mutex m_mutex;
std::map<std::string, FunctionType> m_set;
FunctionType m_function;
std::auto_ptr<boost::thread> m_thread;
};
......
......@@ -19,9 +19,9 @@
#include <iostream>
#include <sstream>
#include "Application.h"
#include "ConnectionChecker.h"
#include "impl/ServicesImpl.h"
#include "impl/SocketImpl.h"
#include "impl/ConnectionHandlerSet.h"
#include "ProtoType.h"
using namespace std;
......@@ -29,7 +29,7 @@ using namespace std;
namespace cameo {
Server::Server(const std::string& endpoint) :
Services(), m_connectionPollingTimeMs(10000) {
Services() {
m_impl = new ServicesImpl();
Services::setImpl(m_impl);
......@@ -37,7 +37,7 @@ Server::Server(const std::string& endpoint) :
vector<string> tokens = split(endpoint);
if (tokens.size() < 3) {
throw invalid_argument(endpoint + " is not a valid endpoint");
throw InvalidArgumentException(endpoint + " is not a valid endpoint");
}
m_url = tokens[0] + ":" + tokens[1];
......@@ -45,41 +45,19 @@ Server::Server(const std::string& endpoint) :
istringstream is(port);
is >> m_port;
m_serverEndpoint = m_url + ":" + port;
// Create the connection handler set.
m_connectionHandlerSet = auto_ptr<ConnectionHandlerSet>(new ConnectionHandlerSet(this));
}
Server::~Server() {
}
void Server::setTimeout(int timeoutMs, int connectionPollingTimeMs) {
Services::setTimeout(timeoutMs);
m_connectionPollingTimeMs = connectionPollingTimeMs;
// Start the connection thread if timeout is positive.
if (timeoutMs > 0) {
m_connectionHandlerSet->startThread(timeoutMs, m_connectionPollingTimeMs);
}
else {
m_connectionHandlerSet->stopThread();
}
}
void Server::setTimeout(int timeoutMs) {
setTimeout(timeoutMs, m_connectionPollingTimeMs);
Services::setTimeout(timeoutMs);
}
int Server::getTimeout() const {
return Services::getTimeout();
}
int Server::getConnectionPollingTime() const {
return m_connectionPollingTimeMs;
}
const std::string& Server::getEndpoint() const {
return Services::getEndpoint();
}
......@@ -97,13 +75,17 @@ bool Server::isAvailable(int timeout) const {
}
bool Server::isAvailable() const {
return isAvailable(getAvailableTimeout());
}
int Server::getAvailableTimeout() const {
int timeout = getTimeout();
if (timeout > 0) {
return isAvailable(timeout);
return timeout;
}
else {
return 10000;
}
// Default timeout value is 10000ms.
return isAvailable(10000);
}
std::auto_ptr<application::Instance> Server::makeInstance() {
......@@ -375,12 +357,12 @@ std::auto_ptr<application::Subscriber> Server::createSubscriber(int id, const st
return subscriber;
}
void Server::addConnectionHandler(std::string const & name, ConnectionHandlerType handler) {
m_connectionHandlerSet->add(name, handler);
}
std::auto_ptr<ConnectionChecker> Server::createConnectionChecker(ConnectionCheckerType handler, int pollingTimeMs) {
auto_ptr<ConnectionChecker> connectionChecker(new ConnectionChecker(this, handler));
connectionChecker->startThread(getAvailableTimeout(), pollingTimeMs);
bool Server::removeConnectionHandler(std::string const & name) {
return m_connectionHandlerSet->remove(name);
return connectionChecker;
}
std::ostream& operator<<(std::ostream& os, const cameo::Server& server) {
......
......@@ -20,6 +20,7 @@
#include <vector>
#include <memory>
#include "Application.h"
#include "ConnectionChecker.h"
#include "ConnectionTimeout.h"
#include "Response.h"
#include "Services.h"
......@@ -27,8 +28,6 @@
namespace cameo {
class ConnectionHandlerSet;
namespace application {
class This;
}
......@@ -43,16 +42,14 @@ class Server : private Services {
friend std::ostream& operator<<(std::ostream&, const Server&);
public:
typedef boost::function<void (bool)> ConnectionHandlerType;
typedef boost::function<void (bool)> ConnectionCheckerType;
Server(const std::string& endpoint);
~Server();
void setTimeout(int timeoutMs, int connectionPollingTimeMs);
void setTimeout(int timeoutMs);
int getTimeout() const;
int getConnectionPollingTime() const;
const std::string& getEndpoint() const;
const std::string& getUrl() const;
int getPort() const;
......@@ -94,14 +91,9 @@ public:
std::auto_ptr<EventStreamSocket> openEventStream();
/**
* Adds a connection handler.
*/
void addConnectionHandler(std::string const & name, ConnectionHandlerType handler);
/**
* Removes a connection handler. Returns true if the handler is found and removed.
* Creates a connection handler with polling time.
*/
bool removeConnectionHandler(std::string const & name);
std::auto_ptr<ConnectionChecker> createConnectionChecker(ConnectionCheckerType handler, int pollingTimeMs = 10000);
private:
std::auto_ptr<application::Instance> makeInstance();
......@@ -109,10 +101,9 @@ private:
Response stopApplicationAsynchronously(int id, bool immediately) const;
std::auto_ptr<application::Instance> stop(int id, bool immediately);
std::auto_ptr<application::Subscriber> createSubscriber(int id, const std::string& publisherName, const std::string& instanceName) const;
int getAvailableTimeout() const;
ServicesImpl * m_impl;
int m_connectionPollingTimeMs;
std::auto_ptr<ConnectionHandlerSet> m_connectionHandlerSet;
};
std::ostream& operator<<(std::ostream&, const Server&);
......
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
#include "StateException.h"
namespace cameo {
StateException::StateException(const std::string& message) :
RemoteException(message) {
}
}
\ No newline at end of file
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
#ifndef CAMEO_STATEEXCEPTION_H_
#define CAMEO_STATEEXCEPTION_H_
#include "RemoteException.h"
namespace cameo {
class StateException : public RemoteException {
public:
StateException(const std::string& message);
};
}
#endif
\ No newline at end of file
......@@ -18,6 +18,7 @@
#include <iostream>
#include <sstream>
#include "../SocketException.h"
#include "../ConnectionTimeout.h"
using namespace std;
......@@ -84,6 +85,12 @@ std::string ServicesImpl::createStartRequest(const std::string& name, const std:
zmq::message_t* ServicesImpl::tryRequest(const std::string& strRequestData, const std::string& endpoint, int overrideTimeout) {
zmq::socket_t socket(m_context, ZMQ_REQ);
// Set the linger value to 0 to ensure that pending requests are destroyed in case of timeout.
int value = 0;
socket.setsockopt(ZMQ_LINGER, &value, sizeof(int));
// Connect to the endpoint.
socket.connect(endpoint.c_str());
int requestDataSize = strRequestData.length();
......@@ -121,7 +128,17 @@ zmq::message_t* ServicesImpl::tryRequest(const std::string& strRequestData, cons
zmq::message_t* ServicesImpl::tryRequestWithOnePartReply(const std::string& strRequestType, const std::string& strRequestData, const std::string& endpoint, int overrideTimeout) {
zmq::socket_t socket(m_context, ZMQ_REQ);
socket.connect(endpoint.c_str());
try {
// Set the linger value to 0 to ensure that pending requests are destroyed in case of timeout.
int value = 0;
socket.setsockopt(ZMQ_LINGER, &value, sizeof(int));
// Connect to the endpoint.
socket.connect(endpoint.c_str());
}
catch (exception const & e) {
throw SocketException(e.what());
}
int requestTypeSize = strRequestType.length();
int requestDataSize = strRequestData.length();
......@@ -230,14 +247,6 @@ zmq::socket_t * ServicesImpl::createCancelPublisher(const std::string& endpoint)
return publisher;
}
zmq::socket_t * ServicesImpl::createRequestSocket(const std::string& endpoint) {
zmq::socket_t * socket = new zmq::socket_t(m_context, ZMQ_REQ);
socket->connect(endpoint.c_str());
return socket;
}
std::string ServicesImpl::createShowStreamRequest(int id) const {
proto::ShowStreamCommand showStreamCommand;
showStreamCommand.set_id(id);
......
......@@ -61,7 +61,6 @@ public:
zmq::socket_t * createEventSubscriber(const std::string& endpoint, const std::string& cancelEndpoint);
zmq::socket_t * createCancelPublisher(const std::string& endpoint);
zmq::socket_t * createRequestSocket(const std::string& endpoint);
zmq::message_t * tryRequest(const std::string& strRequestData, const std::string& endpoint, int overrideTimeout = -1);
zmq::message_t * tryRequestWithOnePartReply(const std::string& strRequestType, const std::string& strRequestData, const std::string& endpoint, int overrideTimeout = -1);
std::string createShowStreamRequest(int id) const;
......
......@@ -2,6 +2,7 @@ INCLUDES = -I$(top_srcdir)/src
bin_PROGRAMS = \
testcameo \
testbadendpoint \
teststate \
testpublisher \
testsubscriber \
......@@ -14,7 +15,8 @@ bin_PROGRAMS = \
testresponder \
testrequesterandresponder \
testcancel \
teststarttimeout
testtimeout \
testconnectionchecker
testcameo_SOURCES = \
......@@ -25,6 +27,14 @@ testcameo_LDFLAGS = $(CAMEO_LDFLAGS)
testcameo_LDADD = ../libcameo.la $(CAMEO_LIBS)
testbadendpoint_SOURCES = \
TestBadEndpoint.cpp
testbadendpoint_CPPFLAGS = $(CAMEO_CXXFLAGS)
testbadendpoint_LDFLAGS = $(CAMEO_LDFLAGS)
testbadendpoint_LDADD = ../libcameo.la $(CAMEO_LIBS)
teststate_SOURCES = \
TestStateApplication.cpp
......@@ -121,9 +131,17 @@ testcancel_LDFLAGS = $(CAMEO_LDFLAGS)
testcancel_LDADD = ../libcameo.la $(CAMEO_LIBS)
teststarttimeout_SOURCES = \
TestStartTimeoutApplication.cpp
testtimeout_SOURCES = \
TestTimeoutApplication.cpp
testtimeout_CPPFLAGS = $(CAMEO_CXXFLAGS)
testtimeout_LDFLAGS = $(CAMEO_LDFLAGS)
testtimeout_LDADD = ../libcameo.la $(CAMEO_LIBS)
testconnectionchecker_SOURCES = \
TestConnectionCheckerApplication.cpp
teststarttimeout_CPPFLAGS = $(CAMEO_CXXFLAGS)
teststarttimeout_LDFLAGS = $(CAMEO_LDFLAGS)
teststarttimeout_LDADD = ../libcameo.la $(CAMEO_LIBS)
testconnectionchecker_CPPFLAGS = $(CAMEO_CXXFLAGS)
testconnectionchecker_LDFLAGS = $(CAMEO_LDFLAGS)
testconnectionchecker_LDADD = ../libcameo.la $(CAMEO_LIBS)
......@@ -19,54 +19,24 @@
#include <vector>
#include <sstream>
#include "../cameo/cameo.h"
#include <boost/date_time/posix_time/posix_time.hpp>
using namespace std;
using namespace cameo;
struct ConnectionHandler {
void operator()(bool available) {
if (!available) {
application::This::cancelWaitings();
}
}
};
int main(int argc, char *argv[]) {
application::This::init(argc, argv);
// The start function must be called into a block to ensure the destructor of Instance is called before This::terminate()
{
Server server("tcp://localhost:8000");
server.addConnectionHandler("timeout", ConnectionHandler());
server.setTimeout(100, 100);
sleep(1);
//server.setTimeout(0);
Server server("tcp://localhost:9000");
cout << "reset timeout" << endl;
/*
if (server.isAvailable()) {
cout << "connected" << endl;
auto_ptr<application::Instance> resultApplication = server.start("test");
cout << "finished the application" << endl;
while (server.isAvailable(1000)) {