Commit ce1e1776 authored by legoc's avatar legoc
Browse files

(split) Implemented subscriber and requester with optional string result in receive() function

parent 7c0d1ce5
...@@ -2,7 +2,8 @@ ...@@ -2,7 +2,8 @@
----- -----
* Removed Option enum and defined OUTPUSTREAM as const int. * Removed Option enum and defined OUTPUSTREAM as const int.
* Added Instance::waitFor() and Instance::waitFor(int) for python binding. * Added Instance::waitFor() and Instance::waitFor(int) for python binding.
* Implemented subscriber and requester with optional string result in receive() function.
1.0.2 1.0.2
----- -----
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#include <vector> #include <vector>
#include <set> #include <set>
#include <memory> #include <memory>
#include <optional>
#include "InvalidArgumentException.h" #include "InvalidArgumentException.h"
#include "UnmanagedApplicationException.h" #include "UnmanagedApplicationException.h"
#include "SocketException.h" #include "SocketException.h"
...@@ -403,11 +404,19 @@ public: ...@@ -403,11 +404,19 @@ public:
bool isCanceled() const; bool isCanceled() const;
/** /**
* Returns false if the stream finishes. * Returns a string or nothing if the stream has finished.
*/ */
bool receiveBinary(std::string& data) const; std::optional<std::string> receiveBinary() const;
bool receive(std::string& data) const;
bool receiveTwoBinaryParts(std::string& data1, std::string& data2) const; /**
* Returns a string or nothing if the stream has finished.
*/
std::optional<std::string> receive() const;
/**
* Returns a tuple of strings or nothing if the stream has finished.
*/
std::optional<std::tuple<std::string, std::string>> receiveTwoBinaryParts() const;
void cancel(); void cancel();
...@@ -515,8 +524,15 @@ public: ...@@ -515,8 +524,15 @@ public:
void send(const std::string& request); void send(const std::string& request);
void sendTwoBinaryParts(const std::string& request1, const std::string& request2); void sendTwoBinaryParts(const std::string& request1, const std::string& request2);
bool receiveBinary(std::string& response); /**
bool receive(std::string& response); * Returns a string or nothing if the requester is canceled.
*/
std::optional<std::string> receiveBinary();
/**
* Returns a string or nothing if the requester is canceled.
*/
std::optional<std::string> receive();
void cancel(); void cancel();
......
...@@ -922,16 +922,16 @@ bool Subscriber::isCanceled() const { ...@@ -922,16 +922,16 @@ bool Subscriber::isCanceled() const {
return m_impl->isCanceled(); return m_impl->isCanceled();
} }
bool Subscriber::receiveBinary(std::string& data) const { std::optional<std::string> Subscriber::receiveBinary() const {
return m_impl->receiveBinary(data); return m_impl->receiveBinary();
} }
bool Subscriber::receive(std::string& data) const { std::optional<std::string> Subscriber::receive() const {
return m_impl->receive(data); return m_impl->receive();
} }
bool Subscriber::receiveTwoBinaryParts(std::string& data1, std::string& data2) const { std::optional<std::tuple<std::string, std::string>> Subscriber::receiveTwoBinaryParts() const {
return m_impl->receiveTwoBinaryParts(data1, data2); return m_impl->receiveTwoBinaryParts();
} }
void Subscriber::cancel() { void Subscriber::cancel() {
...@@ -1149,12 +1149,12 @@ void Requester::sendTwoBinaryParts(const std::string& request1, const std::strin ...@@ -1149,12 +1149,12 @@ void Requester::sendTwoBinaryParts(const std::string& request1, const std::strin
m_impl->sendTwoBinaryParts(request1, request2); m_impl->sendTwoBinaryParts(request1, request2);
} }
bool Requester::receiveBinary(std::string& response) { std::optional<std::string> Requester::receiveBinary() {
return m_impl->receiveBinary(response); return m_impl->receiveBinary();
} }
bool Requester::receive(std::string& response) { std::optional<std::string> Requester::receive() {
return m_impl->receive(response); return m_impl->receive();
} }
void Requester::cancel() { void Requester::cancel() {
......
...@@ -134,7 +134,11 @@ void RequesterImpl::sendTwoBinaryParts(const std::string& requestData1, const st ...@@ -134,7 +134,11 @@ void RequesterImpl::sendTwoBinaryParts(const std::string& requestData1, const st
m_requestSocket->request(request.toString(), requestData1, requestData2); m_requestSocket->request(request.toString(), requestData1, requestData2);
} }
bool RequesterImpl::receiveBinary(std::string& response) { std::optional<std::string> RequesterImpl::receiveBinary() {
if (m_canceled) {
return {};
}
unique_ptr<zmq::message_t> message(new zmq::message_t); unique_ptr<zmq::message_t> message(new zmq::message_t);
m_repSocket->recv(message.get(), 0); m_repSocket->recv(message.get(), 0);
...@@ -145,14 +149,18 @@ bool RequesterImpl::receiveBinary(std::string& response) { ...@@ -145,14 +149,18 @@ bool RequesterImpl::receiveBinary(std::string& response) {
int type = request[message::TYPE].GetInt(); int type = request[message::TYPE].GetInt();
if (type == message::CANCEL) {
m_canceled = true;
return {};
}
optional<string> result;
if (type == message::RESPONSE) { if (type == message::RESPONSE) {
// Get the second part for the message. // Get the second part for the message.
message.reset(new zmq::message_t); message.reset(new zmq::message_t);
m_repSocket->recv(message.get(), 0); m_repSocket->recv(message.get(), 0);
response = string(message->data<char>(), message->size()); result = string(message->data<char>(), message->size());
}
else if (type == message::CANCEL) {
m_canceled = true;
} }
// Create the reply. // Create the reply.
...@@ -163,17 +171,11 @@ bool RequesterImpl::receiveBinary(std::string& response) { ...@@ -163,17 +171,11 @@ bool RequesterImpl::receiveBinary(std::string& response) {
m_repSocket->send(*reply); m_repSocket->send(*reply);
return !m_canceled; return result;
} }
bool RequesterImpl::receive(std::string& data) { std::optional<std::string> RequesterImpl::receive() {
return receiveBinary();
string bytes;
bool result = receiveBinary(bytes);
parse(bytes, data);
return result;
} }
void RequesterImpl::cancel() { void RequesterImpl::cancel() {
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#include <vector> #include <vector>
#include <thread> #include <thread>
#include <mutex> #include <mutex>
#include <optional>
#include "GenericWaitingImpl.h" #include "GenericWaitingImpl.h"
#include "zmq.hpp" #include "zmq.hpp"
...@@ -47,8 +48,8 @@ public: ...@@ -47,8 +48,8 @@ public:
void send(const std::string& requestData); void send(const std::string& requestData);
void sendTwoBinaryParts(const std::string& requestData1, const std::string& requestData2); void sendTwoBinaryParts(const std::string& requestData1, const std::string& requestData2);
bool receiveBinary(std::string& response); std::optional<std::string> receiveBinary();
bool receive(std::string& response); std::optional<std::string> receive();
void cancel(); void cancel();
void terminate(); void terminate();
......
...@@ -105,7 +105,7 @@ bool SubscriberImpl::isCanceled() const { ...@@ -105,7 +105,7 @@ bool SubscriberImpl::isCanceled() const {
return m_canceled; return m_canceled;
} }
bool SubscriberImpl::receiveBinary(std::string& data) { std::optional<std::string> SubscriberImpl::receiveBinary() {
while (true) { while (true) {
unique_ptr<zmq::message_t> message(new zmq::message_t()); unique_ptr<zmq::message_t> message(new zmq::message_t());
...@@ -116,17 +116,15 @@ bool SubscriberImpl::receiveBinary(std::string& data) { ...@@ -116,17 +116,15 @@ bool SubscriberImpl::receiveBinary(std::string& data) {
if (response == message::Event::STREAM) { if (response == message::Event::STREAM) {
message.reset(new zmq::message_t()); message.reset(new zmq::message_t());
m_subscriber->recv(message.get()); m_subscriber->recv(message.get());
data = string(static_cast<char*>(message->data()), message->size()); return string(static_cast<char*>(message->data()), message->size());
return true;
} else if (response == message::Event::ENDSTREAM) { } else if (response == message::Event::ENDSTREAM) {
m_ended = true; m_ended = true;
return false; return {};
} else if (response == message::Event::CANCEL) { } else if (response == message::Event::CANCEL) {
m_canceled = true; m_canceled = true;
return false; return {};
} else if (response == message::Event::STATUS) { } else if (response == message::Event::STATUS) {
message.reset(new zmq::message_t()); message.reset(new zmq::message_t());
...@@ -147,30 +145,20 @@ bool SubscriberImpl::receiveBinary(std::string& data) { ...@@ -147,30 +145,20 @@ bool SubscriberImpl::receiveBinary(std::string& data) {
|| state == application::KILLED || state == application::KILLED
|| state == application::FAILURE) { || state == application::FAILURE) {
// Exit because the remote application has terminated. // Exit because the remote application has terminated.
return false; return {};
} }
} }
} }
} }
return false; return {};
} }
bool SubscriberImpl::receive(std::string& data) { std::optional<std::string> SubscriberImpl::receive() {
return receiveBinary();
string bytes;
bool stream = receiveBinary(bytes);
if (!stream) {
return false;
}
parse(bytes, data);
return true;
} }
bool SubscriberImpl::receiveTwoBinaryParts(std::string& data1, std::string& data2) { std::optional<std::tuple<std::string, std::string>> SubscriberImpl::receiveTwoBinaryParts() {
while (true) { while (true) {
unique_ptr<zmq::message_t> message(new zmq::message_t()); unique_ptr<zmq::message_t> message(new zmq::message_t());
...@@ -179,22 +167,25 @@ bool SubscriberImpl::receiveTwoBinaryParts(std::string& data1, std::string& data ...@@ -179,22 +167,25 @@ bool SubscriberImpl::receiveTwoBinaryParts(std::string& data1, std::string& data
string response(static_cast<char*>(message->data()), message->size()); string response(static_cast<char*>(message->data()), message->size());
if (response == message::Event::STREAM) { if (response == message::Event::STREAM) {
std::tuple<std::string, std::string> result;
message.reset(new zmq::message_t()); message.reset(new zmq::message_t());
m_subscriber->recv(message.get()); m_subscriber->recv(message.get());
data1 = string(static_cast<char*>(message->data()), message->size()); string data1 = string(static_cast<char*>(message->data()), message->size());
message.reset(new zmq::message_t()); message.reset(new zmq::message_t());
m_subscriber->recv(message.get()); m_subscriber->recv(message.get());
data2 = string(static_cast<char*>(message->data()), message->size()); string data2 = string(static_cast<char*>(message->data()), message->size());
return true; return make_tuple(data1, data2);
} else if (response == message::Event::ENDSTREAM) { } else if (response == message::Event::ENDSTREAM) {
m_ended = true; m_ended = true;
return false; return {};
} else if (response == message::Event::CANCEL) { } else if (response == message::Event::CANCEL) {
return false; return {};
} else if (response == message::Event::STATUS) { } else if (response == message::Event::STATUS) {
message.reset(new zmq::message_t()); message.reset(new zmq::message_t());
...@@ -215,13 +206,13 @@ bool SubscriberImpl::receiveTwoBinaryParts(std::string& data1, std::string& data ...@@ -215,13 +206,13 @@ bool SubscriberImpl::receiveTwoBinaryParts(std::string& data1, std::string& data
|| state == application::KILLED || state == application::KILLED
|| state == application::FAILURE) { || state == application::FAILURE) {
// Exit because the remote application has terminated. // Exit because the remote application has terminated.
return false; return {};
} }
} }
} }
} }
return false; return {};
} }
WaitingImpl * SubscriberImpl::waiting() { WaitingImpl * SubscriberImpl::waiting() {
......
...@@ -21,6 +21,8 @@ ...@@ -21,6 +21,8 @@
#include "zmq.hpp" #include "zmq.hpp"
#include <string> #include <string>
#include <vector> #include <vector>
#include <optional>
#include <tuple>
namespace cameo { namespace cameo {
...@@ -37,9 +39,9 @@ public: ...@@ -37,9 +39,9 @@ public:
bool isEnded() const; bool isEnded() const;
bool isCanceled() const; bool isCanceled() const;
bool receiveBinary(std::string& data); std::optional<std::string> receiveBinary();
bool receive(std::string& data); std::optional<std::string> receive();
bool receiveTwoBinaryParts(std::string& data1, std::string& data2); std::optional<std::tuple<std::string, std::string>> receiveTwoBinaryParts();
WaitingImpl * waiting(); WaitingImpl * waiting();
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment