Commit 915ad2a3 authored by legoc's avatar legoc
Browse files

Implemented cancel in OutputStreamSocket C++. Reviewed isEnded and isCanceled.

parent 070787fb
......@@ -828,7 +828,11 @@ void Publisher::sendTwoBinaryParts(const std::string& data1, const std::string&
}
bool Publisher::hasEnded() const {
return m_impl->hasEnded();
return m_impl->isEnded();
}
bool Publisher::isEnded() const {
return m_impl->isEnded();
}
void Publisher::sendEnd() const {
......@@ -898,7 +902,15 @@ const std::string& Subscriber::getInstanceEndpoint() const {
}
bool Subscriber::hasEnded() const {
return m_impl->hasEnded();
return m_impl->isEnded();
}
bool Subscriber::isEnded() const {
return m_impl->isEnded();
}
bool Subscriber::isCanceled() const {
return m_impl->isCanceled();
}
bool Subscriber::receiveBinary(std::string& data) const {
......@@ -1040,8 +1052,8 @@ std::unique_ptr<Request> Responder::receive() {
return unique_ptr<Request>(new Request(requestImpl));
}
bool Responder::hasEnded() const {
return m_impl->m_ended;
bool Responder::isCanceled() const {
return m_impl->m_canceled;
}
///////////////////////////////////////////////////////////////////////////
......@@ -1135,6 +1147,10 @@ void Requester::cancel() {
m_impl->cancel();
}
bool Requester::isCanceled() const {
return m_impl->m_canceled;
}
///////////////////////////////////////////////////////////////////////////
// Configuration
......
......@@ -303,8 +303,15 @@ public:
void send(const double* data, std::size_t size) const;
void sendTwoBinaryParts(const std::string& data1, const std::string& data2) const;
void sendEnd() const;
/**
* Deprecated.
* TODO remove in next version.
*/
bool hasEnded() const;
bool isEnded() const;
private:
Publisher(const application::This * application, int publisherPort, int synchronizerPort, const std::string& name, int numberOfSubscribers);
......@@ -331,8 +338,15 @@ public:
int getInstanceId() const;
const std::string& getInstanceEndpoint() const;
/**
* Deprecated.
* TODO remove in next version.
*/
bool hasEnded() const;
bool isEnded() const;
bool isCanceled() const;
/**
* Returns false if the stream finishes.
*/
......@@ -406,7 +420,8 @@ public:
void cancel();
std::unique_ptr<Request> receive();
bool hasEnded() const;
bool isCanceled() const;
private:
Responder(const application::This * application, int responderPort, const std::string& name);
......@@ -442,6 +457,8 @@ public:
void cancel();
bool isCanceled() const;
private:
Requester(const application::This * application, const std::string& url, int requesterPort, int responderPort, const std::string& name, int responderId, int requesterId);
......
......@@ -18,16 +18,16 @@
#include "impl/SocketImpl.h"
#include "impl/SocketWaitingImpl.h"
#include "impl/ServicesImpl.h"
#include "../proto/Messages.pb.h"
#include <iostream>
using namespace std;
namespace cameo {
Output::Output(int id, const std::string& message, bool end) {
m_id = id;
m_message = message;
m_end = end;
Output::Output() :
m_id(0) {
}
int Output::getId() const {
......@@ -38,38 +38,30 @@ const std::string& Output::getMessage() const {
return m_message;
}
bool Output::isEnd() const {
return m_end;
}
OutputStreamSocket::OutputStreamSocket(const std::string& streamString, const std::string& endOfStreamString, SocketImpl * impl) :
m_streamString(streamString),
m_endOfStreamString(endOfStreamString),
OutputStreamSocket::OutputStreamSocket(SocketImpl * impl) :
m_ended(false),
m_canceled(false),
m_impl(impl) {
}
OutputStreamSocket::~OutputStreamSocket() {
}
std::unique_ptr<Output> OutputStreamSocket::receive() {
bool OutputStreamSocket::receive(Output& output) {
unique_ptr<zmq::message_t> message(m_impl->receive());
// In case of non-blocking call, the message can be null.
if (message == nullptr) {
return unique_ptr<Output>(nullptr);
}
string response(static_cast<char*>(message->data()), message->size());
bool end = false;
if (response == m_streamString) {
end = false;
if (response == ServicesImpl::STREAM) {
}
else if (response == m_endOfStreamString) {
end = true;
else if (response == ServicesImpl::ENDSTREAM) {
m_ended = true;
return false;
}
else if (response == ServicesImpl::CANCEL) {
m_canceled = true;
return false;
}
message = m_impl->receive();
......@@ -77,13 +69,24 @@ std::unique_ptr<Output> OutputStreamSocket::receive() {
proto::ApplicationStream protoStream;
protoStream.ParseFromArray(message->data(), message->size());
return unique_ptr<Output>(new Output(protoStream.id(), protoStream.message(), end));
output.m_id = protoStream.id();
output.m_message = protoStream.message();
return true;
}
void OutputStreamSocket::cancel() {
m_impl->cancel();
}
bool OutputStreamSocket::isEnded() const {
return m_ended;
}
bool OutputStreamSocket::isCanceled() const {
return m_canceled;
}
WaitingImpl * OutputStreamSocket::waiting() {
// We transfer the ownership of cancel socket to WaitingImpl
return new SocketWaitingImpl(m_impl->m_cancelSocket.get(), "CANCEL");
......
......@@ -33,19 +33,17 @@ class Instance;
class Output {
friend class OutputStreamSocket;
public:
Output(int id, const std::string& message, bool end);
Output();
int getId() const;
const std::string& getMessage() const;
bool isEnd() const;
private:
int m_id;
std::string m_message;
bool m_end;
};
......@@ -57,16 +55,18 @@ class OutputStreamSocket {
public:
~OutputStreamSocket();
std::unique_ptr<Output> receive();
bool receive(Output& ouput);
void cancel();
bool isEnded() const;
bool isCanceled() const;
private:
OutputStreamSocket(const std::string& streamString, const std::string& endOfStreamString, SocketImpl * impl);
OutputStreamSocket(SocketImpl * impl);
WaitingImpl * waiting();
std::string m_streamString;
std::string m_endOfStreamString;
bool m_ended;
bool m_canceled;
std::unique_ptr<SocketImpl> m_impl;
};
......
......@@ -147,9 +147,14 @@ std::unique_ptr<OutputStreamSocket> Services::createOutputStreamSocket(int port)
// Prepare our context and subscriber
string streamEndpoint = m_url + ":" + to_string(port);
zmq::socket_t * subscriber = m_impl->createOutputStreamSubscriber(streamEndpoint);
return unique_ptr<OutputStreamSocket>(new OutputStreamSocket(ServicesImpl::STREAM, ServicesImpl::ENDSTREAM, new SocketImpl(subscriber)));
// We define a unique name that depends on the event stream socket object because there can be many (instances).
string cancelEndpoint = "inproc://cancel." + to_string(CancelIdGenerator::newId());
zmq::socket_t * cancelPublisher = m_impl->createCancelPublisher(cancelEndpoint);
zmq::socket_t * subscriber = m_impl->createOutputStreamSubscriber(streamEndpoint, cancelEndpoint);
return unique_ptr<OutputStreamSocket>(new OutputStreamSocket(new SocketImpl(subscriber, cancelPublisher)));
}
}
......@@ -224,7 +224,7 @@ void PublisherImpl::setEnd() {
}
}
bool PublisherImpl::hasEnded() {
bool PublisherImpl::isEnded() {
return m_ended;
}
......
......@@ -52,7 +52,7 @@ public:
void send(const double* data, std::size_t size);
void sendTwoBinaryParts(const std::string& data1, const std::string& data2);
void setEnd();
bool hasEnded();
bool isEnded();
void terminate();
void publish(const std::string& header, const char* data, std::size_t size);
......
......@@ -33,7 +33,8 @@ RequesterImpl::RequesterImpl(const application::This * application, const std::s
m_requesterPort(requesterPort),
m_name(name),
m_responderId(responderId),
m_requesterId(requesterId) {
m_requesterId(requesterId),
m_canceled(false) {
stringstream repEndpoint;
repEndpoint << url << ":" << responderPort;
......@@ -129,22 +130,20 @@ bool RequesterImpl::receiveBinary(std::string& response) {
proto::MessageType messageType;
messageType.ParseFromArray((*message).data(), (*message).size());
bool canceled = false;
if (message->more()) {
message.reset(new zmq::message_t);
m_requester->recv(message.get(), 0);
} else {
cerr << "unexpected number of frames, should be 2" << endl;
canceled = true;
m_canceled = true;
}
if (messageType.type() == proto::MessageType_Type_RESPONSE) {
response = string(static_cast<char*>(message->data()), message->size());
} else if (messageType.type() == proto::MessageType_Type_CANCEL) {
canceled = true;
m_canceled = true;
}
// Create the reply
......@@ -155,7 +154,7 @@ bool RequesterImpl::receiveBinary(std::string& response) {
m_requester->send(*reply);
return !canceled;
return !m_canceled;
}
bool RequesterImpl::receive(std::string& data) {
......
......@@ -58,6 +58,7 @@ public:
int m_responderId;
int m_requesterId;
std::unique_ptr<zmq::socket_t> m_requester;
bool m_canceled;
static const std::string REQUESTER_PREFIX;
......
......@@ -31,7 +31,7 @@ ResponderImpl::ResponderImpl(const application::This * application, int responde
m_application(application),
m_responderPort(responderPort),
m_name(name),
m_ended(false) {
m_canceled(false) {
// create a socket REP
m_responder.reset(new zmq::socket_t(m_application->m_impl->m_context, ZMQ_REP));
......@@ -78,8 +78,6 @@ std::unique_ptr<RequestImpl> ResponderImpl::receive() {
} else {
cerr << "unexpected number of frames, should be 2" << endl;
m_ended = true;
return unique_ptr<RequestImpl>(nullptr);
}
......@@ -112,7 +110,7 @@ std::unique_ptr<RequestImpl> ResponderImpl::receive() {
}
} else if (messageType.type() == proto::MessageType_Type_CANCEL) {
m_ended = true;
m_canceled = true;
} else {
cerr << "unknown message type " << messageType.type() << endl;
......
......@@ -47,7 +47,7 @@ public:
int m_responderPort;
std::string m_name;
std::unique_ptr<zmq::socket_t> m_responder;
bool m_ended;
bool m_canceled;
static const std::string RESPONDER_PREFIX;
};
......
......@@ -209,24 +209,25 @@ zmq::socket_t * ServicesImpl::createEventSubscriber(const std::string& endpoint,
return subscriber;
}
zmq::socket_t * ServicesImpl::createOutputStreamSubscriber(const std::string& endpoint) {
zmq::socket_t * ServicesImpl::createOutputStreamSubscriber(const std::string& endpoint, const std::string& cancelEndpoint) {
zmq::socket_t * subscriber = new zmq::socket_t(m_context, ZMQ_SUB);
vector<string> streamList;
streamList.push_back(STREAM);
streamList.push_back(ENDSTREAM);
streamList.push_back(CANCEL);
for (vector<string>::const_iterator s = streamList.begin(); s != streamList.end(); ++s) {
subscriber->setsockopt(ZMQ_SUBSCRIBE, s->c_str(), s->length());
}
subscriber->connect(endpoint.c_str());
subscriber->connect(cancelEndpoint.c_str());
return subscriber;
}
zmq::socket_t * ServicesImpl::createCancelPublisher(const std::string& endpoint) {
zmq::socket_t * publisher = new zmq::socket_t(m_context, ZMQ_PUB);
......
......@@ -63,7 +63,7 @@ public:
std::string createOutputRequest(const std::string& name) const;
zmq::socket_t * createEventSubscriber(const std::string& endpoint, const std::string& cancelEndpoint);
zmq::socket_t * createOutputStreamSubscriber(const std::string& endpoint);
zmq::socket_t * createOutputStreamSubscriber(const std::string& endpoint, const std::string& cancelEndpoint);
zmq::socket_t * createCancelPublisher(const std::string& endpoint);
std::unique_ptr<zmq::message_t> tryRequestWithOnePartReply(const std::string& strRequestType, const std::string& strRequestData, const std::string& endpoint, int overrideTimeout = -1);
......
......@@ -44,7 +44,8 @@ SubscriberImpl::SubscriberImpl(const Server * server, const std::string & url, i
m_instanceId(instanceId),
m_instanceEndpoint(instanceEndpoint),
m_statusEndpoint(statusEndpoint),
m_endOfStream(false) {
m_ended(false),
m_canceled(false) {
}
SubscriberImpl::~SubscriberImpl() {
......@@ -112,8 +113,12 @@ void SubscriberImpl::init() {
}
}
bool SubscriberImpl::hasEnded() const {
return m_endOfStream;
bool SubscriberImpl::isEnded() const {
return m_ended;
}
bool SubscriberImpl::isCanceled() const {
return m_canceled;
}
bool SubscriberImpl::receiveBinary(std::string& data) {
......@@ -132,10 +137,11 @@ bool SubscriberImpl::receiveBinary(std::string& data) {
return true;
} else if (response == ENDSTREAM) {
m_endOfStream = true;
m_ended = true;
return false;
} else if (response == CANCEL) {
m_canceled = true;
return false;
} else if (response == STATUS) {
......@@ -253,7 +259,7 @@ bool SubscriberImpl::receiveTwoBinaryParts(std::string& data1, std::string& data
return true;
} else if (response == ENDSTREAM) {
m_endOfStream = true;
m_ended = true;
return false;
} else if (response == CANCEL) {
......
......@@ -33,7 +33,10 @@ public:
~SubscriberImpl();
void init();
bool hasEnded() const;
bool isEnded() const;
bool isCanceled() const;
bool receiveBinary(std::string& data);
bool receive(std::string& data);
bool receive(std::vector<int32_t>& data);
......@@ -57,7 +60,8 @@ public:
std::unique_ptr<zmq::socket_t> m_subscriber;
std::string m_cancelEndpoint;
std::unique_ptr<zmq::socket_t> m_cancelPublisher;
bool m_endOfStream;
bool m_ended;
bool m_canceled;
static const std::string SYNC;
static const std::string STREAM;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment