Commit 77243586 authored by legoc's avatar legoc
Browse files

Reviewed Subscriber interface

parent 95015c1a
......@@ -122,6 +122,10 @@ private:
static std::unique_ptr<Subscriber> createSubscriber(application::Instance &instance, const std::string &publisherName, const std::string &instanceName);
static std::string createConnectPublisherRequest(int id, const std::string& publisherName);
std::string m_publisherName;
std::string m_instanceName;
int m_instanceId;
Endpoint m_instanceEndpoint;
std::unique_ptr<SubscriberImpl> m_impl;
std::unique_ptr<Waiting> m_waiting;
};
......
......@@ -134,6 +134,11 @@ Subscriber::~Subscriber() {
void Subscriber::init(application::Instance & instance, const std::string& publisherName, const std::string& instanceName) {
m_publisherName = publisherName;
m_instanceName = instance.getName();
m_instanceId = instance.getId();
m_instanceEndpoint = instance.getEndpoint();
// Get the JSON response.
json::Object response = instance.getCom().requestJSON(createConnectPublisherRequest(instance.getId(), publisherName));
......@@ -145,8 +150,7 @@ void Subscriber::init(application::Instance & instance, const std::string& publi
int synchronizerPort = response[message::PublisherResponse::SYNCHRONIZER_PORT].GetInt();
int numberOfSubscribers = response[message::PublisherResponse::NUMBER_OF_SUBSCRIBERS].GetInt();
// TODO simplify the use of some variables: e.g. m_serverEndpoint accessible from this.
m_impl->init(publisherPort, synchronizerPort, publisherName, numberOfSubscribers, instance);
m_impl->init(m_instanceId, m_instanceEndpoint, instance.getStatusEndpoint(), publisherPort, synchronizerPort, numberOfSubscribers);
}
std::unique_ptr<Subscriber> Subscriber::createSubscriber(application::Instance &instance, const std::string &publisherName, const std::string &instanceName) {
......@@ -187,19 +191,19 @@ std::unique_ptr<Subscriber> Subscriber::create(application::Instance & instance,
}
const std::string& Subscriber::getPublisherName() const {
return m_impl->getPublisherName();
return m_publisherName;
}
const std::string& Subscriber::getInstanceName() const {
return m_impl->getInstanceName();
return m_instanceName;
}
int Subscriber::getInstanceId() const {
return m_impl->getInstanceId();
return m_instanceId;
}
Endpoint Subscriber::getInstanceEndpoint() const {
return m_impl->getInstanceEndpoint();
return m_instanceEndpoint;
}
bool Subscriber::hasEnded() const {
......
......@@ -29,12 +29,7 @@ class SubscriberImpl {
public:
virtual ~SubscriberImpl() {}
virtual void init(int publisherPort, int synchronizerPort, const std::string& publisherName, int numberOfSubscribers, application::Instance & instance) = 0;
virtual const std::string& getPublisherName() const = 0;
virtual const std::string& getInstanceName() const = 0;
virtual int getInstanceId() const = 0;
virtual Endpoint getInstanceEndpoint() const = 0;
virtual void init(int instanceId, const Endpoint& instanceEndpoint, const Endpoint& instanceStatusEndpoint, int publisherPort, int synchronizerPort, int numberOfSubscribers) = 0;
virtual bool isEnded() const = 0;
virtual bool isCanceled() const = 0;
......
......@@ -40,16 +40,12 @@ SubscriberZmq::SubscriberZmq() :
SubscriberZmq::~SubscriberZmq() {
}
void SubscriberZmq::init(int publisherPort, int synchronizerPort, const std::string& publisherName, int numberOfSubscribers, application::Instance & instance) {
void SubscriberZmq::init(int instanceId, const Endpoint& instanceEndpoint, const Endpoint& instanceStatusEndpoint, int publisherPort, int synchronizerPort, int numberOfSubscribers) {
m_serverEndpoint = instance.getEndpoint();
m_publisherName = publisherName;
m_publisherPort = publisherPort;
m_synchronizerPort = synchronizerPort;
m_numberOfSubscribers = numberOfSubscribers;
m_instanceName = instance.getName();
m_instanceId = instance.getId();
m_statusEndpoint = instance.getStatusEndpoint().toString();
m_instanceId = instanceId;
m_ended = false;
m_canceled = false;
......@@ -62,7 +58,7 @@ void SubscriberZmq::init(int publisherPort, int synchronizerPort, const std::str
m_subscriber->setsockopt(ZMQ_SUBSCRIBE, message::Event::CANCEL, std::string(message::Event::CANCEL).length());
m_subscriber->setsockopt(ZMQ_SUBSCRIBE, message::Event::STATUS, std::string(message::Event::STATUS).length());
m_subscriber->connect(m_serverEndpoint.withPort(m_publisherPort).toString());
m_subscriber->connect(instanceEndpoint.withPort(m_publisherPort).toString());
// We must first bind the cancel publisher before connecting the subscriber.
std::stringstream cancelEndpoint;
......@@ -75,13 +71,13 @@ void SubscriberZmq::init(int publisherPort, int synchronizerPort, const std::str
m_cancelPublisher->bind(m_cancelEndpoint.c_str());
m_subscriber->connect(m_cancelEndpoint.c_str());
m_subscriber->connect(m_statusEndpoint.c_str());
m_subscriber->connect(instanceStatusEndpoint.toString().c_str());
// Synchronize the subscriber only if the number of subscribers > 0.
if (m_numberOfSubscribers > 0) {
// Create a request socket.
std::unique_ptr<RequestSocket> requestSocket = application::This::getCom().createRequestSocket(m_serverEndpoint.withPort(m_synchronizerPort).toString());
std::unique_ptr<RequestSocket> requestSocket = application::This::getCom().createRequestSocket(instanceEndpoint.withPort(m_synchronizerPort).toString());
// Poll subscriber.
zmq_pollitem_t items[1];
......@@ -111,22 +107,6 @@ void SubscriberZmq::init(int publisherPort, int synchronizerPort, const std::str
}
}
const std::string& SubscriberZmq::getPublisherName() const {
return m_publisherName;
}
const std::string& SubscriberZmq::getInstanceName() const {
return m_instanceName;
}
int SubscriberZmq::getInstanceId() const {
return m_instanceId;
}
Endpoint SubscriberZmq::getInstanceEndpoint() const {
return m_serverEndpoint;
}
bool SubscriberZmq::isEnded() const {
return m_ended;
}
......
......@@ -32,12 +32,7 @@ public:
SubscriberZmq();
virtual ~SubscriberZmq();
virtual void init(int publisherPort, int synchronizerPort, const std::string& publisherName, int numberOfSubscribers, application::Instance & instance);
virtual const std::string& getPublisherName() const;
virtual const std::string& getInstanceName() const;
virtual int getInstanceId() const;
virtual Endpoint getInstanceEndpoint() const;
virtual void init(int instanceId, const Endpoint& instanceEndpoint, const Endpoint& instanceStatusEndpoint, int publisherPort, int synchronizerPort, int numberOfSubscribers);
virtual bool isEnded() const;
virtual bool isCanceled() const;
......@@ -51,14 +46,10 @@ public:
private:
std::string createSubscribePublisherRequest() const;
Endpoint m_serverEndpoint;
int m_publisherPort;
int m_synchronizerPort;
std::string m_publisherName;
int m_numberOfSubscribers;
std::string m_instanceName;
int m_instanceId;
std::string m_statusEndpoint;
std::unique_ptr<zmq::socket_t> m_subscriber;
std::string m_cancelEndpoint;
std::unique_ptr<zmq::socket_t> m_cancelPublisher;
......
......@@ -16,6 +16,10 @@ import fr.ill.ics.cameo.strings.Endpoint;
*/
public class Subscriber {
private String publisherName;
private String instanceName;
private int instanceId;
private Endpoint instanceEndpoint;
private SubscriberImpl impl;
private SubscriberWaiting waiting = new SubscriberWaiting(this);
......@@ -28,6 +32,11 @@ public class Subscriber {
private void initSubscriber(Instance instance, String publisherName) throws SubscriberCreationException {
this.publisherName = publisherName;
this.instanceName = instance.getName();
this.instanceId = instance.getId();
this.instanceEndpoint = instance.getEndpoint();
JSONObject request = Messages.createConnectPublisherRequest(instance.getId(), publisherName);
JSONObject response = instance.getCom().requestJSON(request);
......@@ -40,7 +49,7 @@ public class Subscriber {
int synchronizerPort = JSON.getInt(response, Messages.PublisherResponse.SYNCHRONIZER_PORT);
int numberOfSubscribers = JSON.getInt(response, Messages.PublisherResponse.NUMBER_OF_SUBSCRIBERS);
impl.init(instance, publisherPort, synchronizerPort, publisherName, numberOfSubscribers);
impl.init(instanceId, instanceEndpoint, instance.getStatusEndpoint(), publisherPort, synchronizerPort, numberOfSubscribers);
}
private boolean init(Instance application, String publisherName) {
......@@ -90,19 +99,19 @@ public class Subscriber {
}
public String getPublisherName() {
return impl.getPublisherName();
return publisherName;
}
public String getInstanceName() {
return impl.getInstanceName();
return instanceName;
}
public int getInstanceId() {
return impl.getInstanceId();
return instanceId;
}
public Endpoint getInstanceEndpoint() {
return impl.getInstanceEndpoint();
return instanceEndpoint;
}
public boolean isEnded() {
......
......@@ -22,11 +22,7 @@ import fr.ill.ics.cameo.strings.Endpoint;
public interface SubscriberImpl {
void init(Instance instance, int publisherPort, int synchronizerPort, String publisherName, int numberOfSubscribers) throws ConnectionTimeout;
String getPublisherName();
String getInstanceName();
int getInstanceId();
Endpoint getInstanceEndpoint();
void init(int instanceId, Endpoint instanceEndpoint, Endpoint instanceStatusEndpoint, int publisherPort, int synchronizerPort, int numberOfSubscribers) throws ConnectionTimeout;
boolean isEnded();
boolean isCanceled();
byte[] receive();
......
......@@ -37,21 +37,19 @@ public class SubscriberZmq implements SubscriberImpl {
private Zmq.Socket subscriber;
private String cancelEndpoint;
private Zmq.Socket cancelPublisher;
private String publisherName;
private Instance instance;
private int instanceId;
private boolean ended = false;
private boolean canceled = false;
public void init(Instance instance, int publisherPort, int synchronizerPort, String publisherName, int numberOfSubscribers) {
public void init(int instanceId, Endpoint instanceEndpoint, Endpoint instanceStatusEndpoint, int publisherPort, int synchronizerPort, int numberOfSubscribers) {
this.instance = instance;
this.publisherName = publisherName;
this.instanceId = instanceId;
this.context = ((ContextZmq)This.getCom().getContext()).getContext();
// Create the subscriber
subscriber = context.createSocket(Zmq.SUB);
subscriber.connect(instance.getEndpoint().withPort(publisherPort).toString());
subscriber.connect(instanceEndpoint.withPort(publisherPort).toString());
subscriber.subscribe(Messages.Event.SYNC);
subscriber.subscribe(Messages.Event.STREAM);
subscriber.subscribe(Messages.Event.ENDSTREAM);
......@@ -68,14 +66,14 @@ public class SubscriberZmq implements SubscriberImpl {
subscriber.subscribe(Messages.Event.CANCEL);
// Subscribe to STATUS
subscriber.connect(instance.getStatusEndpoint().toString());
subscriber.connect(instanceStatusEndpoint.toString());
subscriber.subscribe(Messages.Event.STATUS);
// Synchronize the subscriber only if the number of subscribers > 0
if (numberOfSubscribers > 0) {
// Create a socket that will be used for several requests.
RequestSocket requestSocket = This.getCom().createRequestSocket(instance.getEndpoint().withPort(synchronizerPort).toString());
RequestSocket requestSocket = This.getCom().createRequestSocket(instanceEndpoint.withPort(synchronizerPort).toString());
// polling to wait for connection
Zmq.Poller poller = context.createPoller(subscriber);
......@@ -105,22 +103,6 @@ public class SubscriberZmq implements SubscriberImpl {
}
}
public String getPublisherName() {
return publisherName;
}
public String getInstanceName() {
return instance.getName();
}
public int getInstanceId() {
return instance.getId();
}
public Endpoint getInstanceEndpoint() {
return instance.getEndpoint();
}
public boolean isEnded() {
return ended;
}
......@@ -158,7 +140,7 @@ public class SubscriberZmq implements SubscriberImpl {
// Get the id.
int id = JSON.getInt(status, Messages.StatusEvent.ID);
if (instance.getId() == id) {
if (instanceId == id) {
// Get the state.
int state = JSON.getInt(status, Messages.StatusEvent.APPLICATION_STATE);
......@@ -209,7 +191,7 @@ public class SubscriberZmq implements SubscriberImpl {
// Get the id.
int id = JSON.getInt(request, Messages.StatusEvent.ID);
if (instance.getId() == id) {
if (instanceId == id) {
// Get the state.
int state = JSON.getInt(request, Messages.StatusEvent.APPLICATION_STATE);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment