Commit 872ef321 authored by legoc's avatar legoc
Browse files

Reviewed Subscriber

parent f3006230
......@@ -88,7 +88,7 @@ public:
const std::string& getPublisherName() const;
const std::string& getInstanceName() const;
int getInstanceId() const;
std::string getInstanceEndpoint() const;
Endpoint getInstanceEndpoint() const;
/**
* Deprecated.
......@@ -117,9 +117,8 @@ public:
void cancel();
private:
Subscriber(int publisherPort, int synchronizerPort, const std::string &publisherName, int numberOfSubscribers, application::Instance &instance);
void init();
Subscriber();
void init(application::Instance &instance, const std::string &publisherName, const std::string &instanceName);
static std::unique_ptr<Subscriber> createSubscriber(application::Instance &instance, const std::string &publisherName, const std::string &instanceName);
static std::string createConnectPublisherRequest(int id, const std::string& publisherName);
......
......@@ -22,8 +22,8 @@
#include "../base/Messages.h"
#include "../base/RequestSocket.h"
#include "../base/Waiting.h"
#include "impl/SubscriberImpl.h"
#include "impl/zmq/PublisherZmq.h"
#include "impl/zmq/SubscriberZmq.h"
namespace cameo {
namespace coms {
......@@ -122,14 +122,17 @@ std::string Publisher::createCreatePublisherRequest(int id, const std::string& n
///////////////////////////////////////////////////////////////////////////
// Subscriber
Subscriber::Subscriber(int publisherPort, int synchronizerPort, const std::string & publisherName, int numberOfSubscribers, application::Instance &instance) :
m_impl(new SubscriberImpl(publisherPort, synchronizerPort, publisherName, numberOfSubscribers, instance)) {
Subscriber::Subscriber() {
m_impl = std::unique_ptr<SubscriberImpl>(new SubscriberZmq());
// Create the waiting here.
m_waiting.reset(new Waiting(std::bind(&Subscriber::cancel, this)));
}
Subscriber::~Subscriber() {
}
std::unique_ptr<Subscriber> Subscriber::createSubscriber(application::Instance & instance, const std::string& publisherName, const std::string& instanceName) {
void Subscriber::init(application::Instance & instance, const std::string& publisherName, const std::string& instanceName) {
// Get the JSON response.
json::Object response = instance.getCom().requestJSON(createConnectPublisherRequest(instance.getId(), publisherName));
......@@ -143,8 +146,13 @@ std::unique_ptr<Subscriber> Subscriber::createSubscriber(application::Instance &
int numberOfSubscribers = response[message::PublisherResponse::NUMBER_OF_SUBSCRIBERS].GetInt();
// TODO simplify the use of some variables: e.g. m_serverEndpoint accessible from this.
std::unique_ptr<Subscriber> subscriber(new Subscriber(publisherPort, synchronizerPort, publisherName, numberOfSubscribers, instance));
subscriber->init();
m_impl->init(publisherPort, synchronizerPort, publisherName, numberOfSubscribers, instance);
}
std::unique_ptr<Subscriber> Subscriber::createSubscriber(application::Instance &instance, const std::string &publisherName, const std::string &instanceName) {
std::unique_ptr<Subscriber> subscriber = std::unique_ptr<Subscriber>(new Subscriber());
subscriber->init(instance, publisherName, instanceName);
return subscriber;
}
......@@ -178,27 +186,20 @@ std::unique_ptr<Subscriber> Subscriber::create(application::Instance & instance,
return std::unique_ptr<Subscriber>(nullptr);
}
void Subscriber::init() {
m_impl->init();
// Create the waiting here.
m_waiting.reset(m_impl->waiting());
}
const std::string& Subscriber::getPublisherName() const {
return m_impl->m_publisherName;
return m_impl->getPublisherName();
}
const std::string& Subscriber::getInstanceName() const {
return m_impl->m_instanceName;
return m_impl->getInstanceName();
}
int Subscriber::getInstanceId() const {
return m_impl->m_instanceId;
return m_impl->getInstanceId();
}
std::string Subscriber::getInstanceEndpoint() const {
return m_impl->m_serverEndpoint.toString();
Endpoint Subscriber::getInstanceEndpoint() const {
return m_impl->getInstanceEndpoint();
}
bool Subscriber::hasEnded() const {
......
......@@ -18,52 +18,32 @@
#define CAMEO_SUBSCRIBERIMPL_H_
#include "Application.h"
#include "zmq.hpp"
#include <string>
#include <vector>
#include <optional>
#include <tuple>
namespace cameo {
class Server;
class Waiting;
namespace coms {
class SubscriberImpl {
public:
SubscriberImpl(int publisherPort, int synchronizerPort, const std::string& publisherName, int numberOfSubscribers, application::Instance & instance);
~SubscriberImpl();
void init();
virtual ~SubscriberImpl() {}
bool isEnded() const;
bool isCanceled() const;
virtual void init(int publisherPort, int synchronizerPort, const std::string& publisherName, int numberOfSubscribers, application::Instance & instance) = 0;
std::optional<std::string> receiveBinary();
std::optional<std::string> receive();
std::optional<std::tuple<std::string, std::string>> receiveTwoBinaryParts();
virtual const std::string& getPublisherName() const = 0;
virtual const std::string& getInstanceName() const = 0;
virtual int getInstanceId() const = 0;
virtual Endpoint getInstanceEndpoint() const = 0;
void cancel();
Waiting * waiting();
virtual bool isEnded() const = 0;
virtual bool isCanceled() const = 0;
std::string createSubscribePublisherRequest() const;
virtual std::optional<std::string> receiveBinary() = 0;
virtual std::optional<std::string> receive() = 0;
virtual std::optional<std::tuple<std::string, std::string>> receiveTwoBinaryParts() = 0;
Endpoint m_serverEndpoint;
int m_publisherPort;
int m_synchronizerPort;
std::string m_publisherName;
int m_numberOfSubscribers;
std::string m_instanceName;
int m_instanceId;
std::string m_statusEndpoint;
std::unique_ptr<zmq::socket_t> m_subscriber;
std::string m_cancelEndpoint;
std::unique_ptr<zmq::socket_t> m_cancelPublisher;
bool m_ended;
bool m_canceled;
virtual void cancel() = 0;
};
}
......
......@@ -14,38 +14,44 @@
* limitations under the Licence.
*/
#include "SubscriberImpl.h"
#include "SubscriberZmq.h"
#include "Serializer.h"
#include "Server.h"
#include "JSON.h"
#include "../../base/CancelIdGenerator.h"
#include "../../base/Messages.h"
#include "../../base/RequestSocket.h"
#include "../../base/impl/zmq/ContextZmq.h"
#include "../../base/Waiting.h"
#include "../../../base/CancelIdGenerator.h"
#include "../../../base/Messages.h"
#include "../../../base/RequestSocket.h"
#include "../../../base/impl/zmq/ContextZmq.h"
#include "../../../base/Waiting.h"
#include <sstream>
namespace cameo {
namespace coms {
SubscriberImpl::SubscriberImpl(int publisherPort, int synchronizerPort, const std::string& publisherName, int numberOfSubscribers, application::Instance & instance) :
m_serverEndpoint(instance.getEndpoint()),
m_publisherName(publisherName),
m_publisherPort(publisherPort),
m_synchronizerPort(synchronizerPort),
m_numberOfSubscribers(numberOfSubscribers),
m_instanceName(instance.getName()),
m_instanceId(instance.getId()),
m_statusEndpoint(instance.getStatusEndpoint().toString()),
SubscriberZmq::SubscriberZmq() :
m_publisherPort(0),
m_synchronizerPort(0),
m_numberOfSubscribers(0),
m_instanceId(0),
m_ended(false),
m_canceled(false) {
}
SubscriberImpl::~SubscriberImpl() {
SubscriberZmq::~SubscriberZmq() {
}
void SubscriberImpl::init() {
void SubscriberZmq::init(int publisherPort, int synchronizerPort, const std::string& publisherName, int numberOfSubscribers, application::Instance & instance) {
m_serverEndpoint = instance.getEndpoint();
m_publisherName = publisherName;
m_publisherPort = publisherPort;
m_synchronizerPort = synchronizerPort;
m_numberOfSubscribers = numberOfSubscribers;
m_instanceName = instance.getName();
m_instanceId = instance.getId();
m_statusEndpoint = instance.getStatusEndpoint().toString();
m_ended = false;
m_canceled = false;
// Create a socket for publishing.
ContextZmq* contextImpl = dynamic_cast<ContextZmq *>(application::This::getCom().getContext());
......@@ -105,15 +111,31 @@ void SubscriberImpl::init() {
}
}
bool SubscriberImpl::isEnded() const {
const std::string& SubscriberZmq::getPublisherName() const {
return m_publisherName;
}
const std::string& SubscriberZmq::getInstanceName() const {
return m_instanceName;
}
int SubscriberZmq::getInstanceId() const {
return m_instanceId;
}
Endpoint SubscriberZmq::getInstanceEndpoint() const {
return m_serverEndpoint;
}
bool SubscriberZmq::isEnded() const {
return m_ended;
}
bool SubscriberImpl::isCanceled() const {
bool SubscriberZmq::isCanceled() const {
return m_canceled;
}
std::optional<std::string> SubscriberImpl::receiveBinary() {
std::optional<std::string> SubscriberZmq::receiveBinary() {
while (true) {
std::unique_ptr<zmq::message_t> message(new zmq::message_t());
......@@ -162,11 +184,11 @@ std::optional<std::string> SubscriberImpl::receiveBinary() {
return {};
}
std::optional<std::string> SubscriberImpl::receive() {
std::optional<std::string> SubscriberZmq::receive() {
return receiveBinary();
}
std::optional<std::tuple<std::string, std::string>> SubscriberImpl::receiveTwoBinaryParts() {
std::optional<std::tuple<std::string, std::string>> SubscriberZmq::receiveTwoBinaryParts() {
while (true) {
std::unique_ptr<zmq::message_t> message(new zmq::message_t());
......@@ -223,7 +245,7 @@ std::optional<std::tuple<std::string, std::string>> SubscriberImpl::receiveTwoBi
return {};
}
void SubscriberImpl::cancel() {
void SubscriberZmq::cancel() {
std::string m_message = message::Event::CANCEL;
......@@ -236,13 +258,7 @@ void SubscriberImpl::cancel() {
m_cancelPublisher->send(requestData);
}
Waiting * SubscriberImpl::waiting() {
// Waiting gets the cancel publisher.
return new Waiting(std::bind(&SubscriberImpl::cancel, this));
}
std::string SubscriberImpl::createSubscribePublisherRequest() const {
std::string SubscriberZmq::createSubscribePublisherRequest() const {
json::StringObject request;
request.pushKey(message::TYPE);
......
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
#ifndef CAMEO_SUBSCRIBERZMQ_H_
#define CAMEO_SUBSCRIBERZMQ_H_
#include "../SubscriberImpl.h"
#include <zmq.hpp>
namespace cameo {
class Server;
namespace coms {
class SubscriberZmq : public SubscriberImpl {
public:
SubscriberZmq();
virtual ~SubscriberZmq();
virtual void init(int publisherPort, int synchronizerPort, const std::string& publisherName, int numberOfSubscribers, application::Instance & instance);
virtual const std::string& getPublisherName() const;
virtual const std::string& getInstanceName() const;
virtual int getInstanceId() const;
virtual Endpoint getInstanceEndpoint() const;
virtual bool isEnded() const;
virtual bool isCanceled() const;
virtual std::optional<std::string> receiveBinary();
virtual std::optional<std::string> receive();
virtual std::optional<std::tuple<std::string, std::string>> receiveTwoBinaryParts();
virtual void cancel();
private:
std::string createSubscribePublisherRequest() const;
Endpoint m_serverEndpoint;
int m_publisherPort;
int m_synchronizerPort;
std::string m_publisherName;
int m_numberOfSubscribers;
std::string m_instanceName;
int m_instanceId;
std::string m_statusEndpoint;
std::unique_ptr<zmq::socket_t> m_subscriber;
std::string m_cancelEndpoint;
std::unique_ptr<zmq::socket_t> m_cancelPublisher;
bool m_ended;
bool m_canceled;
};
}
}
#endif
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment