Commit 50d5b8d4 authored by legoc's avatar legoc
Browse files

Implemented cancel in OutputStreamSocket C++. Reviewed isEnded and isCanceled.

parent 5c4e6acf
...@@ -828,7 +828,11 @@ void Publisher::sendTwoBinaryParts(const std::string& data1, const std::string& ...@@ -828,7 +828,11 @@ void Publisher::sendTwoBinaryParts(const std::string& data1, const std::string&
} }
bool Publisher::hasEnded() const { bool Publisher::hasEnded() const {
return m_impl->hasEnded(); return m_impl->isEnded();
}
bool Publisher::isEnded() const {
return m_impl->isEnded();
} }
void Publisher::sendEnd() const { void Publisher::sendEnd() const {
...@@ -898,7 +902,15 @@ const std::string& Subscriber::getInstanceEndpoint() const { ...@@ -898,7 +902,15 @@ const std::string& Subscriber::getInstanceEndpoint() const {
} }
bool Subscriber::hasEnded() const { bool Subscriber::hasEnded() const {
return m_impl->hasEnded(); return m_impl->isEnded();
}
bool Subscriber::isEnded() const {
return m_impl->isEnded();
}
bool Subscriber::isCanceled() const {
return m_impl->isCanceled();
} }
bool Subscriber::receiveBinary(std::string& data) const { bool Subscriber::receiveBinary(std::string& data) const {
...@@ -1040,8 +1052,8 @@ std::unique_ptr<Request> Responder::receive() { ...@@ -1040,8 +1052,8 @@ std::unique_ptr<Request> Responder::receive() {
return unique_ptr<Request>(new Request(requestImpl)); return unique_ptr<Request>(new Request(requestImpl));
} }
bool Responder::hasEnded() const { bool Responder::isCanceled() const {
return m_impl->m_ended; return m_impl->m_canceled;
} }
/////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////
...@@ -1135,6 +1147,10 @@ void Requester::cancel() { ...@@ -1135,6 +1147,10 @@ void Requester::cancel() {
m_impl->cancel(); m_impl->cancel();
} }
bool Requester::isCanceled() const {
return m_impl->m_canceled;
}
/////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////
// Configuration // Configuration
......
...@@ -303,8 +303,15 @@ public: ...@@ -303,8 +303,15 @@ public:
void send(const double* data, std::size_t size) const; void send(const double* data, std::size_t size) const;
void sendTwoBinaryParts(const std::string& data1, const std::string& data2) const; void sendTwoBinaryParts(const std::string& data1, const std::string& data2) const;
void sendEnd() const; void sendEnd() const;
/**
* Deprecated.
* TODO remove in next version.
*/
bool hasEnded() const; bool hasEnded() const;
bool isEnded() const;
private: private:
Publisher(const application::This * application, int publisherPort, int synchronizerPort, const std::string& name, int numberOfSubscribers); Publisher(const application::This * application, int publisherPort, int synchronizerPort, const std::string& name, int numberOfSubscribers);
...@@ -331,8 +338,15 @@ public: ...@@ -331,8 +338,15 @@ public:
int getInstanceId() const; int getInstanceId() const;
const std::string& getInstanceEndpoint() const; const std::string& getInstanceEndpoint() const;
/**
* Deprecated.
* TODO remove in next version.
*/
bool hasEnded() const; bool hasEnded() const;
bool isEnded() const;
bool isCanceled() const;
/** /**
* Returns false if the stream finishes. * Returns false if the stream finishes.
*/ */
...@@ -406,7 +420,8 @@ public: ...@@ -406,7 +420,8 @@ public:
void cancel(); void cancel();
std::unique_ptr<Request> receive(); std::unique_ptr<Request> receive();
bool hasEnded() const;
bool isCanceled() const;
private: private:
Responder(const application::This * application, int responderPort, const std::string& name); Responder(const application::This * application, int responderPort, const std::string& name);
...@@ -442,6 +457,8 @@ public: ...@@ -442,6 +457,8 @@ public:
void cancel(); void cancel();
bool isCanceled() const;
private: private:
Requester(const application::This * application, const std::string& url, int requesterPort, int responderPort, const std::string& name, int responderId, int requesterId); Requester(const application::This * application, const std::string& url, int requesterPort, int responderPort, const std::string& name, int responderId, int requesterId);
......
...@@ -18,16 +18,16 @@ ...@@ -18,16 +18,16 @@
#include "impl/SocketImpl.h" #include "impl/SocketImpl.h"
#include "impl/SocketWaitingImpl.h" #include "impl/SocketWaitingImpl.h"
#include "impl/ServicesImpl.h"
#include "../proto/Messages.pb.h" #include "../proto/Messages.pb.h"
#include <iostream>
using namespace std; using namespace std;
namespace cameo { namespace cameo {
Output::Output(int id, const std::string& message, bool end) { Output::Output() :
m_id = id; m_id(0) {
m_message = message;
m_end = end;
} }
int Output::getId() const { int Output::getId() const {
...@@ -38,38 +38,30 @@ const std::string& Output::getMessage() const { ...@@ -38,38 +38,30 @@ const std::string& Output::getMessage() const {
return m_message; return m_message;
} }
bool Output::isEnd() const { OutputStreamSocket::OutputStreamSocket(SocketImpl * impl) :
return m_end; m_ended(false),
} m_canceled(false),
OutputStreamSocket::OutputStreamSocket(const std::string& streamString, const std::string& endOfStreamString, SocketImpl * impl) :
m_streamString(streamString),
m_endOfStreamString(endOfStreamString),
m_impl(impl) { m_impl(impl) {
} }
OutputStreamSocket::~OutputStreamSocket() { OutputStreamSocket::~OutputStreamSocket() {
} }
bool OutputStreamSocket::receive(Output& output) {
std::unique_ptr<Output> OutputStreamSocket::receive() {
unique_ptr<zmq::message_t> message(m_impl->receive()); unique_ptr<zmq::message_t> message(m_impl->receive());
// In case of non-blocking call, the message can be null.
if (message == nullptr) {
return unique_ptr<Output>(nullptr);
}
string response(static_cast<char*>(message->data()), message->size()); string response(static_cast<char*>(message->data()), message->size());
bool end = false; if (response == ServicesImpl::STREAM) {
if (response == m_streamString) {
end = false;
} }
else if (response == m_endOfStreamString) { else if (response == ServicesImpl::ENDSTREAM) {
end = true; m_ended = true;
return false;
}
else if (response == ServicesImpl::CANCEL) {
m_canceled = true;
return false;
} }
message = m_impl->receive(); message = m_impl->receive();
...@@ -77,13 +69,24 @@ std::unique_ptr<Output> OutputStreamSocket::receive() { ...@@ -77,13 +69,24 @@ std::unique_ptr<Output> OutputStreamSocket::receive() {
proto::ApplicationStream protoStream; proto::ApplicationStream protoStream;
protoStream.ParseFromArray(message->data(), message->size()); protoStream.ParseFromArray(message->data(), message->size());
return unique_ptr<Output>(new Output(protoStream.id(), protoStream.message(), end)); output.m_id = protoStream.id();
output.m_message = protoStream.message();
return true;
} }
void OutputStreamSocket::cancel() { void OutputStreamSocket::cancel() {
m_impl->cancel(); m_impl->cancel();
} }
bool OutputStreamSocket::isEnded() const {
return m_ended;
}
bool OutputStreamSocket::isCanceled() const {
return m_canceled;
}
WaitingImpl * OutputStreamSocket::waiting() { WaitingImpl * OutputStreamSocket::waiting() {
// We transfer the ownership of cancel socket to WaitingImpl // We transfer the ownership of cancel socket to WaitingImpl
return new SocketWaitingImpl(m_impl->m_cancelSocket.get(), "CANCEL"); return new SocketWaitingImpl(m_impl->m_cancelSocket.get(), "CANCEL");
......
...@@ -33,19 +33,17 @@ class Instance; ...@@ -33,19 +33,17 @@ class Instance;
class Output { class Output {
friend class OutputStreamSocket;
public: public:
Output(int id, const std::string& message, bool end); Output();
int getId() const; int getId() const;
const std::string& getMessage() const; const std::string& getMessage() const;
bool isEnd() const;
private: private:
int m_id; int m_id;
std::string m_message; std::string m_message;
bool m_end;
}; };
...@@ -57,16 +55,18 @@ class OutputStreamSocket { ...@@ -57,16 +55,18 @@ class OutputStreamSocket {
public: public:
~OutputStreamSocket(); ~OutputStreamSocket();
std::unique_ptr<Output> receive(); bool receive(Output& ouput);
void cancel(); void cancel();
bool isEnded() const;
bool isCanceled() const;
private: private:
OutputStreamSocket(const std::string& streamString, const std::string& endOfStreamString, SocketImpl * impl); OutputStreamSocket(SocketImpl * impl);
WaitingImpl * waiting(); WaitingImpl * waiting();
std::string m_streamString; bool m_ended;
std::string m_endOfStreamString; bool m_canceled;
std::unique_ptr<SocketImpl> m_impl; std::unique_ptr<SocketImpl> m_impl;
}; };
......
...@@ -147,9 +147,14 @@ std::unique_ptr<OutputStreamSocket> Services::createOutputStreamSocket(int port) ...@@ -147,9 +147,14 @@ std::unique_ptr<OutputStreamSocket> Services::createOutputStreamSocket(int port)
// Prepare our context and subscriber // Prepare our context and subscriber
string streamEndpoint = m_url + ":" + to_string(port); string streamEndpoint = m_url + ":" + to_string(port);
zmq::socket_t * subscriber = m_impl->createOutputStreamSubscriber(streamEndpoint);
return unique_ptr<OutputStreamSocket>(new OutputStreamSocket(ServicesImpl::STREAM, ServicesImpl::ENDSTREAM, new SocketImpl(subscriber))); // We define a unique name that depends on the event stream socket object because there can be many (instances).
string cancelEndpoint = "inproc://cancel." + to_string(CancelIdGenerator::newId());
zmq::socket_t * cancelPublisher = m_impl->createCancelPublisher(cancelEndpoint);
zmq::socket_t * subscriber = m_impl->createOutputStreamSubscriber(streamEndpoint, cancelEndpoint);
return unique_ptr<OutputStreamSocket>(new OutputStreamSocket(new SocketImpl(subscriber, cancelPublisher)));
} }
} }
...@@ -224,7 +224,7 @@ void PublisherImpl::setEnd() { ...@@ -224,7 +224,7 @@ void PublisherImpl::setEnd() {
} }
} }
bool PublisherImpl::hasEnded() { bool PublisherImpl::isEnded() {
return m_ended; return m_ended;
} }
......
...@@ -52,7 +52,7 @@ public: ...@@ -52,7 +52,7 @@ public:
void send(const double* data, std::size_t size); void send(const double* data, std::size_t size);
void sendTwoBinaryParts(const std::string& data1, const std::string& data2); void sendTwoBinaryParts(const std::string& data1, const std::string& data2);
void setEnd(); void setEnd();
bool hasEnded(); bool isEnded();
void terminate(); void terminate();
void publish(const std::string& header, const char* data, std::size_t size); void publish(const std::string& header, const char* data, std::size_t size);
......
...@@ -33,7 +33,8 @@ RequesterImpl::RequesterImpl(const application::This * application, const std::s ...@@ -33,7 +33,8 @@ RequesterImpl::RequesterImpl(const application::This * application, const std::s
m_requesterPort(requesterPort), m_requesterPort(requesterPort),
m_name(name), m_name(name),
m_responderId(responderId), m_responderId(responderId),
m_requesterId(requesterId) { m_requesterId(requesterId),
m_canceled(false) {
stringstream repEndpoint; stringstream repEndpoint;
repEndpoint << url << ":" << responderPort; repEndpoint << url << ":" << responderPort;
...@@ -129,22 +130,20 @@ bool RequesterImpl::receiveBinary(std::string& response) { ...@@ -129,22 +130,20 @@ bool RequesterImpl::receiveBinary(std::string& response) {
proto::MessageType messageType; proto::MessageType messageType;
messageType.ParseFromArray((*message).data(), (*message).size()); messageType.ParseFromArray((*message).data(), (*message).size());
bool canceled = false;
if (message->more()) { if (message->more()) {
message.reset(new zmq::message_t); message.reset(new zmq::message_t);
m_requester->recv(message.get(), 0); m_requester->recv(message.get(), 0);
} else { } else {
cerr << "unexpected number of frames, should be 2" << endl; cerr << "unexpected number of frames, should be 2" << endl;
canceled = true; m_canceled = true;
} }
if (messageType.type() == proto::MessageType_Type_RESPONSE) { if (messageType.type() == proto::MessageType_Type_RESPONSE) {
response = string(static_cast<char*>(message->data()), message->size()); response = string(static_cast<char*>(message->data()), message->size());
} else if (messageType.type() == proto::MessageType_Type_CANCEL) { } else if (messageType.type() == proto::MessageType_Type_CANCEL) {
canceled = true; m_canceled = true;
} }
// Create the reply // Create the reply
...@@ -155,7 +154,7 @@ bool RequesterImpl::receiveBinary(std::string& response) { ...@@ -155,7 +154,7 @@ bool RequesterImpl::receiveBinary(std::string& response) {
m_requester->send(*reply); m_requester->send(*reply);
return !canceled; return !m_canceled;
} }
bool RequesterImpl::receive(std::string& data) { bool RequesterImpl::receive(std::string& data) {
......
...@@ -58,6 +58,7 @@ public: ...@@ -58,6 +58,7 @@ public:
int m_responderId; int m_responderId;
int m_requesterId; int m_requesterId;
std::unique_ptr<zmq::socket_t> m_requester; std::unique_ptr<zmq::socket_t> m_requester;
bool m_canceled;
static const std::string REQUESTER_PREFIX; static const std::string REQUESTER_PREFIX;
......
...@@ -31,7 +31,7 @@ ResponderImpl::ResponderImpl(const application::This * application, int responde ...@@ -31,7 +31,7 @@ ResponderImpl::ResponderImpl(const application::This * application, int responde
m_application(application), m_application(application),
m_responderPort(responderPort), m_responderPort(responderPort),
m_name(name), m_name(name),
m_ended(false) { m_canceled(false) {
// create a socket REP // create a socket REP
m_responder.reset(new zmq::socket_t(m_application->m_impl->m_context, ZMQ_REP)); m_responder.reset(new zmq::socket_t(m_application->m_impl->m_context, ZMQ_REP));
...@@ -78,8 +78,6 @@ std::unique_ptr<RequestImpl> ResponderImpl::receive() { ...@@ -78,8 +78,6 @@ std::unique_ptr<RequestImpl> ResponderImpl::receive() {
} else { } else {
cerr << "unexpected number of frames, should be 2" << endl; cerr << "unexpected number of frames, should be 2" << endl;
m_ended = true;
return unique_ptr<RequestImpl>(nullptr); return unique_ptr<RequestImpl>(nullptr);
} }
...@@ -112,7 +110,7 @@ std::unique_ptr<RequestImpl> ResponderImpl::receive() { ...@@ -112,7 +110,7 @@ std::unique_ptr<RequestImpl> ResponderImpl::receive() {
} }
} else if (messageType.type() == proto::MessageType_Type_CANCEL) { } else if (messageType.type() == proto::MessageType_Type_CANCEL) {
m_ended = true; m_canceled = true;
} else { } else {
cerr << "unknown message type " << messageType.type() << endl; cerr << "unknown message type " << messageType.type() << endl;
......
...@@ -47,7 +47,7 @@ public: ...@@ -47,7 +47,7 @@ public:
int m_responderPort; int m_responderPort;
std::string m_name; std::string m_name;
std::unique_ptr<zmq::socket_t> m_responder; std::unique_ptr<zmq::socket_t> m_responder;
bool m_ended; bool m_canceled;
static const std::string RESPONDER_PREFIX; static const std::string RESPONDER_PREFIX;
}; };
......
...@@ -209,24 +209,25 @@ zmq::socket_t * ServicesImpl::createEventSubscriber(const std::string& endpoint, ...@@ -209,24 +209,25 @@ zmq::socket_t * ServicesImpl::createEventSubscriber(const std::string& endpoint,
return subscriber; return subscriber;
} }
zmq::socket_t * ServicesImpl::createOutputStreamSubscriber(const std::string& endpoint) { zmq::socket_t * ServicesImpl::createOutputStreamSubscriber(const std::string& endpoint, const std::string& cancelEndpoint) {
zmq::socket_t * subscriber = new zmq::socket_t(m_context, ZMQ_SUB); zmq::socket_t * subscriber = new zmq::socket_t(m_context, ZMQ_SUB);
vector<string> streamList; vector<string> streamList;
streamList.push_back(STREAM); streamList.push_back(STREAM);
streamList.push_back(ENDSTREAM); streamList.push_back(ENDSTREAM);
streamList.push_back(CANCEL);
for (vector<string>::const_iterator s = streamList.begin(); s != streamList.end(); ++s) { for (vector<string>::const_iterator s = streamList.begin(); s != streamList.end(); ++s) {
subscriber->setsockopt(ZMQ_SUBSCRIBE, s->c_str(), s->length()); subscriber->setsockopt(ZMQ_SUBSCRIBE, s->c_str(), s->length());
} }
subscriber->connect(endpoint.c_str()); subscriber->connect(endpoint.c_str());
subscriber->connect(cancelEndpoint.c_str());
return subscriber; return subscriber;
} }
zmq::socket_t * ServicesImpl::createCancelPublisher(const std::string& endpoint) { zmq::socket_t * ServicesImpl::createCancelPublisher(const std::string& endpoint) {
zmq::socket_t * publisher = new zmq::socket_t(m_context, ZMQ_PUB); zmq::socket_t * publisher = new zmq::socket_t(m_context, ZMQ_PUB);
......
...@@ -63,7 +63,7 @@ public: ...@@ -63,7 +63,7 @@ public:
std::string createOutputRequest(const std::string& name) const; std::string createOutputRequest(const std::string& name) const;
zmq::socket_t * createEventSubscriber(const std::string& endpoint, const std::string& cancelEndpoint); zmq::socket_t * createEventSubscriber(const std::string& endpoint, const std::string& cancelEndpoint);
zmq::socket_t * createOutputStreamSubscriber(const std::string& endpoint); zmq::socket_t * createOutputStreamSubscriber(const std::string& endpoint, const std::string& cancelEndpoint);
zmq::socket_t * createCancelPublisher(const std::string& endpoint); zmq::socket_t * createCancelPublisher(const std::string& endpoint);
std::unique_ptr<zmq::message_t> tryRequestWithOnePartReply(const std::string& strRequestType, const std::string& strRequestData, const std::string& endpoint, int overrideTimeout = -1); std::unique_ptr<zmq::message_t> tryRequestWithOnePartReply(const std::string& strRequestType, const std::string& strRequestData, const std::string& endpoint, int overrideTimeout = -1);
......
...@@ -44,7 +44,8 @@ SubscriberImpl::SubscriberImpl(const Server * server, const std::string & url, i ...@@ -44,7 +44,8 @@ SubscriberImpl::SubscriberImpl(const Server * server, const std::string & url, i
m_instanceId(instanceId), m_instanceId(instanceId),
m_instanceEndpoint(instanceEndpoint), m_instanceEndpoint(instanceEndpoint),
m_statusEndpoint(statusEndpoint), m_statusEndpoint(statusEndpoint),
m_endOfStream(false) { m_ended(false),
m_canceled(false) {
} }
SubscriberImpl::~SubscriberImpl() { SubscriberImpl::~SubscriberImpl() {
...@@ -112,8 +113,12 @@ void SubscriberImpl::init() { ...@@ -112,8 +113,12 @@ void SubscriberImpl::init() {
} }
} }
bool SubscriberImpl::hasEnded() const {