Commit c160b0a0 authored by legoc's avatar legoc
Browse files

Reviewed EventStreamSocket impl

parent 3ada2df5
......@@ -46,8 +46,6 @@ public:
private:
EventStreamSocket(Server * server);
WaitingImpl * waiting();
std::unique_ptr<EventStreamSocketImpl> m_impl;
};
......
......@@ -21,15 +21,15 @@
#include "PublisherEvent.h"
#include "ResultEvent.h"
#include "StatusEvent.h"
#include "impl/SocketWaitingImpl.h"
#include "impl/EventStreamSocketImpl.h"
#include "JSON.h"
#include "Messages.h"
#include "impl/zmq/EventStreamSocketZmq.h"
namespace cameo {
EventStreamSocket::EventStreamSocket(Server * server) {
m_impl = std::unique_ptr<EventStreamSocketImpl>(new EventStreamSocketImpl(server));
//TODO Replace with a factory.
m_impl = std::unique_ptr<EventStreamSocketImpl>(new EventStreamSocketZmq(server));
m_impl->init();
}
......@@ -144,9 +144,4 @@ void EventStreamSocket::cancel() {
m_impl->cancel();
}
WaitingImpl * EventStreamSocket::waiting() {
// We transfer the ownership of cancel socket to WaitingImpl
return new SocketWaitingImpl(m_impl->m_cancelSocket.get(), message::Event::CANCEL);
}
}
......@@ -17,11 +17,11 @@
#include "Server.h"
#include "Application.h"
#include "CancelIdGenerator.h"
#include "ConnectionChecker.h"
#include "UndefinedApplicationException.h"
#include "UndefinedKeyException.h"
#include "EventThread.h"
#include "impl/CancelIdGenerator.h"
#include "impl/StreamSocketImpl.h"
#include "impl/EventStreamSocketImpl.h"
#include "impl/zmq/ContextZmq.h"
......
......@@ -18,31 +18,19 @@
#define CAMEO_EVENTSTREAMSOCKETIMPL_H_
#include <string>
#include <memory>
#include "Strings.h"
#include <zmq.hpp>
namespace cameo {
class Server;
class ContextZmq;
class EventStreamSocketImpl {
public:
EventStreamSocketImpl(Server * server);
virtual ~EventStreamSocketImpl();
void init();
void send(const std::string& data);
std::string receive(bool blocking = true);
void cancel();
void close();
Server * m_server;
ContextZmq * m_context;
std::unique_ptr<zmq::socket_t> m_socket;
std::unique_ptr<zmq::socket_t> m_cancelSocket;
virtual ~EventStreamSocketImpl() {}
virtual void init() = 0;
virtual void send(const std::string& data) = 0;
virtual std::string receive(bool blocking = true) = 0;
virtual void cancel() = 0;
virtual void close() = 0;
};
}
......
......@@ -14,27 +14,26 @@
* limitations under the Licence.
*/
#include "EventStreamSocketImpl.h"
#include "CancelIdGenerator.h"
#include "EventStreamSocketZmq.h"
#include "Server.h"
#include "zmq/ContextZmq.h"
#include "../Messages.h"
#include "ContextZmq.h"
#include "../../Messages.h"
#include "../../CancelIdGenerator.h"
using namespace std;
namespace cameo {
EventStreamSocketImpl::EventStreamSocketImpl(Server * server) :
EventStreamSocketZmq::EventStreamSocketZmq(Server * server) :
m_server(server) {
m_context = dynamic_cast<ContextZmq *>(server->getContext());
}
EventStreamSocketImpl::~EventStreamSocketImpl() {
EventStreamSocketZmq::~EventStreamSocketZmq() {
close();
}
void EventStreamSocketImpl::init() {
void EventStreamSocketZmq::init() {
std::stringstream cancelEndpoint;
......@@ -46,8 +45,6 @@ void EventStreamSocketImpl::init() {
m_socket.reset(m_context->createEventSubscriber(m_server->getStatusEndpoint().toString(), cancelEndpoint.str()));
// Wait for the connection to be ready.
//m_context->waitForSubscriber(subscriber, m_requestSocket.get());
// Poll subscriber.
zmq_pollitem_t items[1];
items[0].socket = static_cast<void *>(*(m_socket.get()));
......@@ -56,7 +53,6 @@ void EventStreamSocketImpl::init() {
items[0].revents = 0;
while (true) {
//isAvailable(socket, 100);
m_server->sendSync();
// Wait for 100ms.
......@@ -67,14 +63,14 @@ void EventStreamSocketImpl::init() {
}
}
void EventStreamSocketImpl::send(const std::string& data) {
void EventStreamSocketZmq::send(const std::string& data) {
zmq::message_t messageData(data.size());
memcpy((void *) messageData.data(), data.c_str(), data.size());
m_socket->send(messageData);
}
std::string EventStreamSocketImpl::receive(bool blocking) {
std::string EventStreamSocketZmq::receive(bool blocking) {
// Use the message interface.
unique_ptr<zmq::message_t> message(new zmq::message_t());
......@@ -86,7 +82,7 @@ std::string EventStreamSocketImpl::receive(bool blocking) {
return "";
}
void EventStreamSocketImpl::cancel() {
void EventStreamSocketZmq::cancel() {
if (m_cancelSocket.get() != nullptr) {
string data(message::Event::CANCEL);
......@@ -99,7 +95,7 @@ void EventStreamSocketImpl::cancel() {
}
}
void EventStreamSocketImpl::close() {
void EventStreamSocketZmq::close() {
m_socket->close();
}
......
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
#ifndef CAMEO_EVENTSTREAMSOCKETZMQ_H_
#define CAMEO_EVENTSTREAMSOCKETZMQ_H_
#include "../EventStreamSocketImpl.h"
#include <string>
#include <memory>
#include <zmq.hpp>
namespace cameo {
class Server;
class ContextZmq;
class EventStreamSocketZmq : public EventStreamSocketImpl {
public:
EventStreamSocketZmq(Server * server);
virtual ~EventStreamSocketZmq();
void init();
void send(const std::string& data);
std::string receive(bool blocking);
void cancel();
void close();
private:
Server * m_server;
ContextZmq * m_context;
std::unique_ptr<zmq::socket_t> m_socket;
std::unique_ptr<zmq::socket_t> m_cancelSocket;
};
}
#endif
......@@ -16,10 +16,10 @@
#include "SubscriberImpl.h"
#include "../../base/CancelIdGenerator.h"
#include "Serializer.h"
#include "Server.h"
#include "JSON.h"
#include "../../base/impl/CancelIdGenerator.h"
#include "../../base/impl/zmq/ContextZmq.h"
#include "../../base/Messages.h"
#include "../../base/RequestSocket.h"
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment