Commit 5470d3b1 authored by legoc's avatar legoc
Browse files

Renamed StreamSocketImpl

parent 1537e587
......@@ -23,7 +23,7 @@
namespace cameo {
class Server;
class EventStreamSocketImpl;
class StreamSocketImpl;
namespace application {
......@@ -45,7 +45,7 @@ public:
private:
EventStreamSocket(Server * server);
std::unique_ptr<EventStreamSocketImpl> m_impl;
std::unique_ptr<StreamSocketImpl> m_impl;
};
}
......
......@@ -24,7 +24,7 @@
namespace cameo {
class Server;
class EventStreamSocketImpl;
class StreamSocketImpl;
namespace application {
......@@ -72,7 +72,7 @@ private:
bool m_ended;
bool m_canceled;
std::unique_ptr<EventStreamSocketImpl> m_impl;
std::unique_ptr<StreamSocketImpl> m_impl;
};
}
......
......@@ -29,7 +29,7 @@ namespace cameo {
EventStreamSocket::EventStreamSocket(Server * server) {
//TODO Replace with a factory.
m_impl = std::unique_ptr<EventStreamSocketImpl>(new EventStreamSocketZmq(server));
m_impl = std::unique_ptr<StreamSocketImpl>(new EventStreamSocketZmq(server));
m_impl->init();
}
......
......@@ -45,7 +45,7 @@ OutputStreamSocket::OutputStreamSocket(Server * server, const std::string& name)
m_canceled(false) {
//TODO Replace with factory.
m_impl = std::unique_ptr<EventStreamSocketImpl>(new OutputStreamSocketZmq(server, name));
m_impl = std::unique_ptr<StreamSocketImpl>(new OutputStreamSocketZmq(server, name));
}
OutputStreamSocket::~OutputStreamSocket() {
......
......@@ -23,7 +23,7 @@
#include "UndefinedKeyException.h"
#include "EventThread.h"
#include "impl/StreamSocketImpl.h"
#include "impl/EventStreamSocketImpl.h"
#include "impl/StreamSocketImpl.h"
#include "impl/zmq/ContextZmq.h"
#include "JSON.h"
#include "Messages.h"
......
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
#ifndef CAMEO_EVENTSTREAMSOCKETIMPL_H_
#define CAMEO_EVENTSTREAMSOCKETIMPL_H_
#include <string>
namespace cameo {
class EventStreamSocketImpl {
public:
virtual ~EventStreamSocketImpl() {}
virtual void init() = 0;
virtual void send(const std::string& data) = 0;
virtual std::string receive(bool blocking = true) = 0;
virtual void cancel() = 0;
virtual void close() = 0;
};
}
#endif
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
#include "StreamSocketImpl.h"
#include "../Messages.h"
using namespace std;
namespace cameo {
StreamSocketImpl::StreamSocketImpl(zmq::socket_t * socket, zmq::socket_t * cancelSocket) :
m_socket(socket), m_cancelSocket(cancelSocket) {
}
StreamSocketImpl::~StreamSocketImpl() {
close();
}
void StreamSocketImpl::send(const std::string& data) {
zmq::message_t messageData(data.size());
memcpy((void *) messageData.data(), data.c_str(), data.size());
m_socket->send(messageData);
}
std::unique_ptr<zmq::message_t> StreamSocketImpl::receive(bool blocking) {
// Use the message interface.
unique_ptr<zmq::message_t> message(new zmq::message_t());
if (m_socket->recv(message.get(), (blocking ? 0 : ZMQ_DONTWAIT))) {
// The message exists.
return message;
}
return unique_ptr<zmq::message_t>(nullptr);
}
void StreamSocketImpl::cancel() {
if (m_cancelSocket.get() != nullptr) {
string data(message::Event::CANCEL);
zmq::message_t requestType(data.length());
zmq::message_t requestData(data.length());
memcpy(requestType.data(), message::Event::CANCEL, data.length());
memcpy(requestData.data(), data.c_str(), data.length());
m_cancelSocket->send(requestType, ZMQ_SNDMORE);
m_cancelSocket->send(requestData);
}
}
void StreamSocketImpl::close() {
m_socket->close();
}
}
......@@ -14,28 +14,23 @@
* limitations under the Licence.
*/
#ifndef CAMEO_STREAMSOCKETIMPL_H_
#define CAMEO_STREAMSOCKETIMPL_H_
#ifndef CAMEO_EVENTSTREAMSOCKETIMPL_H_
#define CAMEO_EVENTSTREAMSOCKETIMPL_H_
#include <string>
#include <memory>
#include "zmq.hpp"
namespace cameo {
class StreamSocketImpl {
public:
StreamSocketImpl(zmq::socket_t * socket, zmq::socket_t * cancelSocket = nullptr);
virtual ~StreamSocketImpl();
virtual ~StreamSocketImpl() {}
void send(const std::string& data);
std::unique_ptr<zmq::message_t> receive(bool blocking = true);
void cancel();
void close();
std::unique_ptr<zmq::socket_t> m_socket;
std::unique_ptr<zmq::socket_t> m_cancelSocket;
virtual void init() = 0;
virtual void send(const std::string& data) = 0;
virtual std::string receive(bool blocking = true) = 0;
virtual void cancel() = 0;
virtual void close() = 0;
};
}
......
......@@ -17,7 +17,7 @@
#ifndef CAMEO_EVENTSTREAMSOCKETZMQ_H_
#define CAMEO_EVENTSTREAMSOCKETZMQ_H_
#include "../EventStreamSocketImpl.h"
#include "../StreamSocketImpl.h"
#include <string>
#include <memory>
#include <zmq.hpp>
......@@ -27,7 +27,7 @@ namespace cameo {
class Server;
class ContextZmq;
class EventStreamSocketZmq : public EventStreamSocketImpl {
class EventStreamSocketZmq : public StreamSocketImpl {
public:
EventStreamSocketZmq(Server * server);
......
......@@ -17,7 +17,7 @@
#ifndef CAMEO_OUTPUTSTREAMSOCKETZMQ_H_
#define CAMEO_OUTPUTSTREAMSOCKETZMQ_H_
#include "../EventStreamSocketImpl.h"
#include "../StreamSocketImpl.h"
#include <string>
#include <memory>
#include <zmq.hpp>
......@@ -27,7 +27,7 @@ namespace cameo {
class Server;
class ContextZmq;
class OutputStreamSocketZmq : public EventStreamSocketImpl {
class OutputStreamSocketZmq : public StreamSocketImpl {
public:
OutputStreamSocketZmq(Server * server, const std::string& name);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment