Commit 74a0aa9c authored by legoc's avatar legoc
Browse files

Some renamings

parent 77243586
......@@ -83,12 +83,12 @@ class Subscriber {
public:
~Subscriber();
static std::unique_ptr<Subscriber> create(application::Instance &instance, const std::string &publisherName);
static std::unique_ptr<Subscriber> create(application::Instance & app, const std::string &publisherName);
const std::string& getPublisherName() const;
const std::string& getInstanceName() const;
int getInstanceId() const;
Endpoint getInstanceEndpoint() const;
const std::string& getAppName() const;
int getAppId() const;
Endpoint getAppEndpoint() const;
/**
* Deprecated.
......@@ -119,13 +119,13 @@ public:
private:
Subscriber();
void init(application::Instance &instance, const std::string &publisherName, const std::string &instanceName);
static std::unique_ptr<Subscriber> createSubscriber(application::Instance &instance, const std::string &publisherName, const std::string &instanceName);
static std::unique_ptr<Subscriber> createSubscriber(application::Instance & app, const std::string &publisherName, const std::string &instanceName);
static std::string createConnectPublisherRequest(int id, const std::string& publisherName);
std::string m_publisherName;
std::string m_instanceName;
int m_instanceId;
Endpoint m_instanceEndpoint;
std::string m_appName;
int m_appId;
Endpoint m_appEndpoint;
std::unique_ptr<SubscriberImpl> m_impl;
std::unique_ptr<Waiting> m_waiting;
};
......
......@@ -132,15 +132,15 @@ Subscriber::Subscriber() {
Subscriber::~Subscriber() {
}
void Subscriber::init(application::Instance & instance, const std::string& publisherName, const std::string& instanceName) {
void Subscriber::init(application::Instance & app, const std::string& publisherName, const std::string& instanceName) {
m_publisherName = publisherName;
m_instanceName = instance.getName();
m_instanceId = instance.getId();
m_instanceEndpoint = instance.getEndpoint();
m_appName = app.getName();
m_appId = app.getId();
m_appEndpoint = app.getEndpoint();
// Get the JSON response.
json::Object response = instance.getCom().requestJSON(createConnectPublisherRequest(instance.getId(), publisherName));
json::Object response = app.getCom().requestJSON(createConnectPublisherRequest(app.getId(), publisherName));
int publisherPort = response[message::PublisherResponse::PUBLISHER_PORT].GetInt();
if (publisherPort == -1) {
......@@ -150,27 +150,27 @@ void Subscriber::init(application::Instance & instance, const std::string& publi
int synchronizerPort = response[message::PublisherResponse::SYNCHRONIZER_PORT].GetInt();
int numberOfSubscribers = response[message::PublisherResponse::NUMBER_OF_SUBSCRIBERS].GetInt();
m_impl->init(m_instanceId, m_instanceEndpoint, instance.getStatusEndpoint(), publisherPort, synchronizerPort, numberOfSubscribers);
m_impl->init(m_appId, m_appEndpoint, app.getStatusEndpoint(), publisherPort, synchronizerPort, numberOfSubscribers);
}
std::unique_ptr<Subscriber> Subscriber::createSubscriber(application::Instance &instance, const std::string &publisherName, const std::string &instanceName) {
std::unique_ptr<Subscriber> Subscriber::createSubscriber(application::Instance & app, const std::string &publisherName, const std::string &instanceName) {
std::unique_ptr<Subscriber> subscriber = std::unique_ptr<Subscriber>(new Subscriber());
subscriber->init(instance, publisherName, instanceName);
subscriber->init(app, publisherName, instanceName);
return subscriber;
}
std::unique_ptr<Subscriber> Subscriber::create(application::Instance & instance, const std::string& publisherName) {
std::unique_ptr<Subscriber> Subscriber::create(application::Instance & app, const std::string& publisherName) {
try {
return createSubscriber(instance, publisherName, instance.getName());
return createSubscriber(app, publisherName, app.getName());
} catch (const SubscriberCreationException& e) {
// the publisher does not exist, so we are waiting for it
}
// waiting for the publisher
application::State lastState = instance.waitFor(publisherName);
application::State lastState = app.waitFor(publisherName);
// state cannot be terminal or it means that the application has terminated that is not planned.
if (lastState == application::SUCCESS
......@@ -181,7 +181,7 @@ std::unique_ptr<Subscriber> Subscriber::create(application::Instance & instance,
}
try {
return createSubscriber(instance, publisherName, instance.getName());
return createSubscriber(app, publisherName, app.getName());
} catch (const SubscriberCreationException& e) {
// that should not happen
......@@ -194,16 +194,16 @@ const std::string& Subscriber::getPublisherName() const {
return m_publisherName;
}
const std::string& Subscriber::getInstanceName() const {
return m_instanceName;
const std::string& Subscriber::getAppName() const {
return m_appName;
}
int Subscriber::getInstanceId() const {
return m_instanceId;
int Subscriber::getAppId() const {
return m_appId;
}
Endpoint Subscriber::getInstanceEndpoint() const {
return m_instanceEndpoint;
Endpoint Subscriber::getAppEndpoint() const {
return m_appEndpoint;
}
bool Subscriber::hasEnded() const {
......@@ -262,9 +262,9 @@ std::ostream& operator<<(std::ostream& os, const cameo::coms::Publisher& publish
std::ostream& operator<<(std::ostream& os, const cameo::coms::Subscriber& subscriber) {
os << "sub." << subscriber.getPublisherName()
<< ":" << subscriber.getInstanceName()
<< "." << subscriber.getInstanceId()
<< "@" << subscriber.getInstanceEndpoint();
<< ":" << subscriber.getAppName()
<< "." << subscriber.getAppId()
<< "@" << subscriber.getAppEndpoint();
return os;
}
......
......@@ -29,7 +29,7 @@ class SubscriberImpl {
public:
virtual ~SubscriberImpl() {}
virtual void init(int instanceId, const Endpoint& instanceEndpoint, const Endpoint& instanceStatusEndpoint, int publisherPort, int synchronizerPort, int numberOfSubscribers) = 0;
virtual void init(int appId, const Endpoint& appEndpoint, const Endpoint& appStatusEndpoint, int publisherPort, int synchronizerPort, int numberOfSubscribers) = 0;
virtual bool isEnded() const = 0;
virtual bool isCanceled() const = 0;
......
......@@ -32,7 +32,7 @@ SubscriberZmq::SubscriberZmq() :
m_publisherPort(0),
m_synchronizerPort(0),
m_numberOfSubscribers(0),
m_instanceId(0),
m_appId(0),
m_ended(false),
m_canceled(false) {
}
......@@ -40,12 +40,12 @@ SubscriberZmq::SubscriberZmq() :
SubscriberZmq::~SubscriberZmq() {
}
void SubscriberZmq::init(int instanceId, const Endpoint& instanceEndpoint, const Endpoint& instanceStatusEndpoint, int publisherPort, int synchronizerPort, int numberOfSubscribers) {
void SubscriberZmq::init(int appId, const Endpoint& appEndpoint, const Endpoint& appStatusEndpoint, int publisherPort, int synchronizerPort, int numberOfSubscribers) {
m_publisherPort = publisherPort;
m_synchronizerPort = synchronizerPort;
m_numberOfSubscribers = numberOfSubscribers;
m_instanceId = instanceId;
m_appId = appId;
m_ended = false;
m_canceled = false;
......@@ -58,7 +58,7 @@ void SubscriberZmq::init(int instanceId, const Endpoint& instanceEndpoint, const
m_subscriber->setsockopt(ZMQ_SUBSCRIBE, message::Event::CANCEL, std::string(message::Event::CANCEL).length());
m_subscriber->setsockopt(ZMQ_SUBSCRIBE, message::Event::STATUS, std::string(message::Event::STATUS).length());
m_subscriber->connect(instanceEndpoint.withPort(m_publisherPort).toString());
m_subscriber->connect(appEndpoint.withPort(m_publisherPort).toString());
// We must first bind the cancel publisher before connecting the subscriber.
std::stringstream cancelEndpoint;
......@@ -71,13 +71,13 @@ void SubscriberZmq::init(int instanceId, const Endpoint& instanceEndpoint, const
m_cancelPublisher->bind(m_cancelEndpoint.c_str());
m_subscriber->connect(m_cancelEndpoint.c_str());
m_subscriber->connect(instanceStatusEndpoint.toString().c_str());
m_subscriber->connect(appStatusEndpoint.toString().c_str());
// Synchronize the subscriber only if the number of subscribers > 0.
if (m_numberOfSubscribers > 0) {
// Create a request socket.
std::unique_ptr<RequestSocket> requestSocket = application::This::getCom().createRequestSocket(instanceEndpoint.withPort(m_synchronizerPort).toString());
std::unique_ptr<RequestSocket> requestSocket = application::This::getCom().createRequestSocket(appEndpoint.withPort(m_synchronizerPort).toString());
// Poll subscriber.
zmq_pollitem_t items[1];
......@@ -146,7 +146,7 @@ std::optional<std::string> SubscriberZmq::receiveBinary() {
int id = status[message::StatusEvent::ID].GetInt();
if (id == m_instanceId) {
if (id == m_appId) {
application::State state = status[message::StatusEvent::APPLICATION_STATE].GetInt();
// test the terminal state
......@@ -207,7 +207,7 @@ std::optional<std::tuple<std::string, std::string>> SubscriberZmq::receiveTwoBin
int id = status[message::StatusEvent::ID].GetInt();
if (id == m_instanceId) {
if (id == m_appId) {
application::State state = status[message::StatusEvent::APPLICATION_STATE].GetInt();
// test the terminal state
......
......@@ -32,7 +32,7 @@ public:
SubscriberZmq();
virtual ~SubscriberZmq();
virtual void init(int instanceId, const Endpoint& instanceEndpoint, const Endpoint& instanceStatusEndpoint, int publisherPort, int synchronizerPort, int numberOfSubscribers);
virtual void init(int appId, const Endpoint& appEndpoint, const Endpoint& appStatusEndpoint, int publisherPort, int synchronizerPort, int numberOfSubscribers);
virtual bool isEnded() const;
virtual bool isCanceled() const;
......@@ -49,7 +49,7 @@ private:
int m_publisherPort;
int m_synchronizerPort;
int m_numberOfSubscribers;
int m_instanceId;
int m_appId;
std::unique_ptr<zmq::socket_t> m_subscriber;
std::string m_cancelEndpoint;
std::unique_ptr<zmq::socket_t> m_cancelPublisher;
......
......@@ -2,7 +2,7 @@ cmake_minimum_required(VERSION 3.12.0)
# Project name and version
project(cameopy
VERSION 1.1.0
VERSION 1.2.0
LANGUAGES CXX
HOMEPAGE_URL "https://code.ill.fr/cameo/cameo"
)
......
......@@ -157,9 +157,9 @@ PYBIND11_MODULE(cameopy, m) {
"publisherName"_a,
py::call_guard<py::gil_scoped_release>())
.def("getPublisherName", &Subscriber::getPublisherName)
.def("getInstanceName", &Subscriber::getInstanceName)
.def("getInstanceId", &Subscriber::getInstanceId)
.def("getInstanceEndpoint", &Subscriber::getInstanceEndpoint)
.def("getAppName", &Subscriber::getAppName)
.def("getAppId", &Subscriber::getAppId)
.def("getAppEndpoint", &Subscriber::getAppEndpoint)
.def("isEnded", &Subscriber::isEnded)
.def("isCanceled", &Subscriber::isCanceled)
.def("receiveBinary", &Subscriber::receiveBinary, py::call_guard<py::gil_scoped_release>())
......
......@@ -17,9 +17,9 @@ import fr.ill.ics.cameo.strings.Endpoint;
public class Subscriber {
private String publisherName;
private String instanceName;
private int instanceId;
private Endpoint instanceEndpoint;
private String appName;
private int appId;
private Endpoint appEndpoint;
private SubscriberImpl impl;
private SubscriberWaiting waiting = new SubscriberWaiting(this);
......@@ -30,15 +30,15 @@ public class Subscriber {
waiting.add();
}
private void initSubscriber(Instance instance, String publisherName) throws SubscriberCreationException {
private void initSubscriber(Instance app, String publisherName) throws SubscriberCreationException {
this.publisherName = publisherName;
this.instanceName = instance.getName();
this.instanceId = instance.getId();
this.instanceEndpoint = instance.getEndpoint();
this.appName = app.getName();
this.appId = app.getId();
this.appEndpoint = app.getEndpoint();
JSONObject request = Messages.createConnectPublisherRequest(instance.getId(), publisherName);
JSONObject response = instance.getCom().requestJSON(request);
JSONObject request = Messages.createConnectPublisherRequest(app.getId(), publisherName);
JSONObject response = app.getCom().requestJSON(request);
int publisherPort = JSON.getInt(response, Messages.PublisherResponse.PUBLISHER_PORT);
......@@ -49,13 +49,13 @@ public class Subscriber {
int synchronizerPort = JSON.getInt(response, Messages.PublisherResponse.SYNCHRONIZER_PORT);
int numberOfSubscribers = JSON.getInt(response, Messages.PublisherResponse.NUMBER_OF_SUBSCRIBERS);
impl.init(instanceId, instanceEndpoint, instance.getStatusEndpoint(), publisherPort, synchronizerPort, numberOfSubscribers);
impl.init(appId, appEndpoint, app.getStatusEndpoint(), publisherPort, synchronizerPort, numberOfSubscribers);
}
private boolean init(Instance application, String publisherName) {
private boolean init(Instance app, String publisherName) {
try {
initSubscriber(application, publisherName);
initSubscriber(app, publisherName);
return true;
}
catch (SubscriberCreationException e) {
......@@ -63,7 +63,7 @@ public class Subscriber {
}
// waiting for the publisher
int lastState = application.waitFor(publisherName);
int lastState = app.waitFor(publisherName);
// state cannot be terminal or it means that the application has terminated that is not planned.
if (lastState == Application.State.SUCCESS
......@@ -74,7 +74,7 @@ public class Subscriber {
}
try {
initSubscriber(application, publisherName);
initSubscriber(app, publisherName);
return true;
}
catch (SubscriberCreationException e) {
......@@ -90,10 +90,10 @@ public class Subscriber {
* @param publisherName
* @return
*/
public static Subscriber create(Instance application, String publisherName) {
public static Subscriber create(Instance app, String publisherName) {
Subscriber subscriber = new Subscriber();
subscriber.init(application, publisherName);
subscriber.init(app, publisherName);
return subscriber;
}
......@@ -102,16 +102,16 @@ public class Subscriber {
return publisherName;
}
public String getInstanceName() {
return instanceName;
public String getAppName() {
return appName;
}
public int getInstanceId() {
return instanceId;
public int getAppId() {
return appId;
}
public Endpoint getInstanceEndpoint() {
return instanceEndpoint;
public Endpoint getAppEndpoint() {
return appEndpoint;
}
public boolean isEnded() {
......@@ -157,6 +157,6 @@ public class Subscriber {
@Override
public String toString() {
return "sub." + getPublisherName() + ":" + getInstanceName() + "." + getInstanceId() + "@" + getInstanceEndpoint();
return "sub." + getPublisherName() + ":" + getAppName() + "." + getAppId() + "@" + getAppEndpoint();
}
}
\ No newline at end of file
......@@ -22,7 +22,7 @@ import fr.ill.ics.cameo.strings.Endpoint;
public interface SubscriberImpl {
void init(int instanceId, Endpoint instanceEndpoint, Endpoint instanceStatusEndpoint, int publisherPort, int synchronizerPort, int numberOfSubscribers) throws ConnectionTimeout;
void init(int appId, Endpoint appEndpoint, Endpoint appStatusEndpoint, int publisherPort, int synchronizerPort, int numberOfSubscribers) throws ConnectionTimeout;
boolean isEnded();
boolean isCanceled();
byte[] receive();
......
......@@ -37,19 +37,19 @@ public class SubscriberZmq implements SubscriberImpl {
private Zmq.Socket subscriber;
private String cancelEndpoint;
private Zmq.Socket cancelPublisher;
private int instanceId;
private int appId;
private boolean ended = false;
private boolean canceled = false;
public void init(int instanceId, Endpoint instanceEndpoint, Endpoint instanceStatusEndpoint, int publisherPort, int synchronizerPort, int numberOfSubscribers) {
public void init(int appId, Endpoint appEndpoint, Endpoint appStatusEndpoint, int publisherPort, int synchronizerPort, int numberOfSubscribers) {
this.instanceId = instanceId;
this.appId = appId;
this.context = ((ContextZmq)This.getCom().getContext()).getContext();
// Create the subscriber
subscriber = context.createSocket(Zmq.SUB);
subscriber.connect(instanceEndpoint.withPort(publisherPort).toString());
subscriber.connect(appEndpoint.withPort(publisherPort).toString());
subscriber.subscribe(Messages.Event.SYNC);
subscriber.subscribe(Messages.Event.STREAM);
subscriber.subscribe(Messages.Event.ENDSTREAM);
......@@ -66,14 +66,14 @@ public class SubscriberZmq implements SubscriberImpl {
subscriber.subscribe(Messages.Event.CANCEL);
// Subscribe to STATUS
subscriber.connect(instanceStatusEndpoint.toString());
subscriber.connect(appStatusEndpoint.toString());
subscriber.subscribe(Messages.Event.STATUS);
// Synchronize the subscriber only if the number of subscribers > 0
if (numberOfSubscribers > 0) {
// Create a socket that will be used for several requests.
RequestSocket requestSocket = This.getCom().createRequestSocket(instanceEndpoint.withPort(synchronizerPort).toString());
RequestSocket requestSocket = This.getCom().createRequestSocket(appEndpoint.withPort(synchronizerPort).toString());
// polling to wait for connection
Zmq.Poller poller = context.createPoller(subscriber);
......@@ -140,7 +140,7 @@ public class SubscriberZmq implements SubscriberImpl {
// Get the id.
int id = JSON.getInt(status, Messages.StatusEvent.ID);
if (instanceId == id) {
if (appId == id) {
// Get the state.
int state = JSON.getInt(status, Messages.StatusEvent.APPLICATION_STATE);
......@@ -191,7 +191,7 @@ public class SubscriberZmq implements SubscriberImpl {
// Get the id.
int id = JSON.getInt(request, Messages.StatusEvent.ID);
if (instanceId == id) {
if (appId == id) {
// Get the state.
int state = JSON.getInt(request, Messages.StatusEvent.APPLICATION_STATE);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment