Commit 872e8c39 authored by legoc's avatar legoc
Browse files

Reviewed C++ stream socket init

parent c8a9dea4
......@@ -17,12 +17,14 @@
#ifndef CAMEO_EVENTSTREAMSOCKET_H_
#define CAMEO_EVENTSTREAMSOCKET_H_
#include <memory>
#include "Event.h"
#include "Strings.h"
#include <memory>
namespace cameo {
class Server;
class Context;
class RequestSocket;
class StreamSocketImpl;
namespace application {
......@@ -43,7 +45,8 @@ public:
void cancel();
private:
EventStreamSocket(Server * server);
EventStreamSocket();
void init(Context * context, const Endpoint& endpoint, RequestSocket * requestSocket);
std::unique_ptr<StreamSocketImpl> m_impl;
};
......
......@@ -18,12 +18,14 @@
#define CAMEO_OUTPUTSTREAMSOCKET_H_
#include "Event.h"
#include "Strings.h"
#include <memory>
#include <optional>
namespace cameo {
class Server;
class Context;
class RequestSocket;
class StreamSocketImpl;
namespace application {
......@@ -66,7 +68,8 @@ public:
bool isCanceled() const;
private:
OutputStreamSocket(Server * server, const std::string& name);
OutputStreamSocket(const std::string& name);
void init(Context * context, const Endpoint& endpoint, RequestSocket * requestSocket);
int m_applicationId;
bool m_ended;
......
......@@ -44,6 +44,8 @@ class Server {
friend class application::Instance;
friend class application::This;
friend class EventStreamSocket;
friend class OutputStreamSocket;
friend std::ostream& operator<<(std::ostream&, const Server&);
public:
......@@ -56,7 +58,8 @@ public:
void setTimeout(int value);
int getTimeout() const;
const Endpoint& getEndpoint() const;
Endpoint getEndpoint() const;
Endpoint getStatusEndpoint() const;
std::array<int, 3> getVersion() const;
bool isAvailable(int timeout) const;
......@@ -131,17 +134,6 @@ public:
*/
void unregisterEventListener(EventListener * listener);
//TODO private?
Context * getContext();
//TODO private?
Endpoint getStatusEndpoint() const;
//TODO private?
void sendSync();
//TODO private?
void sendSyncStream(const std::string& name);
//TODO private?
int getStreamPort(const std::string& name);
private:
void initServer(const Endpoint& endpoint, int timeoutMs);
std::unique_ptr<application::Instance> makeInstance();
......@@ -166,6 +158,7 @@ private:
void retrieveServerVersion();
void initStatus();
int getStreamPort(const std::string& name);
std::unique_ptr<OutputStreamSocket> createOutputStreamSocket(const std::string& name);
std::unique_ptr<RequestSocket> createRequestSocket(const std::string& endpoint);
std::unique_ptr<RequestSocket> createRequestSocket(const std::string& endpoint, int timeout);
......
......@@ -27,15 +27,18 @@
namespace cameo {
EventStreamSocket::EventStreamSocket(Server * server) {
EventStreamSocket::EventStreamSocket() {
//TODO Replace with a factory.
m_impl = std::unique_ptr<StreamSocketImpl>(new EventStreamSocketZmq(server));
m_impl->init();
m_impl = std::unique_ptr<StreamSocketImpl>(new EventStreamSocketZmq());
}
EventStreamSocket::~EventStreamSocket() {
}
void EventStreamSocket::init(Context * context, const Endpoint& endpoint, RequestSocket * requestSocket) {
m_impl->init(context, endpoint, requestSocket);
}
std::unique_ptr<Event> EventStreamSocket::receive(bool blocking) {
std::string message(m_impl->receive(blocking));
......
......@@ -39,14 +39,17 @@ bool Output::isEndOfLine() const {
return m_endOfLine;
}
OutputStreamSocket::OutputStreamSocket(Server * server, const std::string& name) :
OutputStreamSocket::OutputStreamSocket(const std::string& name) :
m_applicationId(-1),
m_ended(false),
m_canceled(false) {
//TODO Replace with factory.
m_impl = std::unique_ptr<StreamSocketImpl>(new OutputStreamSocketZmq(server, name));
m_impl->init();
m_impl = std::unique_ptr<StreamSocketImpl>(new OutputStreamSocketZmq(name));
}
void OutputStreamSocket::init(Context * context, const Endpoint& endpoint, RequestSocket * requestSocket) {
m_impl->init(context, endpoint, requestSocket);
}
OutputStreamSocket::~OutputStreamSocket() {
......
......@@ -124,10 +124,14 @@ int Server::getTimeout() const {
return m_timeout;
}
const Endpoint& Server::getEndpoint() const {
Endpoint Server::getEndpoint() const {
return m_serverEndpoint;
}
Endpoint Server::getStatusEndpoint() const {
return m_serverEndpoint.withPort(m_statusPort);
}
std::array<int, 3> Server::getVersion() const {
return m_serverVersion;
}
......@@ -525,7 +529,10 @@ std::unique_ptr<EventStreamSocket> Server::openEventStream() {
}
// Create the event stream socket.
return std::unique_ptr<EventStreamSocket>(new EventStreamSocket(this));
std::unique_ptr<EventStreamSocket> eventStreamSocket = std::unique_ptr<EventStreamSocket>(new EventStreamSocket());
eventStreamSocket->init(m_contextImpl.get(), getStatusEndpoint(), m_requestSocket.get());
return eventStreamSocket;
}
std::unique_ptr<ConnectionChecker> Server::createConnectionChecker(ConnectionCheckerType handler, int pollingTimeMs) {
......@@ -644,14 +651,6 @@ void Server::initRequestSocket() {
m_requestSocket = std::move(createRequestSocket(m_serverEndpoint.toString(), m_timeout));
}
Context * Server::getContext() {
return m_contextImpl.get();
}
Endpoint Server::getStatusEndpoint() const {
return m_serverEndpoint.withPort(m_statusPort);
}
void Server::retrieveServerVersion() {
json::Object response = m_requestSocket->requestJSON(createVersionRequest());
......@@ -685,8 +684,18 @@ int Server::getStreamPort(const std::string& name) {
}
std::unique_ptr<OutputStreamSocket> Server::createOutputStreamSocket(const std::string& name) {
// Create the event stream socket.
return std::unique_ptr<OutputStreamSocket>(new OutputStreamSocket(this, name));
// Create the output stream socket.
std::unique_ptr<OutputStreamSocket> outputStreamSocket = std::unique_ptr<OutputStreamSocket>(new OutputStreamSocket(name));
int port = getStreamPort(name);
if (port == -1) {
std::cerr << "No stream port for " << name << std::endl;
return nullptr;
}
outputStreamSocket->init(m_contextImpl.get(), m_serverEndpoint.withPort(port), m_requestSocket.get());
return outputStreamSocket;
}
std::unique_ptr<RequestSocket> Server::createRequestSocket(const std::string& endpoint) {
......@@ -697,26 +706,6 @@ std::unique_ptr<RequestSocket> Server::createRequestSocket(const std::string& en
return std::unique_ptr<RequestSocket>(new RequestSocket(m_contextImpl.get(), endpoint, timeout));
}
void Server::sendSync() {
try {
m_requestSocket->requestJSON(createSyncRequest());
}
catch (const ConnectionTimeout& e) {
// The server is not accessible.
}
}
void Server::sendSyncStream(const std::string& name) {
try {
m_requestSocket->requestJSON(createSyncStreamRequest(name));
}
catch (const ConnectionTimeout&) {
// The server is not accessible.
}
}
std::ostream& operator<<(std::ostream& os, const cameo::Server& server) {
os << "server@" << server.m_serverEndpoint.toString();
......
......@@ -17,16 +17,20 @@
#ifndef CAMEO_EVENTSTREAMSOCKETIMPL_H_
#define CAMEO_EVENTSTREAMSOCKETIMPL_H_
#include "Strings.h"
#include "../RequestSocket.h"
#include <string>
namespace cameo {
class Context;
class StreamSocketImpl {
public:
virtual ~StreamSocketImpl() {}
virtual void init() = 0;
virtual void init(Context * context, const Endpoint& endpoint, RequestSocket * requestSocket) = 0;
virtual void send(const std::string& data) = 0;
virtual std::string receive(bool blocking = true) = 0;
virtual void cancel() = 0;
......
......@@ -24,16 +24,17 @@ using namespace std;
namespace cameo {
EventStreamSocketZmq::EventStreamSocketZmq(Server * server) :
m_server(server) {
m_context = dynamic_cast<ContextZmq *>(server->getContext());
EventStreamSocketZmq::EventStreamSocketZmq() : m_context(nullptr) {
}
EventStreamSocketZmq::~EventStreamSocketZmq() {
close();
}
void EventStreamSocketZmq::init() {
void EventStreamSocketZmq::init(Context * context, const Endpoint& endpoint, RequestSocket * requestSocket) {
m_context = dynamic_cast<ContextZmq *>(context);
std::stringstream cancelEndpoint;
......@@ -58,7 +59,7 @@ void EventStreamSocketZmq::init() {
m_socket->setsockopt(ZMQ_SUBSCRIBE, s->c_str(), s->length());
}
m_socket->connect(m_server->getStatusEndpoint().toString().c_str());
m_socket->connect(endpoint.toString().c_str());
m_socket->connect(cancelEndpoint.str().c_str());
......@@ -71,7 +72,12 @@ void EventStreamSocketZmq::init() {
items[0].revents = 0;
while (true) {
m_server->sendSync();
try {
requestSocket->requestJSON(createSyncRequest());
}
catch (const ConnectionTimeout& e) {
// The server is not accessible.
}
// Wait for 100ms.
int rc = zmq::poll(items, 1, 100);
......
......@@ -24,23 +24,22 @@
namespace cameo {
class Server;
class ContextZmq;
class EventStreamSocketZmq : public StreamSocketImpl {
public:
EventStreamSocketZmq(Server * server);
EventStreamSocketZmq();
virtual ~EventStreamSocketZmq();
void init();
void init(Context * context, const Endpoint& endpoint, RequestSocket * requestSocket);
void send(const std::string& data);
std::string receive(bool blocking);
void cancel();
void close();
private:
Server * m_server;
ContextZmq * m_context;
std::unique_ptr<zmq::socket_t> m_socket;
std::unique_ptr<zmq::socket_t> m_cancelSocket;
......
......@@ -24,23 +24,18 @@ using namespace std;
namespace cameo {
OutputStreamSocketZmq::OutputStreamSocketZmq(Server * server, const std::string& name) :
m_server(server),
m_name(name) {
m_context = dynamic_cast<ContextZmq *>(server->getContext());
OutputStreamSocketZmq::OutputStreamSocketZmq(const std::string& name) :
m_name(name),
m_context(nullptr) {
}
OutputStreamSocketZmq::~OutputStreamSocketZmq() {
close();
}
void OutputStreamSocketZmq::init() {
void OutputStreamSocketZmq::init(Context * context, const Endpoint& endpoint, RequestSocket * requestSocket) {
int port = m_server->getStreamPort(m_name);
if (port == -1) {
std::cerr << "No stream port for " << m_name << std::endl;
return;
}
m_context = dynamic_cast<ContextZmq *>(context);
std::stringstream cancelEndpoint;
......@@ -63,7 +58,7 @@ void OutputStreamSocketZmq::init() {
m_socket->setsockopt(ZMQ_SUBSCRIBE, s->c_str(), s->length());
}
m_socket->connect(m_server->getEndpoint().withPort(port).toString().c_str());
m_socket->connect(endpoint.toString().c_str());
m_socket->connect(cancelEndpoint.str().c_str());
// Wait for the connection to be ready.
......@@ -75,7 +70,12 @@ void OutputStreamSocketZmq::init() {
items[0].revents = 0;
while (true) {
m_server->sendSyncStream(m_name);
try {
requestSocket->requestJSON(createSyncStreamRequest(m_name));
}
catch (const ConnectionTimeout&) {
// The server is not accessible.
}
// Wait for 100ms.
int rc = zmq::poll(items, 1, 100);
......
......@@ -30,17 +30,16 @@ class ContextZmq;
class OutputStreamSocketZmq : public StreamSocketImpl {
public:
OutputStreamSocketZmq(Server * server, const std::string& name);
OutputStreamSocketZmq(const std::string& name);
virtual ~OutputStreamSocketZmq();
void init();
void init(Context * context, const Endpoint& endpoint, RequestSocket * requestSocket);
void send(const std::string& data);
std::string receive(bool blocking);
void cancel();
void close();
private:
Server * m_server;
std::string m_name;
ContextZmq * m_context;
std::unique_ptr<zmq::socket_t> m_socket;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment