Commit 15263d86 authored by Shervin Nourbakhsh's avatar Shervin Nourbakhsh
Browse files

Merge branch 'main_master' into cpp_master

parents 4bfa20d1 2854efba
...@@ -2,7 +2,7 @@ if(NOT DEFINED PROJECT_NAME) ...@@ -2,7 +2,7 @@ if(NOT DEFINED PROJECT_NAME)
cmake_minimum_required(VERSION 3.7.2) cmake_minimum_required(VERSION 3.7.2)
# Project name and version # Project name and version
project(cameo VERSION 1.0.2 LANGUAGES CXX) project(cameo VERSION 1.1.0 LANGUAGES CXX)
#cmake_policy(SET CMP0048 NEW) #cmake_policy(SET CMP0048 NEW)
endif() endif()
......
1.1.0
-----
* Removed Option enum and defined OUTPUSTREAM as const int.
* Added Instance::waitFor() and Instance::waitFor(int) for python binding.
* Implemented subscriber and requester with optional string result in receive() function.
* Implemented Instance::getResult() with optional string result.
1.0.2 1.0.2
----- -----
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#include <vector> #include <vector>
#include <set> #include <set>
#include <memory> #include <memory>
#include <optional>
#include "InvalidArgumentException.h" #include "InvalidArgumentException.h"
#include "UnmanagedApplicationException.h" #include "UnmanagedApplicationException.h"
#include "SocketException.h" #include "SocketException.h"
...@@ -39,10 +40,10 @@ ...@@ -39,10 +40,10 @@
namespace cameo { namespace cameo {
enum Option { /**
NONE = 0, * Option output stream.
OUTPUTSTREAM = 1 */
}; const int OUTPUTSTREAM = 1;
class Server; class Server;
class EventStreamSocket; class EventStreamSocket;
...@@ -255,8 +256,10 @@ public: ...@@ -255,8 +256,10 @@ public:
bool stop(); bool stop();
bool kill(); bool kill();
State waitFor(StateHandlerType handler = nullptr); State waitFor(int states, StateHandlerType handler);
State waitFor(int states, StateHandlerType handler = nullptr); State waitFor(int states);
State waitFor(StateHandlerType handler);
State waitFor();
State waitFor(const std::string& eventName); State waitFor(const std::string& eventName);
State waitFor(KeyValue& keyValue); State waitFor(KeyValue& keyValue);
...@@ -288,8 +291,8 @@ public: ...@@ -288,8 +291,8 @@ public:
*/ */
int getExitCode() const; int getExitCode() const;
bool getBinaryResult(std::string& result); std::optional<std::string> getBinaryResult();
bool getResult(std::string& result); std::optional<std::string> getResult();
std::shared_ptr<OutputStreamSocket> getOutputStreamSocket(); std::shared_ptr<OutputStreamSocket> getOutputStreamSocket();
...@@ -401,11 +404,19 @@ public: ...@@ -401,11 +404,19 @@ public:
bool isCanceled() const; bool isCanceled() const;
/** /**
* Returns false if the stream finishes. * Returns a string or nothing if the stream has finished.
*/ */
bool receiveBinary(std::string& data) const; std::optional<std::string> receiveBinary() const;
bool receive(std::string& data) const;
bool receiveTwoBinaryParts(std::string& data1, std::string& data2) const; /**
* Returns a string or nothing if the stream has finished.
*/
std::optional<std::string> receive() const;
/**
* Returns a tuple of strings or nothing if the stream has finished.
*/
std::optional<std::tuple<std::string, std::string>> receiveTwoBinaryParts() const;
void cancel(); void cancel();
...@@ -513,8 +524,15 @@ public: ...@@ -513,8 +524,15 @@ public:
void send(const std::string& request); void send(const std::string& request);
void sendTwoBinaryParts(const std::string& request1, const std::string& request2); void sendTwoBinaryParts(const std::string& request1, const std::string& request2);
bool receiveBinary(std::string& response); /**
bool receive(std::string& response); * Returns a string or nothing if the requester is canceled.
*/
std::optional<std::string> receiveBinary();
/**
* Returns a string or nothing if the requester is canceled.
*/
std::optional<std::string> receive();
void cancel(); void cancel();
......
...@@ -64,11 +64,11 @@ public: ...@@ -64,11 +64,11 @@ public:
*/ */
bool isAvailable() const; bool isAvailable() const;
std::unique_ptr<application::Instance> start(const std::string& name, const std::vector<std::string> &args, Option options = NONE); std::unique_ptr<application::Instance> start(const std::string& name, const std::vector<std::string> &args, int options = 0);
std::unique_ptr<application::Instance> start(const std::string& name, Option options = NONE); std::unique_ptr<application::Instance> start(const std::string& name, int options = 0);
application::InstanceArray connectAll(const std::string& name, Option options = NONE); application::InstanceArray connectAll(const std::string& name, int options = 0);
std::unique_ptr<application::Instance> connect(const std::string& name, Option options = NONE); std::unique_ptr<application::Instance> connect(const std::string& name, int options = 0);
std::unique_ptr<application::Instance> connect(int id, Option options = NONE); std::unique_ptr<application::Instance> connect(int id, int options = 0);
/** /**
* throws ConnectionTimeout * throws ConnectionTimeout
......
...@@ -697,11 +697,21 @@ State Instance::waitFor(int states, StateHandlerType handler) { ...@@ -697,11 +697,21 @@ State Instance::waitFor(int states, StateHandlerType handler) {
return waitFor(states, "", keyValue, handler, true); return waitFor(states, "", keyValue, handler, true);
} }
State Instance::waitFor(int states) {
KeyValue keyValue("");
return waitFor(states, "", keyValue, nullptr, true);
}
State Instance::waitFor(StateHandlerType handler) { State Instance::waitFor(StateHandlerType handler) {
KeyValue keyValue(""); KeyValue keyValue("");
return waitFor(0, "", keyValue, handler, true); return waitFor(0, "", keyValue, handler, true);
} }
State Instance::waitFor() {
KeyValue keyValue("");
return waitFor(0, "", keyValue, nullptr, true);
}
State Instance::waitFor(const std::string& eventName) { State Instance::waitFor(const std::string& eventName) {
KeyValue keyValue(""); KeyValue keyValue("");
return waitFor(0, eventName, keyValue, nullptr, true); return waitFor(0, eventName, keyValue, nullptr, true);
...@@ -738,21 +748,19 @@ int Instance::getExitCode() const { ...@@ -738,21 +748,19 @@ int Instance::getExitCode() const {
return m_exitCode; return m_exitCode;
} }
bool Instance::getBinaryResult(std::string& result) { std::optional<std::string> Instance::getBinaryResult() {
waitFor(); waitFor();
result = m_resultData;
return m_hasResult; if (m_hasResult) {
} return m_resultData;
}
bool Instance::getResult(std::string& result) {
string bytes; return {};
getBinaryResult(bytes); }
parse(bytes, result);
return m_hasResult; std::optional<std::string> Instance::getResult() {
return getBinaryResult();
} }
std::shared_ptr<OutputStreamSocket> Instance::getOutputStreamSocket() { std::shared_ptr<OutputStreamSocket> Instance::getOutputStreamSocket() {
...@@ -912,16 +920,16 @@ bool Subscriber::isCanceled() const { ...@@ -912,16 +920,16 @@ bool Subscriber::isCanceled() const {
return m_impl->isCanceled(); return m_impl->isCanceled();
} }
bool Subscriber::receiveBinary(std::string& data) const { std::optional<std::string> Subscriber::receiveBinary() const {
return m_impl->receiveBinary(data); return m_impl->receiveBinary();
} }
bool Subscriber::receive(std::string& data) const { std::optional<std::string> Subscriber::receive() const {
return m_impl->receive(data); return m_impl->receive();
} }
bool Subscriber::receiveTwoBinaryParts(std::string& data1, std::string& data2) const { std::optional<std::tuple<std::string, std::string>> Subscriber::receiveTwoBinaryParts() const {
return m_impl->receiveTwoBinaryParts(data1, data2); return m_impl->receiveTwoBinaryParts();
} }
void Subscriber::cancel() { void Subscriber::cancel() {
...@@ -1139,12 +1147,12 @@ void Requester::sendTwoBinaryParts(const std::string& request1, const std::strin ...@@ -1139,12 +1147,12 @@ void Requester::sendTwoBinaryParts(const std::string& request1, const std::strin
m_impl->sendTwoBinaryParts(request1, request2); m_impl->sendTwoBinaryParts(request1, request2);
} }
bool Requester::receiveBinary(std::string& response) { std::optional<std::string> Requester::receiveBinary() {
return m_impl->receiveBinary(response); return m_impl->receiveBinary();
} }
bool Requester::receive(std::string& response) { std::optional<std::string> Requester::receive() {
return m_impl->receive(response); return m_impl->receive();
} }
void Requester::cancel() { void Requester::cancel() {
......
...@@ -125,11 +125,11 @@ std::unique_ptr<application::Instance> Server::makeInstance() { ...@@ -125,11 +125,11 @@ std::unique_ptr<application::Instance> Server::makeInstance() {
return unique_ptr<application::Instance>(new application::Instance(this)); return unique_ptr<application::Instance>(new application::Instance(this));
} }
std::unique_ptr<application::Instance> Server::start(const std::string& name, Option options) { std::unique_ptr<application::Instance> Server::start(const std::string& name, int options) {
return start(name, vector<string>(), options); return start(name, vector<string>(), options);
} }
std::unique_ptr<application::Instance> Server::start(const std::string& name, const std::vector<std::string> & args, Option options) { std::unique_ptr<application::Instance> Server::start(const std::string& name, const std::vector<std::string> & args, int options) {
bool outputStream = ((options & OUTPUTSTREAM) != 0); bool outputStream = ((options & OUTPUTSTREAM) != 0);
...@@ -195,7 +195,7 @@ Response Server::stopApplicationAsynchronously(int id, bool immediately) const { ...@@ -195,7 +195,7 @@ Response Server::stopApplicationAsynchronously(int id, bool immediately) const {
return Response(value, message); return Response(value, message);
} }
application::InstanceArray Server::connectAll(const std::string& name, Option options) { application::InstanceArray Server::connectAll(const std::string& name, int options) {
bool outputStream = ((options & OUTPUTSTREAM) != 0); bool outputStream = ((options & OUTPUTSTREAM) != 0);
...@@ -261,7 +261,7 @@ application::InstanceArray Server::connectAll(const std::string& name, Option op ...@@ -261,7 +261,7 @@ application::InstanceArray Server::connectAll(const std::string& name, Option op
return aliveInstances; return aliveInstances;
} }
std::unique_ptr<application::Instance> Server::connect(const std::string& name, Option options) { std::unique_ptr<application::Instance> Server::connect(const std::string& name, int options) {
application::InstanceArray instances = connectAll(name, options); application::InstanceArray instances = connectAll(name, options);
...@@ -273,7 +273,7 @@ std::unique_ptr<application::Instance> Server::connect(const std::string& name, ...@@ -273,7 +273,7 @@ std::unique_ptr<application::Instance> Server::connect(const std::string& name,
return std::move(instances[0]); return std::move(instances[0]);
} }
std::unique_ptr<application::Instance> Server::connect(int id, Option options) { std::unique_ptr<application::Instance> Server::connect(int id, int options) {
bool outputStream = ((options & OUTPUTSTREAM) != 0); bool outputStream = ((options & OUTPUTSTREAM) != 0);
......
...@@ -134,7 +134,11 @@ void RequesterImpl::sendTwoBinaryParts(const std::string& requestData1, const st ...@@ -134,7 +134,11 @@ void RequesterImpl::sendTwoBinaryParts(const std::string& requestData1, const st
m_requestSocket->request(request.toString(), requestData1, requestData2); m_requestSocket->request(request.toString(), requestData1, requestData2);
} }
bool RequesterImpl::receiveBinary(std::string& response) { std::optional<std::string> RequesterImpl::receiveBinary() {
if (m_canceled) {
return {};
}
unique_ptr<zmq::message_t> message(new zmq::message_t); unique_ptr<zmq::message_t> message(new zmq::message_t);
m_repSocket->recv(message.get(), 0); m_repSocket->recv(message.get(), 0);
...@@ -145,14 +149,18 @@ bool RequesterImpl::receiveBinary(std::string& response) { ...@@ -145,14 +149,18 @@ bool RequesterImpl::receiveBinary(std::string& response) {
int type = request[message::TYPE].GetInt(); int type = request[message::TYPE].GetInt();
if (type == message::CANCEL) {
m_canceled = true;
return {};
}
optional<string> result;
if (type == message::RESPONSE) { if (type == message::RESPONSE) {
// Get the second part for the message. // Get the second part for the message.
message.reset(new zmq::message_t); message.reset(new zmq::message_t);
m_repSocket->recv(message.get(), 0); m_repSocket->recv(message.get(), 0);
response = string(message->data<char>(), message->size()); result = string(message->data<char>(), message->size());
}
else if (type == message::CANCEL) {
m_canceled = true;
} }
// Create the reply. // Create the reply.
...@@ -163,17 +171,11 @@ bool RequesterImpl::receiveBinary(std::string& response) { ...@@ -163,17 +171,11 @@ bool RequesterImpl::receiveBinary(std::string& response) {
m_repSocket->send(*reply); m_repSocket->send(*reply);
return !m_canceled; return result;
} }
bool RequesterImpl::receive(std::string& data) { std::optional<std::string> RequesterImpl::receive() {
return receiveBinary();
string bytes;
bool result = receiveBinary(bytes);
parse(bytes, data);
return result;
} }
void RequesterImpl::cancel() { void RequesterImpl::cancel() {
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#include <vector> #include <vector>
#include <thread> #include <thread>
#include <mutex> #include <mutex>
#include <optional>
#include "GenericWaitingImpl.h" #include "GenericWaitingImpl.h"
#include "zmq.hpp" #include "zmq.hpp"
...@@ -47,8 +48,8 @@ public: ...@@ -47,8 +48,8 @@ public:
void send(const std::string& requestData); void send(const std::string& requestData);
void sendTwoBinaryParts(const std::string& requestData1, const std::string& requestData2); void sendTwoBinaryParts(const std::string& requestData1, const std::string& requestData2);
bool receiveBinary(std::string& response); std::optional<std::string> receiveBinary();
bool receive(std::string& response); std::optional<std::string> receive();
void cancel(); void cancel();
void terminate(); void terminate();
......
...@@ -105,7 +105,7 @@ bool SubscriberImpl::isCanceled() const { ...@@ -105,7 +105,7 @@ bool SubscriberImpl::isCanceled() const {
return m_canceled; return m_canceled;
} }
bool SubscriberImpl::receiveBinary(std::string& data) { std::optional<std::string> SubscriberImpl::receiveBinary() {
while (true) { while (true) {
unique_ptr<zmq::message_t> message(new zmq::message_t()); unique_ptr<zmq::message_t> message(new zmq::message_t());
...@@ -116,17 +116,15 @@ bool SubscriberImpl::receiveBinary(std::string& data) { ...@@ -116,17 +116,15 @@ bool SubscriberImpl::receiveBinary(std::string& data) {
if (response == message::Event::STREAM) { if (response == message::Event::STREAM) {
message.reset(new zmq::message_t()); message.reset(new zmq::message_t());
m_subscriber->recv(message.get()); m_subscriber->recv(message.get());
data = string(static_cast<char*>(message->data()), message->size()); return string(static_cast<char*>(message->data()), message->size());
return true;
} else if (response == message::Event::ENDSTREAM) { } else if (response == message::Event::ENDSTREAM) {
m_ended = true; m_ended = true;
return false; return {};
} else if (response == message::Event::CANCEL) { } else if (response == message::Event::CANCEL) {
m_canceled = true; m_canceled = true;
return false; return {};
} else if (response == message::Event::STATUS) { } else if (response == message::Event::STATUS) {
message.reset(new zmq::message_t()); message.reset(new zmq::message_t());
...@@ -147,30 +145,20 @@ bool SubscriberImpl::receiveBinary(std::string& data) { ...@@ -147,30 +145,20 @@ bool SubscriberImpl::receiveBinary(std::string& data) {
|| state == application::KILLED || state == application::KILLED
|| state == application::FAILURE) { || state == application::FAILURE) {
// Exit because the remote application has terminated. // Exit because the remote application has terminated.
return false; return {};
} }
} }
} }
} }
return false; return {};
} }
bool SubscriberImpl::receive(std::string& data) { std::optional<std::string> SubscriberImpl::receive() {
return receiveBinary();
string bytes;
bool stream = receiveBinary(bytes);
if (!stream) {
return false;
}
parse(bytes, data);
return true;
} }
bool SubscriberImpl::receiveTwoBinaryParts(std::string& data1, std::string& data2) { std::optional<std::tuple<std::string, std::string>> SubscriberImpl::receiveTwoBinaryParts() {
while (true) { while (true) {
unique_ptr<zmq::message_t> message(new zmq::message_t()); unique_ptr<zmq::message_t> message(new zmq::message_t());
...@@ -179,22 +167,25 @@ bool SubscriberImpl::receiveTwoBinaryParts(std::string& data1, std::string& data ...@@ -179,22 +167,25 @@ bool SubscriberImpl::receiveTwoBinaryParts(std::string& data1, std::string& data
string response(static_cast<char*>(message->data()), message->size()); string response(static_cast<char*>(message->data()), message->size());
if (response == message::Event::STREAM) { if (response == message::Event::STREAM) {
std::tuple<std::string, std::string> result;
message.reset(new zmq::message_t()); message.reset(new zmq::message_t());
m_subscriber->recv(message.get()); m_subscriber->recv(message.get());
data1 = string(static_cast<char*>(message->data()), message->size()); string data1 = string(static_cast<char*>(message->data()), message->size());
message.reset(new zmq::message_t()); message.reset(new zmq::message_t());
m_subscriber->recv(message.get()); m_subscriber->recv(message.get());
data2 = string(static_cast<char*>(message->data()), message->size()); string data2 = string(static_cast<char*>(message->data()), message->size());
return true; return make_tuple(data1, data2);
} else if (response == message::Event::ENDSTREAM) { } else if (response == message::Event::ENDSTREAM) {
m_ended = true; m_ended = true;
return false; return {};
} else if (response == message::Event::CANCEL) { } else if (response == message::Event::CANCEL) {
return false; return {};
} else if (response == message::Event::STATUS) { } else if (response == message::Event::STATUS) {
message.reset(new zmq::message_t()); message.reset(new zmq::message_t());
...@@ -215,13 +206,13 @@ bool SubscriberImpl::receiveTwoBinaryParts(std::string& data1, std::string& data ...@@ -215,13 +206,13 @@ bool SubscriberImpl::receiveTwoBinaryParts(std::string& data1, std::string& data
|| state == application::KILLED || state == application::KILLED
|| state == application::FAILURE) { || state == application::FAILURE) {
// Exit because the remote application has terminated. // Exit because the remote application has terminated.
return false; return {};
} }
} }
} }
} }
return false; return {};
} }
WaitingImpl * SubscriberImpl::waiting() { WaitingImpl * SubscriberImpl::waiting() {
......