Commit 070787fb authored by legoc's avatar legoc
Browse files

Added output stream socket for C++ API

parent 8b9995df
......@@ -30,6 +30,7 @@ libcameo_la_SOURCES = \
cameo/impl/CancelIdGenerator.cpp \
cameo/ConnectionChecker.cpp \
cameo/EventStreamSocket.cpp \
cameo/OutputStreamSocket.cpp \
cameo/Response.cpp \
cameo/impl/PublisherImpl.cpp \
cameo/impl/SubscriberImpl.cpp \
......@@ -68,6 +69,7 @@ nobase_include_HEADERS = \
cameo/ConnectionTimeout.h \
cameo/Event.h \
cameo/EventStreamSocket.h \
cameo/OutputStreamSocket.h \
cameo/PublisherCreationException.h \
cameo/ResponderCreationException.h \
cameo/RequesterCreationException.h \
......
......@@ -476,6 +476,10 @@ void Instance::setErrorMessage(const std::string& message) {
m_errorMessage = message;
}
void Instance::setOutputStreamSocket(std::unique_ptr<OutputStreamSocket>& socket) {
m_outputStreamSocket = std::move(socket);
}
void Instance::setPastStates(State pastStates) {
m_pastStates = pastStates;
}
......@@ -700,6 +704,10 @@ bool Instance::getResult(std::vector<double>& result) {
return m_hasResult;
}
std::shared_ptr<OutputStreamSocket> Instance::getOutputStreamSocket() {
return m_outputStreamSocket;
}
///////////////////////////////////////////////////////////////////////////
// InstanceArray
......
......@@ -36,11 +36,13 @@
namespace cameo {
enum Option {
NONE = 0
NONE = 0,
OUTPUTSTREAM = 1
};
class Server;
class EventStreamSocket;
class OutputStreamSocket;
class ApplicationImpl;
class PublisherImpl;
class SubscriberImpl;
......@@ -216,18 +218,22 @@ public:
bool getResult(std::vector<float>& result);
bool getResult(std::vector<double>& result);
std::shared_ptr<OutputStreamSocket> getOutputStreamSocket();
private:
Instance(const Server * server, std::unique_ptr<EventStreamSocket>& socket);
void setId(int id);
void setName(const std::string& name);
void setErrorMessage(const std::string& message);
void setOutputStreamSocket(std::unique_ptr<OutputStreamSocket>& socket);
void setPastStates(State pastStates);
void setInitialState(State state);
State waitFor(int states, const std::string& eventName, StateHandlerType handler, bool blocking);
const Server * m_server;
std::unique_ptr<EventStreamSocket> m_eventSocket;
std::shared_ptr<OutputStreamSocket> m_outputStreamSocket;
std::unique_ptr<WaitingImpl> m_waiting;
std::string m_name;
int m_id;
......
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
#include "OutputStreamSocket.h"
#include "impl/SocketImpl.h"
#include "impl/SocketWaitingImpl.h"
#include "../proto/Messages.pb.h"
using namespace std;
namespace cameo {
Output::Output(int id, const std::string& message, bool end) {
m_id = id;
m_message = message;
m_end = end;
}
int Output::getId() const {
return m_id;
}
const std::string& Output::getMessage() const {
return m_message;
}
bool Output::isEnd() const {
return m_end;
}
OutputStreamSocket::OutputStreamSocket(const std::string& streamString, const std::string& endOfStreamString, SocketImpl * impl) :
m_streamString(streamString),
m_endOfStreamString(endOfStreamString),
m_impl(impl) {
}
OutputStreamSocket::~OutputStreamSocket() {
}
std::unique_ptr<Output> OutputStreamSocket::receive() {
unique_ptr<zmq::message_t> message(m_impl->receive());
// In case of non-blocking call, the message can be null.
if (message == nullptr) {
return unique_ptr<Output>(nullptr);
}
string response(static_cast<char*>(message->data()), message->size());
bool end = false;
if (response == m_streamString) {
end = false;
}
else if (response == m_endOfStreamString) {
end = true;
}
message = m_impl->receive();
proto::ApplicationStream protoStream;
protoStream.ParseFromArray(message->data(), message->size());
return unique_ptr<Output>(new Output(protoStream.id(), protoStream.message(), end));
}
void OutputStreamSocket::cancel() {
m_impl->cancel();
}
WaitingImpl * OutputStreamSocket::waiting() {
// We transfer the ownership of cancel socket to WaitingImpl
return new SocketWaitingImpl(m_impl->m_cancelSocket.get(), "CANCEL");
}
}
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
#ifndef CAMEO_OUTPUTSTREAMSOCKET_H_
#define CAMEO_OUTPUTSTREAMSOCKET_H_
#include <memory>
#include "Event.h"
namespace cameo {
class SocketImpl;
class WaitingImpl;
namespace application {
class Instance;
}
class Output {
public:
Output(int id, const std::string& message, bool end);
int getId() const;
const std::string& getMessage() const;
bool isEnd() const;
private:
int m_id;
std::string m_message;
bool m_end;
};
class OutputStreamSocket {
friend class Services;
friend class application::Instance;
public:
~OutputStreamSocket();
std::unique_ptr<Output> receive();
void cancel();
private:
OutputStreamSocket(const std::string& streamString, const std::string& endOfStreamString, SocketImpl * impl);
WaitingImpl * waiting();
std::string m_streamString;
std::string m_endOfStreamString;
std::unique_ptr<SocketImpl> m_impl;
};
}
#endif
......@@ -48,7 +48,8 @@ enum ProtoType {
PROTO_CANCEL,
PROTO_SETRESULT,
PROTO_STARTEDUNMANAGED,
PROTO_TERMINATEDUNMANAGED
PROTO_TERMINATEDUNMANAGED,
PROTO_OUTPUT
};
}
......
......@@ -96,15 +96,38 @@ std::unique_ptr<application::Instance> Server::start(const std::string& name, Op
return start(name, vector<string>(), options);
}
int Server::getStreamPort(const std::string& name) {
string strRequestType = m_impl->createRequest(PROTO_OUTPUT);
string strRequestData = m_impl->createOutputRequest(name);
unique_ptr<zmq::message_t> reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
proto::RequestResponse requestResponse;
requestResponse.ParseFromArray((*reply).data(), (*reply).size());
return requestResponse.value();
}
std::unique_ptr<application::Instance> Server::start(const std::string& name, const std::vector<std::string> & args, Option options) {
bool outputStream = ((options & OUTPUTSTREAM) != 0);
unique_ptr<application::Instance> instance = makeInstance();
instance->setName(name);
string strRequestType = m_impl->createRequest(PROTO_START);
string strRequestData = m_impl->createStartRequest(name, args, application::This::getReference());
try {
if (outputStream) {
// we connect to the stream port before starting the application
// so that we are sure that the ENDSTREAM message will be received even if the application terminates rapidly
unique_ptr<OutputStreamSocket> socket = createOutputStreamSocket(getStreamPort(name));
instance->setOutputStreamSocket(socket);
}
string strRequestType = m_impl->createRequest(PROTO_START);
string strRequestData = m_impl->createStartRequest(name, args, application::This::getReference());
unique_ptr<zmq::message_t> reply = m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_serverEndpoint);
proto::RequestResponse requestResponse;
......@@ -167,7 +190,9 @@ std::unique_ptr<application::Instance> Server::stop(int id, bool immediately) {
return instance;
}
application::InstanceArray Server::connectAll(const std::string& name) {
application::InstanceArray Server::connectAll(const std::string& name, Option options) {
bool outputStream = ((options & OUTPUTSTREAM) != 0);
application::InstanceArray instances;
......@@ -195,6 +220,13 @@ application::InstanceArray Server::connectAll(const std::string& name) {
if (isAlive(applicationId)) {
aliveInstancesCount++;
// we connect to the stream port before starting the application
// so that we are sure that the ENDSTREAM message will be received even if the application terminates rapidly
if (outputStream) {
unique_ptr<OutputStreamSocket> socket = createOutputStreamSocket(getStreamPort(info.name()));
instance->setOutputStreamSocket(socket);
}
instance->setId(applicationId);
instance->setInitialState(info.applicationstate());
instance->setPastStates(info.pastapplicationstates());
......@@ -219,9 +251,9 @@ application::InstanceArray Server::connectAll(const std::string& name) {
return aliveInstances;
}
std::unique_ptr<application::Instance> Server::connect(const std::string& name) {
std::unique_ptr<application::Instance> Server::connect(const std::string& name, Option options) {
application::InstanceArray instances = connectAll(name);
application::InstanceArray instances = connectAll(name, options);
if (instances.size() == 0) {
unique_ptr<application::Instance> instance = makeInstance();
......
......@@ -62,8 +62,8 @@ public:
std::unique_ptr<application::Instance> start(const std::string& name, const std::vector<std::string> &args, Option options = NONE);
std::unique_ptr<application::Instance> start(const std::string& name, Option options = NONE);
application::InstanceArray connectAll(const std::string& name);
std::unique_ptr<application::Instance> connect(const std::string& name);
application::InstanceArray connectAll(const std::string& name, Option options = NONE);
std::unique_ptr<application::Instance> connect(const std::string& name, Option options = NONE);
/**
* throws ConnectionTimeout
......@@ -102,6 +102,7 @@ private:
std::unique_ptr<application::Instance> stop(int id, bool immediately);
std::unique_ptr<application::Subscriber> createSubscriber(int id, const std::string& publisherName, const std::string& instanceName) const;
int getAvailableTimeout() const;
int getStreamPort(const std::string& name);
};
std::ostream& operator<<(std::ostream&, const Server&);
......
......@@ -139,4 +139,17 @@ std::unique_ptr<EventStreamSocket> Services::openEventStream() {
return unique_ptr<EventStreamSocket>(new EventStreamSocket(new SocketImpl(subscriber, cancelPublisher)));
}
std::unique_ptr<OutputStreamSocket> Services::createOutputStreamSocket(int port) {
if (port == -1) {
return nullptr;
}
// Prepare our context and subscriber
string streamEndpoint = m_url + ":" + to_string(port);
zmq::socket_t * subscriber = m_impl->createOutputStreamSubscriber(streamEndpoint);
return unique_ptr<OutputStreamSocket>(new OutputStreamSocket(ServicesImpl::STREAM, ServicesImpl::ENDSTREAM, new SocketImpl(subscriber)));
}
}
......@@ -20,6 +20,7 @@
#include <string>
#include <vector>
#include "EventStreamSocket.h"
#include "OutputStreamSocket.h"
namespace cameo {
......@@ -46,6 +47,7 @@ public:
bool isAvailable(int timeout) const;
void initStatus();
std::unique_ptr<EventStreamSocket> openEventStream();
std::unique_ptr<OutputStreamSocket> createOutputStreamSocket(int port);
std::string m_serverEndpoint;
std::string m_url;
......
......@@ -45,4 +45,4 @@ public:
}
#endif
\ No newline at end of file
#endif
......@@ -41,6 +41,8 @@ const std::string ServicesImpl::RESULT = "RESULT";
const std::string ServicesImpl::PUBLISHER = "PUBLISHER";
const std::string ServicesImpl::PORT = "PORT";
const std::string ServicesImpl::CANCEL = "CANCEL";
const std::string ServicesImpl::STREAM = "STREAM";
const std::string ServicesImpl::ENDSTREAM = "ENDSTREAM";
ServicesImpl::ServicesImpl() :
m_context(1), m_timeout(0) {
......@@ -207,6 +209,24 @@ zmq::socket_t * ServicesImpl::createEventSubscriber(const std::string& endpoint,
return subscriber;
}
zmq::socket_t * ServicesImpl::createOutputStreamSubscriber(const std::string& endpoint) {
zmq::socket_t * subscriber = new zmq::socket_t(m_context, ZMQ_SUB);
vector<string> streamList;
streamList.push_back(STREAM);
streamList.push_back(ENDSTREAM);
for (vector<string>::const_iterator s = streamList.begin(); s != streamList.end(); ++s) {
subscriber->setsockopt(ZMQ_SUBSCRIBE, s->c_str(), s->length());
}
subscriber->connect(endpoint.c_str());
return subscriber;
}
zmq::socket_t * ServicesImpl::createCancelPublisher(const std::string& endpoint) {
zmq::socket_t * publisher = new zmq::socket_t(m_context, ZMQ_PUB);
......@@ -344,6 +364,15 @@ std::string ServicesImpl::createTerminatedUnmanagedRequest(int id) const {
return result;
}
std::string ServicesImpl::createOutputRequest(const std::string& name) const {
proto::OutputCommand command;
command.set_name(name);
std::string result;
command.SerializeToString(&result);
return result;
}
bool ServicesImpl::isAvailable(const std::string& strRequestType, const std::string& strRequestData, const std::string& endpoint, int timeout) {
try {
......@@ -447,6 +476,8 @@ proto::MessageType_Type ServicesImpl::convertToProtoType(ProtoType type) const {
return proto::MessageType_Type_STARTEDUNMANAGED;
} else if (type == PROTO_TERMINATEDUNMANAGED) {
return proto::MessageType_Type_TERMINATEDUNMANAGED;
} else if (type == PROTO_OUTPUT) {
return proto::MessageType_Type_OUTPUT;
} else {
cerr << "unsupported proto type" << endl;
return proto::MessageType_Type(0);
......
......@@ -60,13 +60,16 @@ public:
std::string createRemovePortRequest(int id, const std::string& name) const;
std::string createStartedUnmanagedRequest(const std::string& name) const;
std::string createTerminatedUnmanagedRequest(int id) const;
std::string createOutputRequest(const std::string& name) const;
zmq::socket_t * createEventSubscriber(const std::string& endpoint, const std::string& cancelEndpoint);
zmq::socket_t * createOutputStreamSubscriber(const std::string& endpoint);
zmq::socket_t * createCancelPublisher(const std::string& endpoint);
std::unique_ptr<zmq::message_t> tryRequestWithOnePartReply(const std::string& strRequestType, const std::string& strRequestData, const std::string& endpoint, int overrideTimeout = -1);
std::string createShowStreamRequest(int id) const;
proto::MessageType_Type convertToProtoType(ProtoType type) const;
int m_timeout;
......@@ -77,6 +80,8 @@ public:
static const std::string PUBLISHER;
static const std::string PORT;
static const std::string CANCEL;
static const std::string STREAM;
static const std::string ENDSTREAM;
};
}
......
......@@ -26,7 +26,7 @@ namespace cameo {
class SocketImpl {
public:
SocketImpl(zmq::socket_t * socket, zmq::socket_t * cancelSocket);
SocketImpl(zmq::socket_t * socket, zmq::socket_t * cancelSocket = nullptr);
virtual ~SocketImpl();
void send(const std::string& data);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment