/*
 * Copyright 2015 Institut Laue-Langevin
 *
 * Licensed under the EUPL, Version 1.1 only (the "License");
 * You may not use this work except in compliance with the Licence.
 * You may obtain a copy of the Licence at:
 *
 * http://joinup.ec.europa.eu/software/page/eupl
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the Licence is distributed on an "AS IS" basis,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the Licence for the specific language governing permissions and
 * limitations under the Licence.
 */

#include "PublisherImpl.h"
#include "Application.h"
#include "Serializer.h"
#include "ServicesImpl.h"
#include "RequestSocketImpl.h"
#include "message/Message.h"
#include <sstream>
#include "JSON.h"

using namespace std;

namespace cameo {

/// Stores the publisher settings and binds a ZMQ_PUB socket on
/// "tcp://*:<publisherPort>".
///
/// The socket is created in the context held by the application implementation
/// (m_application->m_impl->m_context). The synchronizer port and the number of
/// subscribers are only recorded here; waitForSubscribers() reads them later.
PublisherImpl::PublisherImpl(application::This * application, int publisherPort, int synchronizerPort, const std::string& name, int numberOfSubscribers) :
	m_application(application),
	m_publisherPort(publisherPort),
	m_synchronizerPort(synchronizerPort),
	m_name(name),
	m_numberOfSubscribers(numberOfSubscribers),
	m_ended(false) {

	// Publishing socket, living in the application's context.
	m_publisher.reset(new zmq::socket_t(m_application->m_impl->m_context, ZMQ_PUB));

	// Bind it on every interface at the publisher port.
	stringstream bindAddress;
	bindAddress << "tcp://*:" << m_publisherPort;
	const string address = bindAddress.str();

	m_publisher->bind(address.c_str());
}

/// Terminates the publisher on destruction; terminate() does nothing when the
/// publishing socket has already been released.
PublisherImpl::~PublisherImpl() {
	this->terminate();
}

const std::string& PublisherImpl::getName() const {
	return m_name;
}

/// @return the name reported by the owning application (m_application->getName()).
const std::string& PublisherImpl::getApplicationName() const {
	const std::string& applicationName = m_application->getName();
	return applicationName;
}

int PublisherImpl::getApplicationId() const {
	return m_application->getId();
}

/// @return the endpoint reported by the owning application (m_application->getEndpoint()).
const std::string& PublisherImpl::getApplicationEndpoint() const {
	const std::string& applicationEndpoint = m_application->getEndpoint();
	return applicationEndpoint;
}

bool PublisherImpl::waitForSubscribers() {

	if (m_numberOfSubscribers <= 0) {
		return true;
	}

72
	// Create a socket to receive the messages from the subscribers.
73
74
75
76
77
78
79
80
	zmq::socket_t synchronizer(m_application->m_impl->m_context, ZMQ_REP);

	stringstream syncEndpoint;
	string url = "tcp://*";

	syncEndpoint << url << ":" << m_synchronizerPort;
	synchronizer.bind(syncEndpoint.str().c_str());

81
	// Loop until the number of subscribers is reached.
82
83
84
85
86
87
88
89
	int counter = 0;
	bool canceled = false;

	while (counter < m_numberOfSubscribers) {

		unique_ptr<zmq::message_t> message(new zmq::message_t);
		synchronizer.recv(message.get(), 0);

90
91
92
		// Get the JSON request.
		json::Object request;
		json::parse(request, message.get());
93

94
		int type = request[message::TYPE].GetInt();
95
96
97

		unique_ptr<zmq::message_t> reply;

98
		if (type == message::SYNC) {
99
			reply.reset(processInitCommand());
100
		}
101
		else if (type == message::SUBSCRIBE_PUBLISHER_v0) {
102
103
			counter++;
			reply.reset(processSubscribePublisherCommand());
104
105
		}
		else if (type == message::CANCEL) {
106
107
108
			canceled = true;
			counter = m_numberOfSubscribers;
			reply.reset(processCancelPublisherSyncCommand());
109
110
111
		}
		else {
			cerr << "Unknown message type " << type << endl;
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
			synchronizer.send(*message);
		}

		// send to the client
		if (reply != nullptr) {
			synchronizer.send(*reply);
		}
	}

	return !canceled;
}

void PublisherImpl::cancelWaitForSubscribers() {

	stringstream endpoint;
	endpoint << m_application->getUrl() << ":" << (m_publisherPort + 1);

129
130
131
	json::StringObject request;
	request.pushKey(message::TYPE);
	request.pushInt(message::CANCEL);
132
133
134

	// Create a request socket only for the request.
	unique_ptr<RequestSocketImpl> requestSocket = m_application->createRequestSocket(endpoint.str());
135
	requestSocket->request(request.toString());
136
137
138
139
140
141
142
143
144
145
}

/// Creates a GenericWaitingImpl bound to cancelWaitForSubscribers() on this
/// object.
///
/// @return a newly allocated object; the raw pointer is handed to the caller.
WaitingImpl * PublisherImpl::waiting() {

	WaitingImpl * canceler = new GenericWaitingImpl(std::bind(&PublisherImpl::cancelWaitForSubscribers, this));
	return canceler;
}

void PublisherImpl::sendBinary(const std::string& data) {

	// send a STREAM message by the publisher socket
146
	publish(message::Event::STREAM, data.c_str(), data.length());
147
148
149
150
151
152
153
154
155
}

void PublisherImpl::send(const std::string& data) {

	// encode the data
	string result;
	serialize(data, result);

	// send a STREAM message by the publisher socket
156
	publish(message::Event::STREAM, result.c_str(), result.length());
157
158
159
160
161
}

void PublisherImpl::sendTwoBinaryParts(const std::string& data1, const std::string& data2) {

	// send a STREAM message by the publisher socket
162
	publishTwoParts(message::Event::STREAM, data1.c_str(), data1.length(), data2.c_str(), data2.length());
163
164
165
166
167
168
}

void PublisherImpl::setEnd() {

	if (!m_ended && m_publisher.get() != nullptr) {
		// send a dummy ENDSTREAM message by the publisher socket
169
170
		string data(message::Event::ENDSTREAM);
		publish(message::Event::ENDSTREAM, data.c_str(), data.length());
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195

		m_ended = true;
	}
}

bool PublisherImpl::isEnded() {
	return m_ended;
}

/// Ends the stream, releases the publishing socket and asks the application to
/// destroy the publisher.
///
/// Calling it again after the socket has been released has no effect. A failed
/// destroy request is only reported on the standard error stream.
void PublisherImpl::terminate() {

	// Socket already released: the publisher was terminated before.
	if (m_publisher.get() == nullptr) {
		return;
	}

	// Notify the end of the stream while the socket still exists.
	setEnd();
	m_publisher.reset(nullptr);

	const bool destroyed = m_application->destroyPublisher(m_name);
	if (!destroyed) {
		cerr << "server cannot destroy publisher " << m_name << endl;
	}
}

void PublisherImpl::publish(const std::string& header, const char* data, std::size_t size) {

	zmq::message_t requestType(header.length());
196
	memcpy(requestType.data(), header.c_str(), header.length());
197
198

	zmq::message_t requestData(size);
199
	memcpy(requestData.data(), data, size);
200
201
202
203
204
205
206
207

	m_publisher->send(requestType, ZMQ_SNDMORE);
	m_publisher->send(requestData);
}

void PublisherImpl::publishTwoParts(const std::string& header, const char* data1, std::size_t size1, const char* data2, std::size_t size2) {

	zmq::message_t requestType(header.length());
208
	memcpy(requestType.data(), header.c_str(), header.length());
209
210

	zmq::message_t requestData1(size1);
211
	memcpy(requestData1.data(), data1, size1);
212
213

	zmq::message_t requestData2(size2);
214
	memcpy(requestData2.data(), data2, size2);
215
216
217
218
219
220
221
222
223

	m_publisher->send(requestType, ZMQ_SNDMORE);
	m_publisher->send(requestData1, ZMQ_SNDMORE);
	m_publisher->send(requestData2);
}

/// Handles a SYNC request: publishes a SYNC event on the publisher socket and
/// builds the acknowledgement for the requester.
///
/// @return a newly allocated message containing "Connection OK"; the caller
///         takes ownership of the raw pointer.
zmq::message_t * PublisherImpl::processInitCommand() {

	// The SYNC event carries a dummy payload: the event name itself.
	const string syncPayload(message::Event::SYNC);
	publish(message::Event::SYNC, syncPayload.c_str(), syncPayload.length());

	// Build the acknowledgement.
	const string acknowledgement("Connection OK");
	const size_t length = acknowledgement.length();

	zmq::message_t * reply = new zmq::message_t(length);
	memcpy(reply->data(), acknowledgement.c_str(), length);

	return reply;
}

/// Builds the reply to a subscribe request: the response produced by
/// createRequestResponse(0, "OK").
///
/// @return a newly allocated message; the caller takes ownership of the raw
///         pointer.
zmq::message_t * PublisherImpl::processSubscribePublisherCommand() {

	const string response = m_application->m_impl->createRequestResponse(0, "OK");
	const size_t length = response.length();

	// Copy the response text into a message of the same size.
	zmq::message_t * reply = new zmq::message_t(length);
	memcpy(reply->data(), response.c_str(), length);

	return reply;
}

/// Builds the reply to a cancel request: the response produced by
/// createRequestResponse(0, "OK").
///
/// @return a newly allocated message; the caller takes ownership of the raw
///         pointer.
zmq::message_t * PublisherImpl::processCancelPublisherSyncCommand() {

	const string response = m_application->m_impl->createRequestResponse(0, "OK");
	const size_t length = response.length();

	// Copy the response text into a message of the same size.
	zmq::message_t * reply = new zmq::message_t(length);
	memcpy(reply->data(), response.c_str(), length);

	return reply;
}


}