SubscriberImpl.cpp 6.81 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/*
 * Copyright 2015 Institut Laue-Langevin
 *
 * Licensed under the EUPL, Version 1.1 only (the "License");
 * You may not use this work except in compliance with the Licence.
 * You may obtain a copy of the Licence at:
 *
 * http://joinup.ec.europa.eu/software/page/eupl
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the Licence is distributed on an "AS IS" basis,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the Licence for the specific language governing permissions and
 * limitations under the Licence.
 */

#include "SubscriberImpl.h"
18
#include "Serializer.h"
19
20
21
#include "CancelIdGenerator.h"
#include "ServicesImpl.h"
#include "RequestSocketImpl.h"
22
23
#include "Server.h"
#include "message/Message.h"
24
#include <sstream>
25
#include "JSON.h"
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52

using namespace std;

namespace cameo {

SubscriberImpl::SubscriberImpl(Server * server, const std::string & url, int publisherPort, int synchronizerPort, const std::string& publisherName, int numberOfSubscribers, const std::string& instanceName, int instanceId, const std::string& instanceEndpoint, const std::string& statusEndpoint) :
	m_server(server),
	m_url(url),
	m_publisherName(publisherName),
	m_publisherPort(publisherPort),
	m_synchronizerPort(synchronizerPort),
	m_numberOfSubscribers(numberOfSubscribers),
	m_instanceName(instanceName),
	m_instanceId(instanceId),
	m_instanceEndpoint(instanceEndpoint),
	m_statusEndpoint(statusEndpoint),
	m_ended(false),
	m_canceled(false) {
}

SubscriberImpl::~SubscriberImpl() {
}

void SubscriberImpl::init() {

	// Create a socket for publishing.
	m_subscriber.reset(new zmq::socket_t(m_server->m_impl->m_context, ZMQ_SUB));
53
54
55
56
57
	m_subscriber->setsockopt(ZMQ_SUBSCRIBE, message::Event::SYNC, string(message::Event::SYNC).length());
	m_subscriber->setsockopt(ZMQ_SUBSCRIBE, message::Event::STREAM, string(message::Event::STREAM).length());
	m_subscriber->setsockopt(ZMQ_SUBSCRIBE, message::Event::ENDSTREAM, string(message::Event::ENDSTREAM).length());
	m_subscriber->setsockopt(ZMQ_SUBSCRIBE, message::Event::CANCEL, string(message::Event::CANCEL).length());
	m_subscriber->setsockopt(ZMQ_SUBSCRIBE, message::Event::STATUS, string(message::Event::STATUS).length());
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104

	stringstream pubEndpoint;
	pubEndpoint << m_url << ":" << m_publisherPort;

	m_subscriber->connect(pubEndpoint.str().c_str());

	// We must first bind the cancel publisher before connecting the subscriber.
	stringstream cancelEndpoint;

	// We define a unique name.
	cancelEndpoint << "inproc://cancel." << CancelIdGenerator::newId();
	m_cancelEndpoint = cancelEndpoint.str();

	m_cancelPublisher = unique_ptr<zmq::socket_t>(new zmq::socket_t(m_server->m_impl->m_context, ZMQ_PUB));
	m_cancelPublisher->bind(m_cancelEndpoint.c_str());

	m_subscriber->connect(m_cancelEndpoint.c_str());
	m_subscriber->connect(m_statusEndpoint.c_str());

	// Synchronize the subscriber only if the number of subscribers > 0.
	if (m_numberOfSubscribers > 0) {

		stringstream syncEndpoint;
		syncEndpoint << m_url << ":" << m_synchronizerPort;

		string endpoint = syncEndpoint.str();

		// Create a request socket.
		unique_ptr<RequestSocketImpl> requestSocket = m_server->createRequestSocket(endpoint);

		// Poll subscriber.
		zmq_pollitem_t items[1];
		items[0].socket = static_cast<void *>(*m_subscriber);
		items[0].fd = 0;
		items[0].events = ZMQ_POLLIN;
		items[0].revents = 0;

		while (true) {
			m_server->m_impl->isAvailable(requestSocket.get(), 100);

			// Wait for 100ms.
			int rc = zmq::poll(items, 1, 100);
			if (rc != 0) {
				break;
			}
		}

105
		requestSocket->request(m_server->m_impl->createSubscribePublisherRequest());
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
	}
}

bool SubscriberImpl::isEnded() const {
	return m_ended;
}

bool SubscriberImpl::isCanceled() const {
	return m_canceled;
}

bool SubscriberImpl::receiveBinary(std::string& data) {

	while (true) {
		unique_ptr<zmq::message_t> message(new zmq::message_t());
		m_subscriber->recv(message.get());

		string response(static_cast<char*>(message->data()), message->size());

125
		if (response == message::Event::STREAM) {
126
127
128
129
130
131
			message.reset(new zmq::message_t());
			m_subscriber->recv(message.get());
			data = string(static_cast<char*>(message->data()), message->size());

			return true;

132
		} else if (response == message::Event::ENDSTREAM) {
133
134
135
			m_ended = true;
			return false;

136
		} else if (response == message::Event::CANCEL) {
137
138
139
			m_canceled = true;
			return false;

140
		} else if (response == message::Event::STATUS) {
141
142
143
			message.reset(new zmq::message_t());
			m_subscriber->recv(message.get());

144
145
146
			// Get the JSON object.
			json::Object status;
			json::parse(status, message.get());
147

148
149
150
151
			int id = status[message::StatusEvent::ID].GetInt();

			if (id == m_instanceId) {
				application::State state = status[message::StatusEvent::APPLICATION_STATE].GetInt();
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189

				// test the terminal state
				if (state == application::SUCCESS
					|| state == application::STOPPED
					|| state == application::KILLED
					|| state == application::FAILURE) {
					// Exit because the remote application has terminated.
					return false;
				}
			}
		}
	}

	return false;
}

bool SubscriberImpl::receive(std::string& data) {

	string bytes;
	bool stream = receiveBinary(bytes);

	if (!stream) {
		return false;
	}

	parse(bytes, data);

	return true;
}

bool SubscriberImpl::receiveTwoBinaryParts(std::string& data1, std::string& data2) {

	while (true) {
		unique_ptr<zmq::message_t> message(new zmq::message_t());
		m_subscriber->recv(message.get());

		string response(static_cast<char*>(message->data()), message->size());

190
		if (response == message::Event::STREAM) {
191
192
193
194
195
196
197
198
199
200
			message.reset(new zmq::message_t());
			m_subscriber->recv(message.get());
			data1 = string(static_cast<char*>(message->data()), message->size());

			message.reset(new zmq::message_t());
			m_subscriber->recv(message.get());
			data2 = string(static_cast<char*>(message->data()), message->size());

			return true;

201
		} else if (response == message::Event::ENDSTREAM) {
202
203
204
			m_ended = true;
			return false;

205
		} else if (response == message::Event::CANCEL) {
206
207
			return false;

208
		} else if (response == message::Event::STATUS) {
209
210
211
			message.reset(new zmq::message_t());
			m_subscriber->recv(message.get());

212
213
214
215
216
			// Get the JSON object.
			json::Object status;
			json::parse(status, message.get());

			int id = status[message::StatusEvent::ID].GetInt();
217

218
219
			if (id == m_instanceId) {
				application::State state = status[message::StatusEvent::APPLICATION_STATE].GetInt();
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238

				// test the terminal state
				if (state == application::SUCCESS
					|| state == application::STOPPED
					|| state == application::KILLED
					|| state == application::FAILURE) {
					// Exit because the remote application has terminated.
					return false;
				}
			}
		}
	}

	return false;
}

WaitingImpl * SubscriberImpl::waiting() {

	// Waiting gets the cancel publisher.
239
	return new SocketWaitingImpl(m_cancelPublisher.get(), message::Event::CANCEL);
240
241
242
}

}