/*
 * Copyright 2015 Institut Laue-Langevin
 *
 * Licensed under the EUPL, Version 1.1 only (the "License");
 * You may not use this work except in compliance with the Licence.
 * You may obtain a copy of the Licence at:
 *
 * http://joinup.ec.europa.eu/software/page/eupl
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the Licence is distributed on an "AS IS" basis,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the Licence for the specific language governing permissions and
 * limitations under the Licence.
 */

#include "SubscriberImpl.h"

#include <memory>
#include <sstream>
#include <utility>

#include "Serializer.h"
#include "CancelIdGenerator.h"
#include "ServicesImpl.h"
#include "RequestSocketImpl.h"
#include "Server.h"
#include "message/Message.h"
#include "JSON.h"

using namespace std;

namespace cameo {

31
SubscriberImpl::SubscriberImpl(Server * server, int publisherPort, int synchronizerPort, const std::string& publisherName, int numberOfSubscribers, const std::string& instanceName, int instanceId, const std::string& instanceEndpoint, const std::string& statusEndpoint) :
32
	m_server(server), // server associated with instance
33
34
35
36
37
38
	m_publisherName(publisherName),
	m_publisherPort(publisherPort),
	m_synchronizerPort(synchronizerPort),
	m_numberOfSubscribers(numberOfSubscribers),
	m_instanceName(instanceName),
	m_instanceId(instanceId),
39
40
	m_instanceEndpoint(instanceEndpoint), // endpoint of server
	m_statusEndpoint(statusEndpoint), // status endpoint of server
41
42
43
44
45
46
47
48
49
50
51
	m_ended(false),
	m_canceled(false) {
}

/**
 * Nothing to release by hand: the members clean up after themselves.
 */
SubscriberImpl::~SubscriberImpl() = default;

void SubscriberImpl::init() {

	// Create a socket for publishing.
	m_subscriber.reset(new zmq::socket_t(m_server->m_impl->m_context, ZMQ_SUB));
52
53
54
55
56
	m_subscriber->setsockopt(ZMQ_SUBSCRIBE, message::Event::SYNC, string(message::Event::SYNC).length());
	m_subscriber->setsockopt(ZMQ_SUBSCRIBE, message::Event::STREAM, string(message::Event::STREAM).length());
	m_subscriber->setsockopt(ZMQ_SUBSCRIBE, message::Event::ENDSTREAM, string(message::Event::ENDSTREAM).length());
	m_subscriber->setsockopt(ZMQ_SUBSCRIBE, message::Event::CANCEL, string(message::Event::CANCEL).length());
	m_subscriber->setsockopt(ZMQ_SUBSCRIBE, message::Event::STATUS, string(message::Event::STATUS).length());
57

58
	m_subscriber->connect(m_server->getEndpoint().withPort(m_publisherPort).toString());
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76

	// We must first bind the cancel publisher before connecting the subscriber.
	stringstream cancelEndpoint;

	// We define a unique name.
	cancelEndpoint << "inproc://cancel." << CancelIdGenerator::newId();
	m_cancelEndpoint = cancelEndpoint.str();

	m_cancelPublisher = unique_ptr<zmq::socket_t>(new zmq::socket_t(m_server->m_impl->m_context, ZMQ_PUB));
	m_cancelPublisher->bind(m_cancelEndpoint.c_str());

	m_subscriber->connect(m_cancelEndpoint.c_str());
	m_subscriber->connect(m_statusEndpoint.c_str());

	// Synchronize the subscriber only if the number of subscribers > 0.
	if (m_numberOfSubscribers > 0) {

		// Create a request socket.
77
		unique_ptr<RequestSocketImpl> requestSocket = m_server->createRequestSocket(m_server->getEndpoint().withPort(m_synchronizerPort).toString());
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95

		// Poll subscriber.
		zmq_pollitem_t items[1];
		items[0].socket = static_cast<void *>(*m_subscriber);
		items[0].fd = 0;
		items[0].events = ZMQ_POLLIN;
		items[0].revents = 0;

		while (true) {
			m_server->m_impl->isAvailable(requestSocket.get(), 100);

			// Wait for 100ms.
			int rc = zmq::poll(items, 1, 100);
			if (rc != 0) {
				break;
			}
		}

96
		requestSocket->request(m_server->m_impl->createSubscribePublisherRequest());
97
98
99
100
101
102
103
104
105
106
107
	}
}

/**
 * Returns the end-of-stream flag.
 * The flag starts as false and is set by the receive methods when an
 * ENDSTREAM event is read.
 */
bool SubscriberImpl::isEnded() const {
	return m_ended;
}

/**
 * Returns the cancellation flag.
 * The flag starts as false and is set by receiveBinary() when a CANCEL
 * event is read.
 */
bool SubscriberImpl::isCanceled() const {
	return m_canceled;
}

108
std::optional<std::string> SubscriberImpl::receiveBinary() {
109
110
111
112
113
114
115

	while (true) {
		unique_ptr<zmq::message_t> message(new zmq::message_t());
		m_subscriber->recv(message.get());

		string response(static_cast<char*>(message->data()), message->size());

116
		if (response == message::Event::STREAM) {
117
118
			message.reset(new zmq::message_t());
			m_subscriber->recv(message.get());
119
			return string(static_cast<char*>(message->data()), message->size());
120

121
		} else if (response == message::Event::ENDSTREAM) {
122
			m_ended = true;
123
			return {};
124

125
		} else if (response == message::Event::CANCEL) {
126
			m_canceled = true;
127
			return {};
128

129
		} else if (response == message::Event::STATUS) {
130
131
132
			message.reset(new zmq::message_t());
			m_subscriber->recv(message.get());

133
134
135
			// Get the JSON object.
			json::Object status;
			json::parse(status, message.get());
136

137
138
139
140
			int id = status[message::StatusEvent::ID].GetInt();

			if (id == m_instanceId) {
				application::State state = status[message::StatusEvent::APPLICATION_STATE].GetInt();
141
142
143
144
145
146
147

				// test the terminal state
				if (state == application::SUCCESS
					|| state == application::STOPPED
					|| state == application::KILLED
					|| state == application::FAILURE) {
					// Exit because the remote application has terminated.
148
					return {};
149
150
151
152
153
				}
			}
		}
	}

154
	return {};
155
156
}

157
158
std::optional<std::string> SubscriberImpl::receive() {
	return receiveBinary();
159
160
}

161
std::optional<std::tuple<std::string, std::string>> SubscriberImpl::receiveTwoBinaryParts() {
162
163
164
165
166
167
168

	while (true) {
		unique_ptr<zmq::message_t> message(new zmq::message_t());
		m_subscriber->recv(message.get());

		string response(static_cast<char*>(message->data()), message->size());

169
		if (response == message::Event::STREAM) {
170
171
172

			std::tuple<std::string, std::string> result;

173
174
			message.reset(new zmq::message_t());
			m_subscriber->recv(message.get());
175
			string data1 = string(static_cast<char*>(message->data()), message->size());
176
177
178

			message.reset(new zmq::message_t());
			m_subscriber->recv(message.get());
179
			string data2 = string(static_cast<char*>(message->data()), message->size());
180

181
			return make_tuple(data1, data2);
182

183
		} else if (response == message::Event::ENDSTREAM) {
184
			m_ended = true;
185
			return {};
186

187
		} else if (response == message::Event::CANCEL) {
188
			return {};
189

190
		} else if (response == message::Event::STATUS) {
191
192
193
			message.reset(new zmq::message_t());
			m_subscriber->recv(message.get());

194
195
196
197
198
			// Get the JSON object.
			json::Object status;
			json::parse(status, message.get());

			int id = status[message::StatusEvent::ID].GetInt();
199

200
201
			if (id == m_instanceId) {
				application::State state = status[message::StatusEvent::APPLICATION_STATE].GetInt();
202
203
204
205
206
207
208

				// test the terminal state
				if (state == application::SUCCESS
					|| state == application::STOPPED
					|| state == application::KILLED
					|| state == application::FAILURE) {
					// Exit because the remote application has terminated.
209
					return {};
210
211
212
213
214
				}
			}
		}
	}

215
	return {};
216
217
218
219
220
}

/**
 * Creates a waiting object built on the cancel publisher and the CANCEL event.
 *
 * @return A newly allocated SocketWaitingImpl. Nothing in this class deletes
 *         it, so the caller presumably takes ownership - confirm at the call site.
 */
WaitingImpl * SubscriberImpl::waiting() {

	// Waiting gets the cancel publisher.
	return new SocketWaitingImpl(m_cancelPublisher.get(), message::Event::CANCEL);
}

}