Commit 0134ec44 authored by legoc's avatar legoc
Browse files

(split) Removal of the url member in API classes and its consequences

parent 5b6d5c01
......@@ -146,7 +146,6 @@ public:
* throws StarterServerException.
*/
static Server& getStarterServer();
static const std::string& getUrl();
static bool isAvailable(int timeout = 10000);
static bool isStopping();
......@@ -234,7 +233,6 @@ public:
const std::string& getName() const;
int getId() const;
const std::string& getUrl() const;
const Endpoint& getEndpoint() const;
std::string getNameId() const;
const Com& getCom() const;
......@@ -416,7 +414,7 @@ public:
void cancel();
private:
Subscriber(Server * server, const std::string& url, int publisherPort, int synchronizerPort, const std::string& publisherName, int numberOfSubscribers, const std::string& instanceName, int instanceId, const std::string& instanceEndpoint, const std::string& statusEndpoint);
Subscriber(Server * server, int publisherPort, int synchronizerPort, const std::string& publisherName, int numberOfSubscribers, const std::string& instanceName, int instanceId, const std::string& instanceEndpoint, const std::string& statusEndpoint);
void init();
std::unique_ptr<SubscriberImpl> m_impl;
......
......@@ -56,7 +56,6 @@ public:
int getTimeout() const;
const Endpoint& getEndpoint() const;
const std::string& getUrl() const;
std::array<int, 3> getVersion() const;
bool isAvailable(int timeoutMs) const;
......
......@@ -43,9 +43,8 @@ public:
void setTimeout(int timeout);
int getTimeout() const;
const Endpoint& getEndpoint() const;
const std::string& getUrl() const;
std::array<int, 3> getVersion() const;
const std::string& getStatusEndpoint() const;
Endpoint getStatusEndpoint() const;
bool isAvailable(int timeout) const;
void retrieveServerVersion();
......@@ -58,9 +57,7 @@ public:
Endpoint m_serverEndpoint;
std::array<int, 3> m_serverVersion;
std::string m_url;
int m_statusPort;
std::string m_serverStatusEndpoint;
std::unique_ptr<ServicesImpl> m_impl;
std::unique_ptr<RequestSocketImpl> m_requestSocket;
};
......
......@@ -39,6 +39,8 @@ public:
static Endpoint parse(const std::string& str);
Endpoint withPort(int port) const;
std::string toString() const;
private:
......
......@@ -178,8 +178,6 @@ void This::initApplication(int argc, char *argv[]) {
m_serverEndpoint = Endpoint::parse(infoObject[message::ApplicationIdentity::SERVER].GetString());
m_url = m_serverEndpoint.getProtocol() + "://" + m_serverEndpoint.getAddress();
// Create the request socket. The server endpoint has been defined.
Services::initRequestSocket();
......@@ -307,10 +305,6 @@ Server& This::getStarterServer() {
return *m_instance.m_starterServer;
}
const std::string& This::getUrl() {
return m_instance.Services::getUrl();
}
bool This::isAvailable(int timeout) {
return m_instance.Services::isAvailable(timeout);
}
......@@ -539,10 +533,6 @@ int Instance::getId() const {
return m_id;
}
const std::string& Instance::getUrl() const {
return m_server->getUrl();
}
const Endpoint& Instance::getEndpoint() const {
return m_server->getEndpoint();
}
......@@ -879,8 +869,8 @@ void Publisher::sendEnd() const {
///////////////////////////////////////////////////////////////////////////
// Subscriber
Subscriber::Subscriber(Server * server, const std::string & url, int publisherPort, int synchronizerPort, const std::string & publisherName, int numberOfSubscribers, const std::string& instanceName, int instanceId, const std::string& instanceEndpoint, const std::string& statusEndpoint) :
m_impl(new SubscriberImpl(server, url, publisherPort, synchronizerPort, publisherName, numberOfSubscribers, instanceName, instanceId, instanceEndpoint, statusEndpoint)) {
Subscriber::Subscriber(Server * server, int publisherPort, int synchronizerPort, const std::string & publisherName, int numberOfSubscribers, const std::string& instanceName, int instanceId, const std::string& instanceEndpoint, const std::string& statusEndpoint) :
m_impl(new SubscriberImpl(server, publisherPort, synchronizerPort, publisherName, numberOfSubscribers, instanceName, instanceId, instanceEndpoint, statusEndpoint)) {
}
Subscriber::~Subscriber() {
......@@ -1111,7 +1101,7 @@ Requester::~Requester() {
std::unique_ptr<Requester> Requester::create(Instance & instance, const std::string& name) {
int responderId = instance.getId();
string responderUrl = instance.getUrl();
string responderUrl = instance.getEndpoint().getProtocol() + "://" + instance.getEndpoint().getAddress();
string responderEndpoint = instance.getEndpoint().toString();
// Create a request socket to the server of the instance.
......
......@@ -37,7 +37,6 @@ void Server::initServer(const Endpoint& endpoint, int timeoutMs) {
Services::init();
m_serverEndpoint = endpoint;
m_url = endpoint.getProtocol() + "://" + endpoint.getAddress();
// Set the timeout.
Services::setTimeout(timeoutMs);
......@@ -100,10 +99,6 @@ const Endpoint& Server::getEndpoint() const {
return Services::getEndpoint();
}
const std::string& Server::getUrl() const {
return Services::getUrl();
}
std::array<int, 3> Server::getVersion() const {
return Services::getVersion();
}
......@@ -544,7 +539,7 @@ std::unique_ptr<application::Subscriber> Server::createSubscriber(int id, const
int synchronizerPort = response[message::PublisherResponse::SYNCHRONIZER_PORT].GetInt();
int numberOfSubscribers = response[message::PublisherResponse::NUMBER_OF_SUBSCRIBERS].GetInt();
unique_ptr<application::Subscriber> subscriber(new application::Subscriber(this, getUrl(), publisherPort, synchronizerPort, publisherName, numberOfSubscribers, instanceName, id, m_serverEndpoint.toString(), m_serverStatusEndpoint));
unique_ptr<application::Subscriber> subscriber(new application::Subscriber(this, publisherPort, synchronizerPort, publisherName, numberOfSubscribers, instanceName, id, m_serverEndpoint.toString(), m_serverEndpoint.withPort(m_statusPort).toString()));
subscriber->init();
return subscriber;
......
......@@ -94,16 +94,12 @@ const Endpoint& Services::getEndpoint() const {
return m_serverEndpoint;
}
const std::string& Services::getUrl() const {
return m_url;
}
std::array<int, 3> Services::getVersion() const {
return m_serverVersion;
}
const std::string& Services::getStatusEndpoint() const {
return m_serverStatusEndpoint;
Endpoint Services::getStatusEndpoint() const {
return m_serverEndpoint.withPort(m_statusPort);
}
bool Services::isAvailable(int timeout) const {
......@@ -142,10 +138,6 @@ void Services::initStatus() {
// Get the status port.
m_statusPort = value;
stringstream ss;
ss << m_url << ":" << m_statusPort;
m_serverStatusEndpoint = ss.str();
}
std::unique_ptr<EventStreamSocket> Services::openEventStream() {
......@@ -162,7 +154,7 @@ std::unique_ptr<EventStreamSocket> Services::openEventStream() {
// Create the sockets.
zmq::socket_t * cancelPublisher = m_impl->createCancelPublisher(cancelEndpoint.str());
zmq::socket_t * subscriber = m_impl->createEventSubscriber(m_serverStatusEndpoint, cancelEndpoint.str());
zmq::socket_t * subscriber = m_impl->createEventSubscriber(getStatusEndpoint().toString(), cancelEndpoint.str());
// Wait for the connection to be ready.
m_impl->waitForSubscriber(subscriber, m_requestSocket.get());
......@@ -190,15 +182,12 @@ std::unique_ptr<OutputStreamSocket> Services::createOutputStreamSocket(const std
return nullptr;
}
// Prepare our context and subscriber.
string streamEndpoint = m_url + ":" + to_string(port);
// We define a unique name that depends on the event stream socket object because there can be many (instances).
string cancelEndpoint = "inproc://cancel." + to_string(CancelIdGenerator::newId());
// Create the sockets.
zmq::socket_t * cancelPublisher = m_impl->createCancelPublisher(cancelEndpoint);
zmq::socket_t * subscriber = m_impl->createOutputStreamSubscriber(streamEndpoint, cancelEndpoint);
zmq::socket_t * subscriber = m_impl->createOutputStreamSubscriber(m_serverEndpoint.withPort(port).toString(), cancelEndpoint);
// Wait for the connection to be ready.
m_impl->waitForStreamSubscriber(subscriber, m_requestSocket.get(), name);
......
......@@ -100,6 +100,10 @@ Endpoint Endpoint::parse(const std::string& str) {
return Endpoint(protocol, address, port);
}
Endpoint Endpoint::withPort(int port) const {
return Endpoint(m_protocol, m_address, port);
}
std::string Endpoint::toString() const {
return m_protocol + "://" + m_address + ":" + to_string(m_port);
}
......
......@@ -123,15 +123,12 @@ bool PublisherImpl::waitForSubscribers() {
void PublisherImpl::cancelWaitForSubscribers() {
stringstream endpoint;
endpoint << m_application->getUrl() << ":" << (m_publisherPort + 1);
json::StringObject request;
request.pushKey(message::TYPE);
request.pushInt(message::CANCEL);
// Create a request socket only for the request.
unique_ptr<RequestSocketImpl> requestSocket = m_application->createRequestSocket(endpoint.str());
unique_ptr<RequestSocketImpl> requestSocket = m_application->createRequestSocket(m_application->getEndpoint().withPort(m_publisherPort + 1).toString());
requestSocket->request(request.toString());
}
......
......@@ -91,7 +91,7 @@ void RequesterImpl::sendBinary(const std::string& requestData) {
request.pushInt(m_application->getId());
request.pushKey(message::Request::SERVER_URL);
request.pushString(m_application->getUrl());
request.pushString(m_application->getEndpoint().getProtocol() + "://" + m_application->getEndpoint().getAddress());
request.pushKey(message::Request::SERVER_PORT);
request.pushInt(m_application->getEndpoint().getPort());
......@@ -123,7 +123,7 @@ void RequesterImpl::sendTwoBinaryParts(const std::string& requestData1, const st
request.pushInt(m_application->getId());
request.pushKey(message::Request::SERVER_URL);
request.pushString(m_application->getUrl());
request.pushString(m_application->getEndpoint().getProtocol() + "://" + m_application->getEndpoint().getAddress());
request.pushKey(message::Request::SERVER_PORT);
request.pushInt(m_application->getEndpoint().getPort());
......@@ -178,15 +178,12 @@ bool RequesterImpl::receive(std::string& data) {
void RequesterImpl::cancel() {
stringstream requesterEndpoint;
requesterEndpoint << m_application->getUrl() << ":" << m_requesterPort;
json::StringObject request;
request.pushKey(message::TYPE);
request.pushInt(message::CANCEL);
// Create a request socket only for the request.
unique_ptr<RequestSocketImpl> requestSocket = m_application->createRequestSocket(requesterEndpoint.str());
unique_ptr<RequestSocketImpl> requestSocket = m_application->createRequestSocket(m_application->getEndpoint().withPort(m_requesterPort).toString());
requestSocket->request(request.toString());
}
......
......@@ -50,15 +50,12 @@ ResponderImpl::~ResponderImpl() {
void ResponderImpl::cancel() {
stringstream endpoint;
endpoint << m_application->getUrl() << ":" << m_responderPort;
json::StringObject request;
request.pushKey(message::TYPE);
request.pushInt(message::CANCEL);
// Create a request socket.
unique_ptr<RequestSocketImpl> requestSocket = m_application->createRequestSocket(endpoint.str());
unique_ptr<RequestSocketImpl> requestSocket = m_application->createRequestSocket(m_application->getEndpoint().withPort(m_responderPort).toString());
requestSocket->request(request.toString());
}
......
......@@ -28,9 +28,8 @@ using namespace std;
namespace cameo {
SubscriberImpl::SubscriberImpl(Server * server, const std::string & url, int publisherPort, int synchronizerPort, const std::string& publisherName, int numberOfSubscribers, const std::string& instanceName, int instanceId, const std::string& instanceEndpoint, const std::string& statusEndpoint) :
SubscriberImpl::SubscriberImpl(Server * server, int publisherPort, int synchronizerPort, const std::string& publisherName, int numberOfSubscribers, const std::string& instanceName, int instanceId, const std::string& instanceEndpoint, const std::string& statusEndpoint) :
m_server(server),
m_url(url),
m_publisherName(publisherName),
m_publisherPort(publisherPort),
m_synchronizerPort(synchronizerPort),
......@@ -56,10 +55,7 @@ void SubscriberImpl::init() {
m_subscriber->setsockopt(ZMQ_SUBSCRIBE, message::Event::CANCEL, string(message::Event::CANCEL).length());
m_subscriber->setsockopt(ZMQ_SUBSCRIBE, message::Event::STATUS, string(message::Event::STATUS).length());
stringstream pubEndpoint;
pubEndpoint << m_url << ":" << m_publisherPort;
m_subscriber->connect(pubEndpoint.str().c_str());
m_subscriber->connect(m_server->getEndpoint().withPort(m_publisherPort).toString());
// We must first bind the cancel publisher before connecting the subscriber.
stringstream cancelEndpoint;
......@@ -77,13 +73,8 @@ void SubscriberImpl::init() {
// Synchronize the subscriber only if the number of subscribers > 0.
if (m_numberOfSubscribers > 0) {
stringstream syncEndpoint;
syncEndpoint << m_url << ":" << m_synchronizerPort;
string endpoint = syncEndpoint.str();
// Create a request socket.
unique_ptr<RequestSocketImpl> requestSocket = m_server->createRequestSocket(endpoint);
unique_ptr<RequestSocketImpl> requestSocket = m_server->createRequestSocket(m_server->getEndpoint().withPort(m_synchronizerPort).toString());
// Poll subscriber.
zmq_pollitem_t items[1];
......
......@@ -29,7 +29,7 @@ class Server;
class SubscriberImpl {
public:
SubscriberImpl(Server * server, const std::string & url, int publisherPort, int synchronizerPort, const std::string& publisherName, int numberOfSubscribers, const std::string& instanceName, int instanceId, const std::string& instanceEndpoint, const std::string& statusEndpoint);
SubscriberImpl(Server * server, int publisherPort, int synchronizerPort, const std::string& publisherName, int numberOfSubscribers, const std::string& instanceName, int instanceId, const std::string& instanceEndpoint, const std::string& statusEndpoint);
~SubscriberImpl();
void init();
......@@ -44,7 +44,6 @@ public:
WaitingImpl * waiting();
Server * m_server;
std::string m_url;
int m_publisherPort;
int m_synchronizerPort;
std::string m_publisherName;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment