/*
 * Copyright 2015 Institut Laue-Langevin
 *
 * Licensed under the EUPL, Version 1.1 only (the "License");
 * You may not use this work except in compliance with the Licence.
 * You may obtain a copy of the Licence at:
 *
 * http://joinup.ec.europa.eu/software/page/eupl
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the Licence is distributed on an "AS IS" basis,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the Licence for the specific language governing permissions and
 * limitations under the Licence.
 */

#include "PublisherImpl.h"
#include "Application.h"
#include "Serializer.h"
#include "ServicesImpl.h"
#include "RequestSocketImpl.h"
#include "message/JSON.h"
#include "message/Message.h"
#include <cstring>
#include <functional>
#include <iostream>
#include <memory>
#include <sstream>
#include <string>

using namespace std;

namespace cameo {

/**
 * Stores the publisher settings and opens the publishing socket.
 *
 * A ZMQ_PUB socket is created in the application's ZeroMQ context and bound
 * to "tcp://*:<publisherPort>". The synchronizer port is only recorded here;
 * it is bound later by waitForSubscribers().
 *
 * @param application the owning application object.
 * @param publisherPort the TCP port on which the PUB socket is bound.
 * @param synchronizerPort the TCP port used for the subscriber synchronization.
 * @param name the name of the publisher.
 * @param numberOfSubscribers the number of subscribers waitForSubscribers() waits for.
 */
PublisherImpl::PublisherImpl(application::This * application, int publisherPort, int synchronizerPort, const std::string& name, int numberOfSubscribers) :
	m_application(application),
	m_publisherPort(publisherPort),
	m_synchronizerPort(synchronizerPort),
	m_name(name),
	m_numberOfSubscribers(numberOfSubscribers),
	m_ended(false) {

	// Build the bind address of the publishing socket.
	stringstream bindAddress;
	bindAddress << "tcp://*:" << m_publisherPort;
	const string address = bindAddress.str();

	// Create the PUB socket in the application context and bind it.
	m_publisher.reset(new zmq::socket_t(m_application->m_impl->m_context, ZMQ_PUB));
	m_publisher->bind(address.c_str());
}

/**
 * Destructor: delegates the cleanup to terminate(), which does nothing
 * if the publisher socket has already been released.
 */
PublisherImpl::~PublisherImpl() {
	this->terminate();
}

const std::string& PublisherImpl::getName() const {
	return m_name;
}

/**
 * @return the name reported by the owning application (forwards to its getName()).
 */
const std::string& PublisherImpl::getApplicationName() const {
	return (*m_application).getName();
}

int PublisherImpl::getApplicationId() const {
	return m_application->getId();
}

/**
 * @return the endpoint reported by the owning application (forwards to its getEndpoint()).
 */
const std::string& PublisherImpl::getApplicationEndpoint() const {
	return (*m_application).getEndpoint();
}

bool PublisherImpl::waitForSubscribers() {

	if (m_numberOfSubscribers <= 0) {
		return true;
	}

72
	// Create a socket to receive the messages from the subscribers.
73
74
75
76
77
78
79
80
	zmq::socket_t synchronizer(m_application->m_impl->m_context, ZMQ_REP);

	stringstream syncEndpoint;
	string url = "tcp://*";

	syncEndpoint << url << ":" << m_synchronizerPort;
	synchronizer.bind(syncEndpoint.str().c_str());

81
	// Loop until the number of subscribers is reached.
82
83
84
85
86
87
88
89
	int counter = 0;
	bool canceled = false;

	while (counter < m_numberOfSubscribers) {

		unique_ptr<zmq::message_t> message(new zmq::message_t);
		synchronizer.recv(message.get(), 0);

90
91
92
		// Get the JSON request.
		json::Object request;
		json::parse(request, message.get());
93

94
		int type = request[message::TYPE].GetInt();
95
96
97

		unique_ptr<zmq::message_t> reply;

98
		if (type == message::SYNC) {
99
			reply.reset(processInitCommand());
100
101
		}
		else if (type == message::SUBSCRIBE_PUBLISHER) {
102
103
			counter++;
			reply.reset(processSubscribePublisherCommand());
104
105
		}
		else if (type == message::CANCEL) {
106
107
108
			canceled = true;
			counter = m_numberOfSubscribers;
			reply.reset(processCancelPublisherSyncCommand());
109
110
111
		}
		else {
			cerr << "Unknown message type " << type << endl;
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
			synchronizer.send(*message);
		}

		// send to the client
		if (reply != nullptr) {
			synchronizer.send(*reply);
		}
	}

	return !canceled;
}

void PublisherImpl::cancelWaitForSubscribers() {

	stringstream endpoint;
	endpoint << m_application->getUrl() << ":" << (m_publisherPort + 1);

129
130
131
	json::StringObject request;
	request.pushKey(message::TYPE);
	request.pushInt(message::CANCEL);
132
133
134

	// Create a request socket only for the request.
	unique_ptr<RequestSocketImpl> requestSocket = m_application->createRequestSocket(endpoint.str());
135
	requestSocket->request(request.toString());
136
137
138
139
140
141
142
143
144
145
}

/**
 * Creates a waiting object bound to this publisher.
 *
 * The returned GenericWaitingImpl is given a callable that invokes
 * cancelWaitForSubscribers() on this instance, so this publisher must still be
 * alive whenever that callable is run.
 *
 * @return a GenericWaitingImpl allocated with new (presumably owned by the caller - to confirm).
 */
WaitingImpl * PublisherImpl::waiting() {

	WaitingImpl * result = new GenericWaitingImpl(bind(&PublisherImpl::cancelWaitForSubscribers, this));
	return result;
}

void PublisherImpl::sendBinary(const std::string& data) {

	// send a STREAM message by the publisher socket
146
	publish(message::Event::STREAM, data.c_str(), data.length());
147
148
149
150
151
152
153
154
155
}

void PublisherImpl::send(const std::string& data) {

	// encode the data
	string result;
	serialize(data, result);

	// send a STREAM message by the publisher socket
156
	publish(message::Event::STREAM, result.c_str(), result.length());
157
158
159
160
161
}

void PublisherImpl::sendTwoBinaryParts(const std::string& data1, const std::string& data2) {

	// send a STREAM message by the publisher socket
162
	publishTwoParts(message::Event::STREAM, data1.c_str(), data1.length(), data2.c_str(), data2.length());
163
164
165
166
167
168
}

void PublisherImpl::setEnd() {

	if (!m_ended && m_publisher.get() != nullptr) {
		// send a dummy ENDSTREAM message by the publisher socket
169
170
		string data(message::Event::ENDSTREAM);
		publish(message::Event::ENDSTREAM, data.c_str(), data.length());
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195

		m_ended = true;
	}
}

bool PublisherImpl::isEnded() {
	return m_ended;
}

/**
 * Terminates the publisher.
 *
 * The end of the stream is signaled, the publisher socket is released and the
 * application is asked to destroy the publisher; a failure of that request is
 * logged to the error output. Calling terminate() again is a no-op because the
 * socket has already been released.
 */
void PublisherImpl::terminate() {

	// Already terminated.
	if (m_publisher.get() == nullptr) {
		return;
	}

	// Signal the end of the stream while the socket still exists, then release it.
	setEnd();
	m_publisher.reset(nullptr);

	const bool destroyed = m_application->destroyPublisher(m_name);
	if (!destroyed) {
		cerr << "server cannot destroy publisher " << m_name << endl;
	}
}

void PublisherImpl::publish(const std::string& header, const char* data, std::size_t size) {

	zmq::message_t requestType(header.length());
196
	memcpy(requestType.data(), header.c_str(), header.length());
197
198

	zmq::message_t requestData(size);
199
	memcpy(requestData.data(), data, size);
200
201
202
203
204
205
206
207

	m_publisher->send(requestType, ZMQ_SNDMORE);
	m_publisher->send(requestData);
}

void PublisherImpl::publishTwoParts(const std::string& header, const char* data1, std::size_t size1, const char* data2, std::size_t size2) {

	zmq::message_t requestType(header.length());
208
	memcpy(requestType.data(), header.c_str(), header.length());
209
210

	zmq::message_t requestData1(size1);
211
	memcpy(requestData1.data(), data1, size1);
212
213

	zmq::message_t requestData2(size2);
214
	memcpy(requestData2.data(), data2, size2);
215
216
217
218
219
220
221
222
223

	m_publisher->send(requestType, ZMQ_SNDMORE);
	m_publisher->send(requestData1, ZMQ_SNDMORE);
	m_publisher->send(requestData2);
}

/**
 * Handles a SYNC request: publishes a SYNC message on the publisher socket
 * and builds the reply for the requester.
 *
 * @return a message allocated with new that contains "Connection OK"; the
 *         caller is responsible for deleting it.
 */
zmq::message_t * PublisherImpl::processInitCommand() {

	// The data part is a dummy payload that repeats the SYNC header.
	const string syncPayload(message::Event::SYNC);
	publish(message::Event::SYNC, syncPayload.c_str(), syncPayload.length());

	// Build the reply.
	const string answer("Connection OK");
	const size_t answerSize = answer.length();

	zmq::message_t * reply = new zmq::message_t(answerSize);
	memcpy(reply->data(), answer.c_str(), answerSize);

	return reply;
}

/**
 * Builds the reply to a SUBSCRIBE_PUBLISHER request.
 *
 * The reply carries the text produced by createRequestResponse(0, "OK").
 *
 * @return a message allocated with new; the caller is responsible for deleting it.
 */
zmq::message_t * PublisherImpl::processSubscribePublisherCommand() {

	const string response = m_application->m_impl->createRequestResponse(0, "OK");
	const size_t responseSize = response.length();

	// Hold the message in a smart pointer until it is handed over to the caller.
	unique_ptr<zmq::message_t> reply(new zmq::message_t(responseSize));
	memcpy(reply->data(), response.c_str(), responseSize);

	return reply.release();
}

/**
 * Builds the reply to a CANCEL request received while waiting for subscribers.
 *
 * The reply carries the text produced by createRequestResponse(0, "OK").
 *
 * @return a message allocated with new; the caller is responsible for deleting it.
 */
zmq::message_t * PublisherImpl::processCancelPublisherSyncCommand() {

	const string response = m_application->m_impl->createRequestResponse(0, "OK");
	const size_t responseSize = response.length();

	// Hold the message in a smart pointer until it is handed over to the caller.
	unique_ptr<zmq::message_t> reply(new zmq::message_t(responseSize));
	memcpy(reply->data(), response.c_str(), responseSize);

	return reply.release();
}


}