SubscriberImpl.cpp 6.67 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/*
 * Copyright 2015 Institut Laue-Langevin
 *
 * Licensed under the EUPL, Version 1.1 only (the "License");
 * You may not use this work except in compliance with the Licence.
 * You may obtain a copy of the Licence at:
 *
 * http://joinup.ec.europa.eu/software/page/eupl
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the Licence is distributed on an "AS IS" basis,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the Licence for the specific language governing permissions and
 * limitations under the Licence.
 */

#include "SubscriberImpl.h"
18
#include "Serializer.h"
19
20
21
#include "CancelIdGenerator.h"
#include "ServicesImpl.h"
#include "RequestSocketImpl.h"
22
23
#include "Server.h"
#include "message/Message.h"
24
#include <sstream>
25
#include "JSON.h"
26
27
28
29
30

using namespace std;

namespace cameo {

31
SubscriberImpl::SubscriberImpl(Server * server, int publisherPort, int synchronizerPort, const std::string& publisherName, int numberOfSubscribers, const std::string& instanceName, int instanceId, const std::string& instanceEndpoint, const std::string& statusEndpoint) :
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
	m_server(server),
	m_publisherName(publisherName),
	m_publisherPort(publisherPort),
	m_synchronizerPort(synchronizerPort),
	m_numberOfSubscribers(numberOfSubscribers),
	m_instanceName(instanceName),
	m_instanceId(instanceId),
	m_instanceEndpoint(instanceEndpoint),
	m_statusEndpoint(statusEndpoint),
	m_ended(false),
	m_canceled(false) {
}

SubscriberImpl::~SubscriberImpl() {
}

void SubscriberImpl::init() {

	// Create a socket for publishing.
	m_subscriber.reset(new zmq::socket_t(m_server->m_impl->m_context, ZMQ_SUB));
52
53
54
55
56
	m_subscriber->setsockopt(ZMQ_SUBSCRIBE, message::Event::SYNC, string(message::Event::SYNC).length());
	m_subscriber->setsockopt(ZMQ_SUBSCRIBE, message::Event::STREAM, string(message::Event::STREAM).length());
	m_subscriber->setsockopt(ZMQ_SUBSCRIBE, message::Event::ENDSTREAM, string(message::Event::ENDSTREAM).length());
	m_subscriber->setsockopt(ZMQ_SUBSCRIBE, message::Event::CANCEL, string(message::Event::CANCEL).length());
	m_subscriber->setsockopt(ZMQ_SUBSCRIBE, message::Event::STATUS, string(message::Event::STATUS).length());
57

58
	m_subscriber->connect(m_server->getEndpoint().withPort(m_publisherPort).toString());
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76

	// We must first bind the cancel publisher before connecting the subscriber.
	stringstream cancelEndpoint;

	// We define a unique name.
	cancelEndpoint << "inproc://cancel." << CancelIdGenerator::newId();
	m_cancelEndpoint = cancelEndpoint.str();

	m_cancelPublisher = unique_ptr<zmq::socket_t>(new zmq::socket_t(m_server->m_impl->m_context, ZMQ_PUB));
	m_cancelPublisher->bind(m_cancelEndpoint.c_str());

	m_subscriber->connect(m_cancelEndpoint.c_str());
	m_subscriber->connect(m_statusEndpoint.c_str());

	// Synchronize the subscriber only if the number of subscribers > 0.
	if (m_numberOfSubscribers > 0) {

		// Create a request socket.
77
		unique_ptr<RequestSocketImpl> requestSocket = m_server->createRequestSocket(m_server->getEndpoint().withPort(m_synchronizerPort).toString());
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95

		// Poll subscriber.
		zmq_pollitem_t items[1];
		items[0].socket = static_cast<void *>(*m_subscriber);
		items[0].fd = 0;
		items[0].events = ZMQ_POLLIN;
		items[0].revents = 0;

		while (true) {
			m_server->m_impl->isAvailable(requestSocket.get(), 100);

			// Wait for 100ms.
			int rc = zmq::poll(items, 1, 100);
			if (rc != 0) {
				break;
			}
		}

96
		requestSocket->request(m_server->m_impl->createSubscribePublisherRequest());
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
	}
}

bool SubscriberImpl::isEnded() const {
	return m_ended;
}

bool SubscriberImpl::isCanceled() const {
	return m_canceled;
}

bool SubscriberImpl::receiveBinary(std::string& data) {

	while (true) {
		unique_ptr<zmq::message_t> message(new zmq::message_t());
		m_subscriber->recv(message.get());

		string response(static_cast<char*>(message->data()), message->size());

116
		if (response == message::Event::STREAM) {
117
118
119
120
121
122
			message.reset(new zmq::message_t());
			m_subscriber->recv(message.get());
			data = string(static_cast<char*>(message->data()), message->size());

			return true;

123
		} else if (response == message::Event::ENDSTREAM) {
124
125
126
			m_ended = true;
			return false;

127
		} else if (response == message::Event::CANCEL) {
128
129
130
			m_canceled = true;
			return false;

131
		} else if (response == message::Event::STATUS) {
132
133
134
			message.reset(new zmq::message_t());
			m_subscriber->recv(message.get());

135
136
137
			// Get the JSON object.
			json::Object status;
			json::parse(status, message.get());
138

139
140
141
142
			int id = status[message::StatusEvent::ID].GetInt();

			if (id == m_instanceId) {
				application::State state = status[message::StatusEvent::APPLICATION_STATE].GetInt();
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180

				// test the terminal state
				if (state == application::SUCCESS
					|| state == application::STOPPED
					|| state == application::KILLED
					|| state == application::FAILURE) {
					// Exit because the remote application has terminated.
					return false;
				}
			}
		}
	}

	return false;
}

bool SubscriberImpl::receive(std::string& data) {

	string bytes;
	bool stream = receiveBinary(bytes);

	if (!stream) {
		return false;
	}

	parse(bytes, data);

	return true;
}

bool SubscriberImpl::receiveTwoBinaryParts(std::string& data1, std::string& data2) {

	while (true) {
		unique_ptr<zmq::message_t> message(new zmq::message_t());
		m_subscriber->recv(message.get());

		string response(static_cast<char*>(message->data()), message->size());

181
		if (response == message::Event::STREAM) {
182
183
184
185
186
187
188
189
190
191
			message.reset(new zmq::message_t());
			m_subscriber->recv(message.get());
			data1 = string(static_cast<char*>(message->data()), message->size());

			message.reset(new zmq::message_t());
			m_subscriber->recv(message.get());
			data2 = string(static_cast<char*>(message->data()), message->size());

			return true;

192
		} else if (response == message::Event::ENDSTREAM) {
193
194
195
			m_ended = true;
			return false;

196
		} else if (response == message::Event::CANCEL) {
197
198
			return false;

199
		} else if (response == message::Event::STATUS) {
200
201
202
			message.reset(new zmq::message_t());
			m_subscriber->recv(message.get());

203
204
205
206
207
			// Get the JSON object.
			json::Object status;
			json::parse(status, message.get());

			int id = status[message::StatusEvent::ID].GetInt();
208

209
210
			if (id == m_instanceId) {
				application::State state = status[message::StatusEvent::APPLICATION_STATE].GetInt();
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229

				// test the terminal state
				if (state == application::SUCCESS
					|| state == application::STOPPED
					|| state == application::KILLED
					|| state == application::FAILURE) {
					// Exit because the remote application has terminated.
					return false;
				}
			}
		}
	}

	return false;
}

WaitingImpl * SubscriberImpl::waiting() {

	// Waiting gets the cancel publisher.
230
	return new SocketWaitingImpl(m_cancelPublisher.get(), message::Event::CANCEL);
231
232
233
}

}