Commit 3ada2df5 authored by legoc's avatar legoc
Browse files

Reviewing EventStreamSocket

parent ca6009fd
......@@ -22,8 +22,9 @@
namespace cameo {
class StreamSocketImpl;
class Server;
class WaitingImpl;
class EventStreamSocketImpl;
namespace application {
......@@ -43,11 +44,11 @@ public:
void cancel();
private:
EventStreamSocket(StreamSocketImpl * impl);
EventStreamSocket(Server * server);
WaitingImpl * waiting();
std::unique_ptr<StreamSocketImpl> m_impl;
std::unique_ptr<EventStreamSocketImpl> m_impl;
};
}
......
......@@ -131,6 +131,13 @@ public:
*/
void unregisterEventListener(EventListener * listener);
Context * getContext();
Endpoint getStatusEndpoint() const;
void sendSync();
private:
void initServer(const Endpoint& endpoint, int timeoutMs);
std::unique_ptr<application::Instance> makeInstance();
......@@ -153,8 +160,6 @@ private:
void initContext();
void initRequestSocket();
Endpoint getStatusEndpoint() const;
void retrieveServerVersion();
void initStatus();
int getStreamPort(const std::string& name);
......
......@@ -22,13 +22,15 @@
#include "ResultEvent.h"
#include "StatusEvent.h"
#include "impl/SocketWaitingImpl.h"
#include "impl/StreamSocketImpl.h"
#include "impl/EventStreamSocketImpl.h"
#include "JSON.h"
#include "Messages.h"
namespace cameo {
EventStreamSocket::EventStreamSocket(StreamSocketImpl * impl) : m_impl(impl) {
EventStreamSocket::EventStreamSocket(Server * server) {
m_impl = std::unique_ptr<EventStreamSocketImpl>(new EventStreamSocketImpl(server));
m_impl->init();
}
EventStreamSocket::~EventStreamSocket() {
......@@ -36,22 +38,20 @@ EventStreamSocket::~EventStreamSocket() {
std::unique_ptr<Event> EventStreamSocket::receive(bool blocking) {
std::unique_ptr<zmq::message_t> message(m_impl->receive(blocking));
std::string message(m_impl->receive(blocking));
// In case of non-blocking call, the message can be null.
if (message == nullptr) {
if (message == "") {
return std::unique_ptr<Event>(nullptr);
}
std::string response(static_cast<char*>(message->data()), message->size());
if (response == message::Event::STATUS) {
if (message == message::Event::STATUS) {
message = m_impl->receive();
// Get the JSON event.
json::Object event;
json::parse(event, message.get());
json::parse(event, message);
int id = event[message::StatusEvent::ID].GetInt();
std::string name = event[message::StatusEvent::NAME].GetString();
......@@ -63,30 +63,29 @@ std::unique_ptr<Event> EventStreamSocket::receive(bool blocking) {
}
return std::make_unique<StatusEvent>(id, name, state, pastStates);
}
else if (response == message::Event::RESULT) {
else if (message == message::Event::RESULT) {
message = m_impl->receive();
// Get the JSON event.
json::Object event;
json::parse(event, message.get());
json::parse(event, message);
int id = event[message::ResultEvent::ID].GetInt();
std::string name = event[message::ResultEvent::NAME].GetString();
// Get the data in the next part.
message = m_impl->receive();
std::string data(message->data<char>(), message->size());
return std::make_unique<ResultEvent>(id, name, data);
return std::make_unique<ResultEvent>(id, name, message);
}
else if (response == message::Event::PUBLISHER) {
else if (message == message::Event::PUBLISHER) {
message = m_impl->receive();
// Get the JSON event.
json::Object event;
json::parse(event, message.get());
json::parse(event, message);
int id = event[message::PublisherEvent::ID].GetInt();
std::string name = event[message::PublisherEvent::NAME].GetString();
......@@ -94,13 +93,13 @@ std::unique_ptr<Event> EventStreamSocket::receive(bool blocking) {
return std::make_unique<PublisherEvent>(id, name, publisherName);
}
else if (response == message::Event::PORT) {
else if (message == message::Event::PORT) {
message = m_impl->receive();
// Get the JSON event.
json::Object event;
json::parse(event, message.get());
json::parse(event, message);
int id = event[message::PortEvent::ID].GetInt();
std::string name = event[message::PortEvent::NAME].GetString();
......@@ -108,13 +107,13 @@ std::unique_ptr<Event> EventStreamSocket::receive(bool blocking) {
return std::make_unique<PortEvent>(id, name, portName);
}
else if (response == message::Event::KEYVALUE) {
else if (message == message::Event::KEYVALUE) {
message = m_impl->receive();
// Get the JSON event.
json::Object event;
json::parse(event, message.get());
json::parse(event, message);
int id = event[message::KeyEvent::ID].GetInt();
std::string name = event[message::KeyEvent::NAME].GetString();
......@@ -129,7 +128,7 @@ std::unique_ptr<Event> EventStreamSocket::receive(bool blocking) {
return std::make_unique<KeyEvent>(id, name, KeyEvent::Status::REMOVED, key, value);
}
}
else if (response == message::Event::CANCEL) {
else if (message == message::Event::CANCEL) {
message = m_impl->receive();
......@@ -137,7 +136,7 @@ std::unique_ptr<Event> EventStreamSocket::receive(bool blocking) {
return std::unique_ptr<Event>(nullptr);
}
std::cerr << "Cannot process '" << response << "' event" << std::endl;
std::cerr << "Cannot process '" << message << "' event" << std::endl;
return std::unique_ptr<Event>(nullptr);
}
......
......@@ -23,6 +23,7 @@
#include "EventThread.h"
#include "impl/CancelIdGenerator.h"
#include "impl/StreamSocketImpl.h"
#include "impl/EventStreamSocketImpl.h"
#include "impl/zmq/ContextZmq.h"
#include "JSON.h"
#include "Messages.h"
......@@ -512,20 +513,8 @@ std::unique_ptr<EventStreamSocket> Server::openEventStream() {
initStatus();
}
std::stringstream cancelEndpoint;
// We define a unique name that depends on the event stream socket object because there can be many (instances).
cancelEndpoint << "inproc://cancel." << CancelIdGenerator::newId();
// Create the sockets.
zmq::socket_t * cancelPublisher = m_contextImpl->createCancelPublisher(cancelEndpoint.str());
zmq::socket_t * subscriber = m_contextImpl->createEventSubscriber(getStatusEndpoint().toString(), cancelEndpoint.str());
// Wait for the connection to be ready.
m_contextImpl->waitForSubscriber(subscriber, m_requestSocket.get());
// Create the event stream socket.
return std::unique_ptr<EventStreamSocket>(new EventStreamSocket(new StreamSocketImpl(subscriber, cancelPublisher)));
return std::unique_ptr<EventStreamSocket>(new EventStreamSocket(this));
}
std::unique_ptr<ConnectionChecker> Server::createConnectionChecker(ConnectionCheckerType handler, int pollingTimeMs) {
......@@ -644,6 +633,10 @@ void Server::initRequestSocket() {
m_requestSocket = std::move(createRequestSocket(m_serverEndpoint.toString(), m_contextImpl->getTimeout()));
}
Context * Server::getContext() {
return m_contextImpl.get();
}
Endpoint Server::getStatusEndpoint() const {
return m_serverEndpoint.withPort(m_statusPort);
}
......@@ -700,6 +693,8 @@ std::unique_ptr<OutputStreamSocket> Server::createOutputStreamSocket(const std::
// Create the output stream socket.
return std::unique_ptr<OutputStreamSocket>(new OutputStreamSocket(new StreamSocketImpl(subscriber, cancelPublisher)));
return nullptr;
}
std::unique_ptr<RequestSocket> Server::createRequestSocket(const std::string& endpoint) {
......@@ -710,6 +705,16 @@ std::unique_ptr<RequestSocket> Server::createRequestSocket(const std::string& en
return std::unique_ptr<RequestSocket>(new RequestSocket(m_contextImpl.get(), endpoint, timeout));
}
void Server::sendSync() {
try {
m_requestSocket->requestJSON(createSyncRequest());
}
catch (const ConnectionTimeout& e) {
// do nothing
}
}
std::ostream& operator<<(std::ostream& os, const cameo::Server& server) {
os << "server@" << server.m_serverEndpoint.toString();
......
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
#include "EventStreamSocketImpl.h"
#include "CancelIdGenerator.h"
#include "Server.h"
#include "zmq/ContextZmq.h"
#include "../Messages.h"
using namespace std;
namespace cameo {
EventStreamSocketImpl::EventStreamSocketImpl(Server * server) :
m_server(server) {
m_context = dynamic_cast<ContextZmq *>(server->getContext());
}
EventStreamSocketImpl::~EventStreamSocketImpl() {
close();
}
void EventStreamSocketImpl::init() {
std::stringstream cancelEndpoint;
// We define a unique name that depends on the event stream socket object because there can be many (instances).
cancelEndpoint << "inproc://cancel." << CancelIdGenerator::newId();
// Create the sockets.
m_cancelSocket.reset(m_context->createCancelPublisher(cancelEndpoint.str()));
m_socket.reset(m_context->createEventSubscriber(m_server->getStatusEndpoint().toString(), cancelEndpoint.str()));
// Wait for the connection to be ready.
//m_context->waitForSubscriber(subscriber, m_requestSocket.get());
// Poll subscriber.
zmq_pollitem_t items[1];
items[0].socket = static_cast<void *>(*(m_socket.get()));
items[0].fd = 0;
items[0].events = ZMQ_POLLIN;
items[0].revents = 0;
while (true) {
//isAvailable(socket, 100);
m_server->sendSync();
// Wait for 100ms.
int rc = zmq::poll(items, 1, 100);
if (rc != 0) {
break;
}
}
}
void EventStreamSocketImpl::send(const std::string& data) {
zmq::message_t messageData(data.size());
memcpy((void *) messageData.data(), data.c_str(), data.size());
m_socket->send(messageData);
}
std::string EventStreamSocketImpl::receive(bool blocking) {
// Use the message interface.
unique_ptr<zmq::message_t> message(new zmq::message_t());
if (m_socket->recv(message.get(), (blocking ? 0 : ZMQ_DONTWAIT))) {
// The message exists.
return std::string(message->data<char>(), message->size());
}
return "";
}
void EventStreamSocketImpl::cancel() {
if (m_cancelSocket.get() != nullptr) {
string data(message::Event::CANCEL);
zmq::message_t requestType(data.length());
zmq::message_t requestData(data.length());
memcpy(requestType.data(), message::Event::CANCEL, data.length());
memcpy(requestData.data(), data.c_str(), data.length());
m_cancelSocket->send(requestType, ZMQ_SNDMORE);
m_cancelSocket->send(requestData);
}
}
void EventStreamSocketImpl::close() {
m_socket->close();
}
}
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
#ifndef CAMEO_EVENTSTREAMSOCKETIMPL_H_
#define CAMEO_EVENTSTREAMSOCKETIMPL_H_
#include <string>
#include <memory>
#include "Strings.h"
#include <zmq.hpp>
namespace cameo {
class Server;
class ContextZmq;
class EventStreamSocketImpl {
public:
EventStreamSocketImpl(Server * server);
virtual ~EventStreamSocketImpl();
void init();
void send(const std::string& data);
std::string receive(bool blocking = true);
void cancel();
void close();
Server * m_server;
ContextZmq * m_context;
std::unique_ptr<zmq::socket_t> m_socket;
std::unique_ptr<zmq::socket_t> m_cancelSocket;
};
}
#endif
......@@ -27,7 +27,7 @@ using namespace std;
namespace cameo {
RequestSocketZmq::RequestSocketZmq(Context * context, const std::string& endpoint, int timeout) :
m_services(dynamic_cast<ContextZmq *>(context)), m_endpoint(endpoint) {
m_context(dynamic_cast<ContextZmq *>(context)), m_endpoint(endpoint) {
init();
......@@ -60,7 +60,7 @@ void RequestSocketZmq::init() {
// Reset if the socket is null.
if (m_socket.get() == nullptr) {
m_socket.reset(m_services->createRequestSocket(m_endpoint));
m_socket.reset(m_context->createRequestSocket(m_endpoint));
// Apply the linger to the socket.
setSocketLinger();
......
......@@ -38,7 +38,7 @@ public:
virtual std::string request(const std::string& requestPart1, const std::string& requestPart2, int overrideTimeout);
virtual std::string request(const std::string& requestPart1, const std::string& requestPart2, const std::string& requestPart3, int overrideTimeout);
ContextZmq * m_services;
ContextZmq * m_context;
std::string m_endpoint;
std::unique_ptr<zmq::socket_t> m_socket;
int m_timeout;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment