Commit d51bb1e7 authored by legoc's avatar legoc
Browse files

Removed functions from ContextZmq

parent 976b347d
......@@ -48,46 +48,4 @@ zmq::context_t& ContextZmq::getContext() {
return *m_context.get();
}
zmq::socket_t * ContextZmq::createEventSubscriber(const std::string& endpoint, const std::string& cancelEndpoint) {
zmq::socket_t * subscriber = new zmq::socket_t(*m_context.get(), ZMQ_SUB);
vector<string> streamList;
streamList.push_back(message::Event::STATUS);
streamList.push_back(message::Event::RESULT);
streamList.push_back(message::Event::PUBLISHER);
streamList.push_back(message::Event::PORT);
streamList.push_back(message::Event::KEYVALUE);
streamList.push_back(message::Event::CANCEL);
for (vector<string>::const_iterator s = streamList.begin(); s != streamList.end(); ++s) {
subscriber->setsockopt(ZMQ_SUBSCRIBE, s->c_str(), s->length());
}
subscriber->connect(endpoint.c_str());
subscriber->connect(cancelEndpoint.c_str());
return subscriber;
}
zmq::socket_t * ContextZmq::createOutputStreamSubscriber(const std::string& endpoint, const std::string& cancelEndpoint) {
zmq::socket_t * subscriber = new zmq::socket_t(*m_context.get(), ZMQ_SUB);
vector<string> streamList;
streamList.push_back(message::Event::SYNCSTREAM);
streamList.push_back(message::Event::STREAM);
streamList.push_back(message::Event::ENDSTREAM);
streamList.push_back(message::Event::CANCEL);
for (vector<string>::const_iterator s = streamList.begin(); s != streamList.end(); ++s) {
subscriber->setsockopt(ZMQ_SUBSCRIBE, s->c_str(), s->length());
}
subscriber->connect(endpoint.c_str());
subscriber->connect(cancelEndpoint.c_str());
return subscriber;
}
}
......@@ -42,9 +42,6 @@ public:
zmq::context_t& getContext();
zmq::socket_t * createEventSubscriber(const std::string& endpoint, const std::string& cancelEndpoint);
zmq::socket_t * createOutputStreamSubscriber(const std::string& endpoint, const std::string& cancelEndpoint);
private:
std::unique_ptr<zmq::context_t> m_context;
int m_timeout;
......
......@@ -44,8 +44,23 @@ void EventStreamSocketZmq::init() {
m_cancelSocket = std::unique_ptr<zmq::socket_t>(new zmq::socket_t(m_context->getContext(), ZMQ_PUB));
m_cancelSocket->bind(cancelEndpoint.str());
m_socket = std::unique_ptr<zmq::socket_t>(new zmq::socket_t(m_context->getContext(), ZMQ_SUB));
vector<string> streamList;
streamList.push_back(message::Event::STATUS);
streamList.push_back(message::Event::RESULT);
streamList.push_back(message::Event::PUBLISHER);
streamList.push_back(message::Event::PORT);
streamList.push_back(message::Event::KEYVALUE);
streamList.push_back(message::Event::CANCEL);
for (vector<string>::const_iterator s = streamList.begin(); s != streamList.end(); ++s) {
m_socket->setsockopt(ZMQ_SUBSCRIBE, s->c_str(), s->length());
}
m_socket->connect(m_server->getStatusEndpoint().toString().c_str());
m_socket->connect(cancelEndpoint.str().c_str());
m_socket.reset(m_context->createEventSubscriber(m_server->getStatusEndpoint().toString(), cancelEndpoint.str()));
// Wait for the connection to be ready.
// Poll subscriber.
......
......@@ -51,7 +51,20 @@ void OutputStreamSocketZmq::init() {
m_cancelSocket = std::unique_ptr<zmq::socket_t>(new zmq::socket_t(m_context->getContext(), ZMQ_PUB));
m_cancelSocket->bind(cancelEndpoint.str());
m_socket.reset(m_context->createOutputStreamSubscriber(m_server->getEndpoint().withPort(port).toString(), cancelEndpoint.str()));
m_socket = std::unique_ptr<zmq::socket_t>(new zmq::socket_t(m_context->getContext(), ZMQ_SUB));
vector<string> streamList;
streamList.push_back(message::Event::SYNCSTREAM);
streamList.push_back(message::Event::STREAM);
streamList.push_back(message::Event::ENDSTREAM);
streamList.push_back(message::Event::CANCEL);
for (vector<string>::const_iterator s = streamList.begin(); s != streamList.end(); ++s) {
m_socket->setsockopt(ZMQ_SUBSCRIBE, s->c_str(), s->length());
}
m_socket->connect(m_server->getEndpoint().withPort(port).toString().c_str());
m_socket->connect(cancelEndpoint.str().c_str());
// Wait for the connection to be ready.
// Poll subscriber.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment