Commit 5c4e6acf authored by legoc's avatar legoc
Browse files

Added output stream socket for C++ API

parent 14c51a03
...@@ -30,6 +30,7 @@ libcameo_la_SOURCES = \ ...@@ -30,6 +30,7 @@ libcameo_la_SOURCES = \
cameo/impl/CancelIdGenerator.cpp \ cameo/impl/CancelIdGenerator.cpp \
cameo/ConnectionChecker.cpp \ cameo/ConnectionChecker.cpp \
cameo/EventStreamSocket.cpp \ cameo/EventStreamSocket.cpp \
cameo/OutputStreamSocket.cpp \
cameo/Response.cpp \ cameo/Response.cpp \
cameo/impl/PublisherImpl.cpp \ cameo/impl/PublisherImpl.cpp \
cameo/impl/SubscriberImpl.cpp \ cameo/impl/SubscriberImpl.cpp \
...@@ -68,6 +69,7 @@ nobase_include_HEADERS = \ ...@@ -68,6 +69,7 @@ nobase_include_HEADERS = \
cameo/ConnectionTimeout.h \ cameo/ConnectionTimeout.h \
cameo/Event.h \ cameo/Event.h \
cameo/EventStreamSocket.h \ cameo/EventStreamSocket.h \
cameo/OutputStreamSocket.h \
cameo/PublisherCreationException.h \ cameo/PublisherCreationException.h \
cameo/ResponderCreationException.h \ cameo/ResponderCreationException.h \
cameo/RequesterCreationException.h \ cameo/RequesterCreationException.h \
......
...@@ -476,6 +476,10 @@ void Instance::setErrorMessage(const std::string& message) { ...@@ -476,6 +476,10 @@ void Instance::setErrorMessage(const std::string& message) {
m_errorMessage = message; m_errorMessage = message;
} }
void Instance::setOutputStreamSocket(std::unique_ptr<OutputStreamSocket>& socket) {
m_outputStreamSocket = std::move(socket);
}
void Instance::setPastStates(State pastStates) { void Instance::setPastStates(State pastStates) {
m_pastStates = pastStates; m_pastStates = pastStates;
} }
...@@ -700,6 +704,10 @@ bool Instance::getResult(std::vector<double>& result) { ...@@ -700,6 +704,10 @@ bool Instance::getResult(std::vector<double>& result) {
return m_hasResult; return m_hasResult;
} }
std::shared_ptr<OutputStreamSocket> Instance::getOutputStreamSocket() {
return m_outputStreamSocket;
}
/////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////
// InstanceArray // InstanceArray
......
...@@ -36,11 +36,13 @@ ...@@ -36,11 +36,13 @@
namespace cameo { namespace cameo {
enum Option { enum Option {
NONE = 0 NONE = 0,
OUTPUTSTREAM = 1
}; };
class Server; class Server;
class EventStreamSocket; class EventStreamSocket;
class OutputStreamSocket;
class ApplicationImpl; class ApplicationImpl;
class PublisherImpl; class PublisherImpl;
class SubscriberImpl; class SubscriberImpl;
...@@ -216,18 +218,22 @@ public: ...@@ -216,18 +218,22 @@ public:
bool getResult(std::vector<float>& result); bool getResult(std::vector<float>& result);
bool getResult(std::vector<double>& result); bool getResult(std::vector<double>& result);
std::shared_ptr<OutputStreamSocket> getOutputStreamSocket();
private: private:
Instance(const Server * server, std::unique_ptr<EventStreamSocket>& socket); Instance(const Server * server, std::unique_ptr<EventStreamSocket>& socket);
void setId(int id); void setId(int id);
void setName(const std::string& name); void setName(const std::string& name);
void setErrorMessage(const std::string& message); void setErrorMessage(const std::string& message);
void setOutputStreamSocket(std::unique_ptr<OutputStreamSocket>& socket);
void setPastStates(State pastStates); void setPastStates(State pastStates);
void setInitialState(State state); void setInitialState(State state);
State waitFor(int states, const std::string& eventName, StateHandlerType handler, bool blocking); State waitFor(int states, const std::string& eventName, StateHandlerType handler, bool blocking);
const Server * m_server; const Server * m_server;
std::unique_ptr<EventStreamSocket> m_eventSocket; std::unique_ptr<EventStreamSocket> m_eventSocket;
std::shared_ptr<OutputStreamSocket> m_outputStreamSocket;
std::unique_ptr<WaitingImpl> m_waiting; std::unique_ptr<WaitingImpl> m_waiting;
std::string m_name; std::string m_name;
int m_id; int m_id;
......
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
#include "OutputStreamSocket.h"
#include "impl/SocketImpl.h"
#include "impl/SocketWaitingImpl.h"
#include "../proto/Messages.pb.h"
using namespace std;
namespace cameo {
Output::Output(int id, const std::string& message, bool end) {
m_id = id;
m_message = message;
m_end = end;
}
int Output::getId() const {
return m_id;
}
const std::string& Output::getMessage() const {
return m_message;
}
bool Output::isEnd() const {
return m_end;
}
OutputStreamSocket::OutputStreamSocket(const std::string& streamString, const std::string& endOfStreamString, SocketImpl * impl) :
m_streamString(streamString),
m_endOfStreamString(endOfStreamString),
m_impl(impl) {
}
OutputStreamSocket::~OutputStreamSocket() {
}
std::unique_ptr<Output> OutputStreamSocket::receive() {
unique_ptr<zmq::message_t> message(m_impl->receive());
// In case of non-blocking call, the message can be null.
if (message == nullptr) {
return unique_ptr<Output>(nullptr);
}
string response(static_cast<char*>(message->data()), message->size());
bool end = false;
if (response == m_streamString) {
end = false;
}
else if (response == m_endOfStreamString) {
end = true;
}
message = m_impl->receive();
proto::ApplicationStream protoStream;
protoStream.ParseFromArray(message->data(), message->size());
return unique_ptr<Output>(new Output(protoStream.id(), protoStream.message(), end));
}
void OutputStreamSocket::cancel() {
m_impl->cancel();
}
WaitingImpl * OutputStreamSocket::waiting() {
// We transfer the ownership of cancel socket to WaitingImpl
return new SocketWaitingImpl(m_impl->m_cancelSocket.get(), "CANCEL");
}
}
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
#ifndef CAMEO_OUTPUTSTREAMSOCKET_H_
#define CAMEO_OUTPUTSTREAMSOCKET_H_
#include <memory>
#include "Event.h"
namespace cameo {
class SocketImpl;
class WaitingImpl;
namespace application {
class Instance;
}
class Output {
public:
Output(int id, const std::string& message, bool end);
int getId() const;
const std::string& getMessage() const;
bool isEnd() const;
private:
int m_id;
std::string m_message;
bool m_end;
};
class OutputStreamSocket {
friend class Services;
friend class application::Instance;
public:
~OutputStreamSocket();
std::unique_ptr<Output> receive();
void cancel();
private:
OutputStreamSocket(const std::string& streamString, const std::string& endOfStreamString, SocketImpl * impl);
WaitingImpl * waiting();
std::string m_streamString;
std::string m_endOfStreamString;
std::unique_ptr<SocketImpl> m_impl;
};
}
#endif
...@@ -48,7 +48,8 @@ enum ProtoType { ...@@ -48,7 +48,8 @@ enum ProtoType {
PROTO_CANCEL, PROTO_CANCEL,
PROTO_SETRESULT, PROTO_SETRESULT,
PROTO_STARTEDUNMANAGED, PROTO_STARTEDUNMANAGED,
PROTO_TERMINATEDUNMANAGED PROTO_TERMINATEDUNMANAGED,
PROTO_OUTPUT
}; };
} }
......
...@@ -96,15 +96,38 @@ std::unique_ptr<application::Instance> Server::start(const std::string& name, Op ...@@ -96,15 +96,38 @@ std::unique_ptr<application::Instance> Server::start(const std::string& name, Op
return start(name, vector<string>(), options); return start(name, vector<string>(), options);
} }
int Server::getStreamPort(const std::string& name) {
string strRequestType = m_impl->createRequest(PROTO_OUTPUT);
string strRequestData = m_impl->createOutputRequest(name);
unique_ptr<zmq::message_t> reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
return requestResponse.value();
}
std::unique_ptr<application::Instance> Server::start(const std::string& name, const std::vector<std::string> & args, Option options) { std::unique_ptr<application::Instance> Server::start(const std::string& name, const std::vector<std::string> & args, Option options) {
bool outputStream = ((options & OUTPUTSTREAM) != 0);
unique_ptr<application::Instance> instance = makeInstance(); unique_ptr<application::Instance> instance = makeInstance();
instance->setName(name); instance->setName(name);
string strRequestType = m_impl->createRequest(PROTO_START);
string strRequestData = m_impl->createStartRequest(name, args, application::This::getReference());
try { try {
if (outputStream) {
// we connect to the stream port before starting the application
// so that we are sure that the ENDSTREAM message will be received even if the application terminates rapidly
unique_ptr<OutputStreamSocket> socket = createOutputStreamSocket(getStreamPort(name));
instance->setOutputStreamSocket(socket);
}
string strRequestType = m_impl->createRequest(PROTO_START);
string strRequestData = m_impl->createStartRequest(name, args, application::This::getReference());
unique_ptr<zmq::message_t> reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint); unique_ptr<zmq::message_t> reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
proto::RequestResponse requestResponse; proto::RequestResponse requestResponse;
...@@ -167,7 +190,9 @@ std::unique_ptr<application::Instance> Server::stop(int id, bool immediately) { ...@@ -167,7 +190,9 @@ std::unique_ptr<application::Instance> Server::stop(int id, bool immediately) {
return instance; return instance;
} }
application::InstanceArray Server::connectAll(const std::string& name) { application::InstanceArray Server::connectAll(const std::string& name, Option options) {
bool outputStream = ((options & OUTPUTSTREAM) != 0);
application::InstanceArray instances; application::InstanceArray instances;
...@@ -195,6 +220,13 @@ application::InstanceArray Server::connectAll(const std::string& name) { ...@@ -195,6 +220,13 @@ application::InstanceArray Server::connectAll(const std::string& name) {
if (isAlive(applicationId)) { if (isAlive(applicationId)) {
aliveInstancesCount++; aliveInstancesCount++;
// we connect to the stream port before starting the application
// so that we are sure that the ENDSTREAM message will be received even if the application terminates rapidly
if (outputStream) {
unique_ptr<OutputStreamSocket> socket = createOutputStreamSocket(getStreamPort(info.name()));
instance->setOutputStreamSocket(socket);
}
instance->setId(applicationId); instance->setId(applicationId);
instance->setInitialState(info.applicationstate()); instance->setInitialState(info.applicationstate());
instance->setPastStates(info.pastapplicationstates()); instance->setPastStates(info.pastapplicationstates());
...@@ -219,9 +251,9 @@ application::InstanceArray Server::connectAll(const std::string& name) { ...@@ -219,9 +251,9 @@ application::InstanceArray Server::connectAll(const std::string& name) {
return aliveInstances; return aliveInstances;
} }
std::unique_ptr<application::Instance> Server::connect(const std::string& name) { std::unique_ptr<application::Instance> Server::connect(const std::string& name, Option options) {
application::InstanceArray instances = connectAll(name); application::InstanceArray instances = connectAll(name, options);
if (instances.size() == 0) { if (instances.size() == 0) {
unique_ptr<application::Instance> instance = makeInstance(); unique_ptr<application::Instance> instance = makeInstance();
......
...@@ -62,8 +62,8 @@ public: ...@@ -62,8 +62,8 @@ public:
std::unique_ptr<application::Instance> start(const std::string& name, const std::vector<std::string> &args, Option options = NONE); std::unique_ptr<application::Instance> start(const std::string& name, const std::vector<std::string> &args, Option options = NONE);
std::unique_ptr<application::Instance> start(const std::string& name, Option options = NONE); std::unique_ptr<application::Instance> start(const std::string& name, Option options = NONE);
application::InstanceArray connectAll(const std::string& name); application::InstanceArray connectAll(const std::string& name, Option options = NONE);
std::unique_ptr<application::Instance> connect(const std::string& name); std::unique_ptr<application::Instance> connect(const std::string& name, Option options = NONE);
/** /**
* throws ConnectionTimeout * throws ConnectionTimeout
...@@ -102,6 +102,7 @@ private: ...@@ -102,6 +102,7 @@ private:
std::unique_ptr<application::Instance> stop(int id, bool immediately); std::unique_ptr<application::Instance> stop(int id, bool immediately);
std::unique_ptr<application::Subscriber> createSubscriber(int id, const std::string& publisherName, const std::string& instanceName) const; std::unique_ptr<application::Subscriber> createSubscriber(int id, const std::string& publisherName, const std::string& instanceName) const;
int getAvailableTimeout() const; int getAvailableTimeout() const;
int getStreamPort(const std::string& name);
}; };
std::ostream& operator<<(std::ostream&, const Server&); std::ostream& operator<<(std::ostream&, const Server&);
......
...@@ -139,4 +139,17 @@ std::unique_ptr<EventStreamSocket> Services::openEventStream() { ...@@ -139,4 +139,17 @@ std::unique_ptr<EventStreamSocket> Services::openEventStream() {
return unique_ptr<EventStreamSocket>(new EventStreamSocket(new SocketImpl(subscriber, cancelPublisher))); return unique_ptr<EventStreamSocket>(new EventStreamSocket(new SocketImpl(subscriber, cancelPublisher)));
} }
std::unique_ptr<OutputStreamSocket> Services::createOutputStreamSocket(int port) {
if (port == -1) {
return nullptr;
}
// Prepare our context and subscriber
string streamEndpoint = m_url + ":" + to_string(port);
zmq::socket_t * subscriber = m_impl->createOutputStreamSubscriber(streamEndpoint);
return unique_ptr<OutputStreamSocket>(new OutputStreamSocket(ServicesImpl::STREAM, ServicesImpl::ENDSTREAM, new SocketImpl(subscriber)));
}
} }
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include <string> #include <string>
#include <vector> #include <vector>
#include "EventStreamSocket.h" #include "EventStreamSocket.h"
#include "OutputStreamSocket.h"
namespace cameo { namespace cameo {
...@@ -46,6 +47,7 @@ public: ...@@ -46,6 +47,7 @@ public:
bool isAvailable(int timeout) const; bool isAvailable(int timeout) const;
void initStatus(); void initStatus();
std::unique_ptr<EventStreamSocket> openEventStream(); std::unique_ptr<EventStreamSocket> openEventStream();
std::unique_ptr<OutputStreamSocket> createOutputStreamSocket(int port);
std::string m_serverEndpoint; std::string m_serverEndpoint;
std::string m_url; std::string m_url;
......
...@@ -45,4 +45,4 @@ public: ...@@ -45,4 +45,4 @@ public:
} }
#endif #endif
\ No newline at end of file
...@@ -41,6 +41,8 @@ const std::string ServicesImpl::RESULT = "RESULT"; ...@@ -41,6 +41,8 @@ const std::string ServicesImpl::RESULT = "RESULT";
const std::string ServicesImpl::PUBLISHER = "PUBLISHER"; const std::string ServicesImpl::PUBLISHER = "PUBLISHER";
const std::string ServicesImpl::PORT = "PORT"; const std::string ServicesImpl::PORT = "PORT";
const std::string ServicesImpl::CANCEL = "CANCEL"; const std::string ServicesImpl::CANCEL = "CANCEL";
const std::string ServicesImpl::STREAM = "STREAM";
const std::string ServicesImpl::ENDSTREAM = "ENDSTREAM";
ServicesImpl::ServicesImpl() : ServicesImpl::ServicesImpl() :
m_context(1), m_timeout(0) { m_context(1), m_timeout(0) {
...@@ -207,6 +209,24 @@ zmq::socket_t * ServicesImpl::createEventSubscriber(const std::string& endpoint, ...@@ -207,6 +209,24 @@ zmq::socket_t * ServicesImpl::createEventSubscriber(const std::string& endpoint,
return subscriber; return subscriber;
} }
zmq::socket_t * ServicesImpl::createOutputStreamSubscriber(const std::string& endpoint) {
zmq::socket_t * subscriber = new zmq::socket_t(m_context, ZMQ_SUB);
vector<string> streamList;
streamList.push_back(STREAM);
streamList.push_back(ENDSTREAM);
for (vector<string>::const_iterator s = streamList.begin(); s != streamList.end(); ++s) {
subscriber->setsockopt(ZMQ_SUBSCRIBE, s->c_str(), s->length());
}
subscriber->connect(endpoint.c_str());
return subscriber;
}
zmq::socket_t * ServicesImpl::createCancelPublisher(const std::string& endpoint) { zmq::socket_t * ServicesImpl::createCancelPublisher(const std::string& endpoint) {
zmq::socket_t * publisher = new zmq::socket_t(m_context, ZMQ_PUB); zmq::socket_t * publisher = new zmq::socket_t(m_context, ZMQ_PUB);
...@@ -344,6 +364,15 @@ std::string ServicesImpl::createTerminatedUnmanagedRequest(int id) const { ...@@ -344,6 +364,15 @@ std::string ServicesImpl::createTerminatedUnmanagedRequest(int id) const {
return result; return result;
} }
std::string ServicesImpl::createOutputRequest(const std::string& name) const {
proto::OutputCommand command;
command.set_name(name);
std::string result;
command.SerializeToString(&result);
return result;
}
bool ServicesImpl::isAvailable(const std::string& strRequestType, const std::string& strRequestData, const std::string& endpoint, int timeout) { bool ServicesImpl::isAvailable(const std::string& strRequestType, const std::string& strRequestData, const std::string& endpoint, int timeout) {
try { try {
...@@ -447,6 +476,8 @@ proto::MessageType_Type ServicesImpl::convertToProtoType(ProtoType type) const { ...@@ -447,6 +476,8 @@ proto::MessageType_Type ServicesImpl::convertToProtoType(ProtoType type) const {
return proto::MessageType_Type_STARTEDUNMANAGED; return proto::MessageType_Type_STARTEDUNMANAGED;
} else if (type == PROTO_TERMINATEDUNMANAGED) { } else if (type == PROTO_TERMINATEDUNMANAGED) {
return proto::MessageType_Type_TERMINATEDUNMANAGED; return proto::MessageType_Type_TERMINATEDUNMANAGED;
} else if (type == PROTO_OUTPUT) {
return proto::MessageType_Type_OUTPUT;
} else { } else {
cerr << "unsupported proto type" << endl; cerr << "unsupported proto type" << endl;
return proto::MessageType_Type(0); return proto::MessageType_Type(0);
......
...@@ -60,13 +60,16 @@ public: ...@@ -60,13 +60,16 @@ public:
std::string createRemovePortRequest(int id, const std::string& name) const; std::string createRemovePortRequest(int id, const std::string& name) const;
std::string createStartedUnmanagedRequest(const std::string& name) const; std::string createStartedUnmanagedRequest(const std::string& name) const;
std::string createTerminatedUnmanagedRequest(int id) const; std::string createTerminatedUnmanagedRequest(int id) const;
std::string createOutputRequest(const std::string& name) const;
zmq::socket_t * createEventSubscriber(const std::string& endpoint, const std::string& cancelEndpoint); zmq::socket_t * createEventSubscriber(const std::string& endpoint, const std::string& cancelEndpoint);
zmq::socket_t * createOutputStreamSubscriber(const std::string& endpoint);
zmq::socket_t * createCancelPublisher(const std::string& endpoint); zmq::socket_t * createCancelPublisher(const std::string& endpoint);
std::unique_ptr<zmq::message_t> tryRequestWithOnePartReply(const std::string& strRequestType, const std::string& strRequestData, const std::string& endpoint, int overrideTimeout = -1); std::unique_ptr<zmq::message_t> tryRequestWithOnePartReply(const std::string& strRequestType, const std::string& strRequestData, const std::string& endpoint, int overrideTimeout = -1);
std::string createShowStreamRequest(int id) const; std::string createShowStreamRequest(int id) const;
proto::MessageType_Type convertToProtoType(ProtoType type) const; proto::MessageType_Type convertToProtoType(ProtoType type) const;
int m_timeout; int m_timeout;
...@@ -77,6 +80,8 @@ public: ...@@ -77,6 +80,8 @@ public:
static const std::string PUBLISHER; static const std::string PUBLISHER;
static const std::string PORT; static const std::string PORT;
static const std::string CANCEL; static const std::string CANCEL;
static const std::string STREAM;
static const std::string ENDSTREAM;
}; };
} }
......