Commit 1537e587 authored by legoc's avatar legoc
Browse files

Reviewing OutputStreamSocket

parent d8f85568
...@@ -23,7 +23,6 @@ ...@@ -23,7 +23,6 @@
namespace cameo { namespace cameo {
class Server; class Server;
class WaitingImpl;
class EventStreamSocketImpl; class EventStreamSocketImpl;
namespace application { namespace application {
......
...@@ -23,8 +23,8 @@ ...@@ -23,8 +23,8 @@
namespace cameo { namespace cameo {
class StreamSocketImpl; class Server;
class WaitingImpl; class EventStreamSocketImpl;
namespace application { namespace application {
...@@ -66,13 +66,13 @@ public: ...@@ -66,13 +66,13 @@ public:
bool isCanceled() const; bool isCanceled() const;
private: private:
OutputStreamSocket(StreamSocketImpl * impl); OutputStreamSocket(Server * server, const std::string& name);
int m_applicationId; int m_applicationId;
bool m_ended; bool m_ended;
bool m_canceled; bool m_canceled;
std::unique_ptr<StreamSocketImpl> m_impl; std::unique_ptr<EventStreamSocketImpl> m_impl;
}; };
} }
......
...@@ -137,6 +137,9 @@ public: ...@@ -137,6 +137,9 @@ public:
Endpoint getStatusEndpoint() const; Endpoint getStatusEndpoint() const;
void sendSync(); void sendSync();
void sendSyncStream(const std::string& name);
int getStreamPort(const std::string& name);
private: private:
void initServer(const Endpoint& endpoint, int timeoutMs); void initServer(const Endpoint& endpoint, int timeoutMs);
...@@ -162,7 +165,6 @@ private: ...@@ -162,7 +165,6 @@ private:
void retrieveServerVersion(); void retrieveServerVersion();
void initStatus(); void initStatus();
int getStreamPort(const std::string& name);
std::unique_ptr<OutputStreamSocket> createOutputStreamSocket(const std::string& name); std::unique_ptr<OutputStreamSocket> createOutputStreamSocket(const std::string& name);
std::unique_ptr<RequestSocket> createRequestSocket(const std::string& endpoint); std::unique_ptr<RequestSocket> createRequestSocket(const std::string& endpoint);
std::unique_ptr<RequestSocket> createRequestSocket(const std::string& endpoint, int timeout); std::unique_ptr<RequestSocket> createRequestSocket(const std::string& endpoint, int timeout);
......
...@@ -17,10 +17,8 @@ ...@@ -17,10 +17,8 @@
#include "OutputStreamSocket.h" #include "OutputStreamSocket.h"
#include "JSON.h" #include "JSON.h"
#include "impl/SocketWaitingImpl.h"
#include "impl/StreamSocketImpl.h"
#include "impl/zmq/ContextZmq.h"
#include "Messages.h" #include "Messages.h"
#include "impl/zmq/OutputStreamSocketZmq.h"
#include <iostream> #include <iostream>
namespace cameo { namespace cameo {
...@@ -41,11 +39,13 @@ bool Output::isEndOfLine() const { ...@@ -41,11 +39,13 @@ bool Output::isEndOfLine() const {
return m_endOfLine; return m_endOfLine;
} }
OutputStreamSocket::OutputStreamSocket(StreamSocketImpl * impl) : OutputStreamSocket::OutputStreamSocket(Server * server, const std::string& name) :
m_applicationId(-1), m_applicationId(-1),
m_ended(false), m_ended(false),
m_canceled(false), m_canceled(false) {
m_impl(impl) {
//TODO Replace with factory.
m_impl = std::unique_ptr<EventStreamSocketImpl>(new OutputStreamSocketZmq(server, name));
} }
OutputStreamSocket::~OutputStreamSocket() { OutputStreamSocket::~OutputStreamSocket() {
...@@ -59,8 +59,7 @@ std::optional<Output> OutputStreamSocket::receive() { ...@@ -59,8 +59,7 @@ std::optional<Output> OutputStreamSocket::receive() {
// Loop on receive() because in case of configuration multiple=yes, messages can come from different instances. // Loop on receive() because in case of configuration multiple=yes, messages can come from different instances.
while (true) { while (true) {
std::unique_ptr<zmq::message_t> message(m_impl->receive()); std::string messageType(m_impl->receive());
std::string messageType(message->data<char>(), message->size());
// Cancel can only come from this instance. // Cancel can only come from this instance.
if (messageType == message::Event::CANCEL) { if (messageType == message::Event::CANCEL) {
...@@ -69,7 +68,7 @@ std::optional<Output> OutputStreamSocket::receive() { ...@@ -69,7 +68,7 @@ std::optional<Output> OutputStreamSocket::receive() {
} }
// Get the second part of the message. // Get the second part of the message.
message = m_impl->receive(); std::string message = m_impl->receive();
// Continue if type of message is SYNCSTREAM. Theses messages are only used for the poller. // Continue if type of message is SYNCSTREAM. Theses messages are only used for the poller.
if (messageType == message::Event::SYNCSTREAM) { if (messageType == message::Event::SYNCSTREAM) {
...@@ -78,7 +77,7 @@ std::optional<Output> OutputStreamSocket::receive() { ...@@ -78,7 +77,7 @@ std::optional<Output> OutputStreamSocket::receive() {
// Get the JSON event. // Get the JSON event.
json::Object event; json::Object event;
json::parse(event, message.get()); json::parse(event, message);
int id = event[message::ApplicationStream::ID].GetInt(); int id = event[message::ApplicationStream::ID].GetInt();
......
...@@ -674,27 +674,8 @@ int Server::getStreamPort(const std::string& name) { ...@@ -674,27 +674,8 @@ int Server::getStreamPort(const std::string& name) {
} }
std::unique_ptr<OutputStreamSocket> Server::createOutputStreamSocket(const std::string& name) { std::unique_ptr<OutputStreamSocket> Server::createOutputStreamSocket(const std::string& name) {
// Create the event stream socket.
int port = getStreamPort(name); return std::unique_ptr<OutputStreamSocket>(new OutputStreamSocket(this, name));
if (port == -1) {
return nullptr;
}
// We define a unique name that depends on the event stream socket object because there can be many (instances).
std::string cancelEndpoint = "inproc://cancel." + std::to_string(CancelIdGenerator::newId());
// Create the sockets.
zmq::socket_t * cancelPublisher = m_contextImpl->createCancelPublisher(cancelEndpoint);
zmq::socket_t * subscriber = m_contextImpl->createOutputStreamSubscriber(m_serverEndpoint.withPort(port).toString(), cancelEndpoint);
// Wait for the connection to be ready.
m_contextImpl->waitForStreamSubscriber(subscriber, m_requestSocket.get(), name);
// Create the output stream socket.
return std::unique_ptr<OutputStreamSocket>(new OutputStreamSocket(new StreamSocketImpl(subscriber, cancelPublisher)));
return nullptr;
} }
std::unique_ptr<RequestSocket> Server::createRequestSocket(const std::string& endpoint) { std::unique_ptr<RequestSocket> Server::createRequestSocket(const std::string& endpoint) {
...@@ -711,7 +692,17 @@ void Server::sendSync() { ...@@ -711,7 +692,17 @@ void Server::sendSync() {
m_requestSocket->requestJSON(createSyncRequest()); m_requestSocket->requestJSON(createSyncRequest());
} }
catch (const ConnectionTimeout& e) { catch (const ConnectionTimeout& e) {
// do nothing // The server is not accessible.
}
}
void Server::sendSyncStream(const std::string& name) {
try {
m_requestSocket->requestJSON(createSyncStreamRequest(name));
}
catch (const ConnectionTimeout&) {
// The server is not accessible.
} }
} }
......
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
#include "OutputStreamSocketZmq.h"
#include "Server.h"
#include "ContextZmq.h"
#include "../../Messages.h"
#include "../../CancelIdGenerator.h"
using namespace std;
namespace cameo {
OutputStreamSocketZmq::OutputStreamSocketZmq(Server * server, const std::string& name) :
m_server(server),
m_name(name) {
m_context = dynamic_cast<ContextZmq *>(server->getContext());
}
OutputStreamSocketZmq::~OutputStreamSocketZmq() {
close();
}
void OutputStreamSocketZmq::init() {
int port = m_server->getStreamPort(m_name);
if (port == -1) {
return;
}
std::stringstream cancelEndpoint;
// We define a unique name that depends on the event stream socket object because there can be many (instances).
cancelEndpoint << "inproc://cancel." << CancelIdGenerator::newId();
// Create the sockets.
m_cancelSocket.reset(m_context->createCancelPublisher(cancelEndpoint.str()));
m_socket.reset(m_context->createEventSubscriber(m_server->getEndpoint().withPort(port).toString(), cancelEndpoint.str()));
// Wait for the connection to be ready.
// Poll subscriber.
zmq_pollitem_t items[1];
items[0].socket = static_cast<void *>(*(m_socket.get()));
items[0].fd = 0;
items[0].events = ZMQ_POLLIN;
items[0].revents = 0;
while (true) {
m_server->sendSyncStream(m_name);
// Wait for 100ms.
int rc = zmq::poll(items, 1, 100);
if (rc != 0) {
break;
}
}
}
void OutputStreamSocketZmq::send(const std::string& data) {
zmq::message_t messageData(data.size());
memcpy((void *) messageData.data(), data.c_str(), data.size());
m_socket->send(messageData);
}
std::string OutputStreamSocketZmq::receive(bool blocking) {
// Use the message interface.
unique_ptr<zmq::message_t> message(new zmq::message_t());
if (m_socket->recv(message.get(), (blocking ? 0 : ZMQ_DONTWAIT))) {
// The message exists.
return std::string(message->data<char>(), message->size());
}
return "";
}
void OutputStreamSocketZmq::cancel() {
if (m_cancelSocket.get() != nullptr) {
string data(message::Event::CANCEL);
zmq::message_t requestType(data.length());
zmq::message_t requestData(data.length());
memcpy(requestType.data(), message::Event::CANCEL, data.length());
memcpy(requestData.data(), data.c_str(), data.length());
m_cancelSocket->send(requestType, ZMQ_SNDMORE);
m_cancelSocket->send(requestData);
}
}
void OutputStreamSocketZmq::close() {
m_socket->close();
}
}
/*
* Copyright 2015 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
#ifndef CAMEO_OUTPUTSTREAMSOCKETZMQ_H_
#define CAMEO_OUTPUTSTREAMSOCKETZMQ_H_
#include "../EventStreamSocketImpl.h"
#include <string>
#include <memory>
#include <zmq.hpp>
namespace cameo {
class Server;
class ContextZmq;
class OutputStreamSocketZmq : public EventStreamSocketImpl {
public:
OutputStreamSocketZmq(Server * server, const std::string& name);
virtual ~OutputStreamSocketZmq();
void init();
void send(const std::string& data);
std::string receive(bool blocking);
void cancel();
void close();
private:
Server * m_server;
std::string m_name;
ContextZmq * m_context;
std::unique_ptr<zmq::socket_t> m_socket;
std::unique_ptr<zmq::socket_t> m_cancelSocket;
};
}
#endif
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment