Commit c8f6dbc4 authored by legoc's avatar legoc
Browse files

Implemented state handler

parent 23c8de9a
###############################################################################
# Version 01/07/2015
# Version 11/07/2016
# defines CAMEO_CFLAGS, CAMEO_LDFLAGS, CAMEO_LIBS
#
AC_DEFUN([AC_CAMEO],
......@@ -11,8 +11,10 @@ AC_DEFUN([AC_CAMEO],
AC_LIBZMQ
AC_LIBPROTOBUF_LITE
AX_BOOST_THREAD
AX_BOOST_DATE_TIME
AX_BOOST_THREAD
AX_BOOST_SYSTEM
CAMEO_CFLAGS=
CAMEO_LIBS=
if test $with_cameo != no; then
......
......@@ -11,7 +11,7 @@ lib_LTLIBRARIES = libcameo.la
# header files that must be in the dist package but not installed are sources
libcameo_la_SOURCES = \
cameo/impl/Serializer.cpp \
cameo/impl/TimeCondition.cpp \
cameo/TimeCondition.cpp \
cameo/Event.cpp \
cameo/StatusEvent.cpp \
cameo/ResultEvent.cpp \
......@@ -67,6 +67,7 @@ libcameo_la_SOURCES = \
# header files that are installed
nobase_include_HEADERS = \
cameo/TimeCondition.h \
cameo/Application.h \
cameo/ProtoType.h \
cameo/ConnectionTimeout.h \
......
......@@ -498,7 +498,7 @@ bool Instance::kill() {
return true;
}
State Instance::waitFor(int states, const std::string& eventName) {
State Instance::waitFor(int states, const std::string& eventName, StateHandlerType handler) {
if (!exists()) {
// the application was not launched
......@@ -537,6 +537,11 @@ State Instance::waitFor(int states, const std::string& eventName) {
m_pastStates = status->getPastStates();
m_lastState = state;
// call the state handler.
if (!handler.empty()) {
handler(state);
}
// test the terminal state
if (state == SUCCESS
|| state == STOPPED
......@@ -574,8 +579,12 @@ State Instance::waitFor(int states, const std::string& eventName) {
return m_lastState;
}
State Instance::waitFor(int states) {
return waitFor(states, "");
State Instance::waitFor(int states, StateHandlerType handler) {
return waitFor(states, "", handler);
}
State Instance::waitFor(StateHandlerType handler) {
return waitFor(0, "", handler);
}
void Instance::cancelWaitFor() {
......
......@@ -31,6 +31,7 @@
#include "ResponderCreationException.h"
#include "Response.h"
#include "Services.h"
#include "TimeCondition.h"
namespace cameo {
......@@ -174,6 +175,8 @@ class Instance {
friend std::ostream& operator<<(std::ostream&, const Instance&);
public:
typedef boost::function<void (State)> StateHandlerType;
~Instance();
const std::string& getName() const;
......@@ -187,8 +190,11 @@ public:
State getInitialState() const;
bool stop();
bool kill();
State waitFor(int states = 0);
State waitFor(int states, const std::string& eventName);
State waitFor(StateHandlerType handler = 0);
State waitFor(int states, StateHandlerType handler = 0);
State waitFor(int states, const std::string& eventName, StateHandlerType handler = 0);
void cancelWaitFor();
bool getBinaryResult(std::string& result);
......
......@@ -15,8 +15,8 @@
*/
#include "ConnectionChecker.h"
#include "impl/TimeCondition.h"
#include "Server.h"
#include "TimeCondition.h"
using namespace std;
......
......@@ -82,49 +82,6 @@ std::string ServicesImpl::createStartRequest(const std::string& name, const std:
return strRequestStart;
}
zmq::message_t* ServicesImpl::tryRequest(const std::string& strRequestData, const std::string& endpoint, int overrideTimeout) {
zmq::socket_t socket(m_context, ZMQ_REQ);
// Set the linger value to 0 to ensure that pending requests are destroyed in case of timeout.
int value = 0;
socket.setsockopt(ZMQ_LINGER, &value, sizeof(int));
// Connect to the endpoint.
socket.connect(endpoint.c_str());
int requestDataSize = strRequestData.length();
zmq::message_t requestData(requestDataSize);
memcpy((void *) requestData.data(), strRequestData.c_str(), requestDataSize);
socket.send(requestData);
int timeout = m_timeout;
if (overrideTimeout > -1) {
timeout = overrideTimeout;
}
if (timeout > 0) {
// polling
zmq_pollitem_t items[1];
items[0].socket = socket;
items[0].fd = 0;
items[0].events = ZMQ_POLLIN;
items[0].revents = 0;
int rc = zmq::poll(items, 1, timeout);
if (rc == 0) {
// timeout
socket.close();
throw ConnectionTimeout();
}
}
zmq::message_t *reply = new zmq::message_t;
socket.recv(reply, 0);
return reply;
}
zmq::message_t* ServicesImpl::tryRequestWithOnePartReply(const std::string& strRequestType, const std::string& strRequestData, const std::string& endpoint, int overrideTimeout) {
zmq::socket_t socket(m_context, ZMQ_REQ);
......
......@@ -61,7 +61,6 @@ public:
zmq::socket_t * createEventSubscriber(const std::string& endpoint, const std::string& cancelEndpoint);
zmq::socket_t * createCancelPublisher(const std::string& endpoint);
zmq::message_t * tryRequest(const std::string& strRequestData, const std::string& endpoint, int overrideTimeout = -1);
zmq::message_t * tryRequestWithOnePartReply(const std::string& strRequestType, const std::string& strRequestData, const std::string& endpoint, int overrideTimeout = -1);
std::string createShowStreamRequest(int id) const;
......
......@@ -23,6 +23,12 @@
using namespace std;
using namespace cameo;
struct StateHandler {
void operator()(application::State state) {
cout << "received state " << application::toString(state) << endl;
}
};
int main(int argc, char *argv[]) {
application::This::init(argc, argv);
......@@ -43,6 +49,8 @@ int main(int argc, char *argv[]) {
cout << "stopping application " << stopApplication->getNameId() << endl;
stopApplication->stop();
stopApplication->waitFor(StateHandler());
string result;
if (stopApplication->getResult(result)) {
cout << "stop application returned " << result << endl;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment