Commit 3e8d423a authored by legoc's avatar legoc
Browse files

Removed include to zmq.hpp in ContextZmq.h

parent 924d8cfe
......@@ -21,6 +21,7 @@
#include "JSON.h"
#include "../../Messages.h"
#include "../../RequestSocket.h"
#include <zmq.hpp>
#include <iostream>
#include <sstream>
......@@ -29,7 +30,7 @@ using namespace std;
namespace cameo {
ContextZmq::ContextZmq() : Context(),
m_context(1), m_timeout(0) {
m_context(new zmq::context_t(1)), m_timeout(0) {
}
ContextZmq::~ContextZmq() {
......@@ -43,9 +44,13 @@ int ContextZmq::getTimeout() const {
return m_timeout;
}
zmq::context_t& ContextZmq::getContext() {
return *m_context.get();
}
zmq::socket_t * ContextZmq::createEventSubscriber(const std::string& endpoint, const std::string& cancelEndpoint) {
zmq::socket_t * subscriber = new zmq::socket_t(m_context, ZMQ_SUB);
zmq::socket_t * subscriber = new zmq::socket_t(*m_context.get(), ZMQ_SUB);
vector<string> streamList;
streamList.push_back(message::Event::STATUS);
......@@ -67,7 +72,7 @@ zmq::socket_t * ContextZmq::createEventSubscriber(const std::string& endpoint, c
zmq::socket_t * ContextZmq::createOutputStreamSubscriber(const std::string& endpoint, const std::string& cancelEndpoint) {
zmq::socket_t * subscriber = new zmq::socket_t(m_context, ZMQ_SUB);
zmq::socket_t * subscriber = new zmq::socket_t(*m_context.get(), ZMQ_SUB);
vector<string> streamList;
streamList.push_back(message::Event::SYNCSTREAM);
......@@ -87,7 +92,7 @@ zmq::socket_t * ContextZmq::createOutputStreamSubscriber(const std::string& endp
zmq::socket_t * ContextZmq::createCancelPublisher(const std::string& endpoint) {
zmq::socket_t * publisher = new zmq::socket_t(m_context, ZMQ_PUB);
zmq::socket_t * publisher = new zmq::socket_t(*m_context.get(), ZMQ_PUB);
publisher->bind(endpoint.c_str());
return publisher;
......@@ -95,7 +100,7 @@ zmq::socket_t * ContextZmq::createCancelPublisher(const std::string& endpoint) {
zmq::socket_t * ContextZmq::createRequestSocket(const std::string& endpoint) {
zmq::socket_t* socket = new zmq::socket_t(m_context, ZMQ_REQ);
zmq::socket_t* socket = new zmq::socket_t(*m_context.get(), ZMQ_REQ);
try {
// Set the linger value to 0 to ensure that pending requests are destroyed in case of timeout.
......
......@@ -20,7 +20,12 @@
#include "Context.h"
#include <vector>
#include <memory>
#include <zmq.hpp>
namespace zmq {
class socket_t;
class context_t;
}
namespace cameo {
......@@ -35,6 +40,8 @@ public:
void setTimeout(int timeout);
int getTimeout() const;
zmq::context_t& getContext();
zmq::socket_t * createEventSubscriber(const std::string& endpoint, const std::string& cancelEndpoint);
zmq::socket_t * createOutputStreamSubscriber(const std::string& endpoint, const std::string& cancelEndpoint);
zmq::socket_t * createCancelPublisher(const std::string& endpoint);
......@@ -45,7 +52,8 @@ public:
void waitForStreamSubscriber(zmq::socket_t * subscriber, RequestSocket * socket, const std::string& name);
void waitForSubscriber(zmq::socket_t * subscriber, RequestSocket * socket);
zmq::context_t m_context;
private:
std::unique_ptr<zmq::context_t> m_context;
int m_timeout;
};
......
......@@ -22,6 +22,7 @@
#include "../../base/impl/zmq/ContextZmq.h"
#include "../../base/Messages.h"
#include "../../base/RequestSocket.h"
#include <zmq.hpp>
#include <sstream>
namespace cameo {
......@@ -35,7 +36,7 @@ PublisherImpl::PublisherImpl(int publisherPort, int synchronizerPort, const std:
// create a socket for publishing
ContextZmq* contextImpl = dynamic_cast<ContextZmq *>(application::This::getCom().getContext());
m_publisher.reset(new zmq::socket_t(contextImpl->m_context, ZMQ_PUB));
m_publisher.reset(new zmq::socket_t(contextImpl->getContext(), ZMQ_PUB));
std::stringstream pubEndpoint;
pubEndpoint << "tcp://*:" << publisherPort;
......@@ -70,7 +71,7 @@ bool PublisherImpl::waitForSubscribers() {
// Create a socket to receive the messages from the subscribers.
ContextZmq* contextImpl = dynamic_cast<ContextZmq *>(application::This::getCom().getContext());
zmq::socket_t synchronizer(contextImpl->m_context, ZMQ_REP);
zmq::socket_t synchronizer(contextImpl->getContext(), ZMQ_REP);
std::stringstream syncEndpoint;
std::string url = "tcp://*";
......
......@@ -22,6 +22,7 @@
#include "../../base/impl/zmq/ContextZmq.h"
#include "../../base/Messages.h"
#include "../../base/RequestSocket.h"
#include <zmq.hpp>
#include <sstream>
namespace cameo {
......@@ -43,7 +44,7 @@ RequesterImpl::RequesterImpl(const Endpoint& endpoint, int requesterPort, int re
// Create a socket REP.
ContextZmq* contextImpl = dynamic_cast<ContextZmq *>(application::This::getCom().getContext());
m_repSocket.reset(new zmq::socket_t(contextImpl->m_context, ZMQ_REP));
m_repSocket.reset(new zmq::socket_t(contextImpl->getContext(), ZMQ_REP));
std::stringstream reqEndpoint;
reqEndpoint << "tcp://*:" << m_requesterPort;
......
......@@ -37,7 +37,7 @@ ResponderImpl::ResponderImpl(int responderPort, const std::string& name) :
// create a socket REP
ContextZmq* contextImpl = dynamic_cast<ContextZmq *>(application::This::getCom().getContext());
m_responder.reset(new zmq::socket_t(contextImpl->m_context, ZMQ_REP));
m_responder.reset(new zmq::socket_t(contextImpl->getContext(), ZMQ_REP));
std::stringstream repEndpoint;
repEndpoint << "tcp://*:" << m_responderPort;
......
......@@ -48,7 +48,7 @@ void SubscriberImpl::init() {
// Create a socket for publishing.
ContextZmq* contextImpl = dynamic_cast<ContextZmq *>(application::This::getCom().getContext());
m_subscriber.reset(new zmq::socket_t(contextImpl->m_context, ZMQ_SUB));
m_subscriber.reset(new zmq::socket_t(contextImpl->getContext(), ZMQ_SUB));
m_subscriber->setsockopt(ZMQ_SUBSCRIBE, message::Event::SYNC, std::string(message::Event::SYNC).length());
m_subscriber->setsockopt(ZMQ_SUBSCRIBE, message::Event::STREAM, std::string(message::Event::STREAM).length());
m_subscriber->setsockopt(ZMQ_SUBSCRIBE, message::Event::ENDSTREAM, std::string(message::Event::ENDSTREAM).length());
......@@ -64,7 +64,7 @@ void SubscriberImpl::init() {
cancelEndpoint << "inproc://cancel." << CancelIdGenerator::newId();
m_cancelEndpoint = cancelEndpoint.str();
m_cancelPublisher = std::unique_ptr<zmq::socket_t>(new zmq::socket_t(contextImpl->m_context, ZMQ_PUB));
m_cancelPublisher = std::unique_ptr<zmq::socket_t>(new zmq::socket_t(contextImpl->getContext(), ZMQ_PUB));
m_cancelPublisher->bind(m_cancelEndpoint.c_str());
m_subscriber->connect(m_cancelEndpoint.c_str());
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment