Server.cpp 11.9 KB
Newer Older
legoc's avatar
legoc committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/*
 * Copyright 2015 Institut Laue-Langevin
 *
 * Licensed under the EUPL, Version 1.1 only (the "License");
 * You may not use this work except in compliance with the Licence.
 * You may obtain a copy of the Licence at:
 *
 * http://joinup.ec.europa.eu/software/page/eupl
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the Licence is distributed on an "AS IS" basis,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the Licence for the specific language governing permissions and
 * limitations under the Licence.
 */

17
#include "Server.h"
legoc's avatar
legoc committed
18
19
20

#include <iostream>
#include <sstream>
21
#include "Application.h"
22
#include "ConnectionChecker.h"
23
24
#include "impl/ServicesImpl.h"
#include "ProtoType.h"
25
#include "EventThread.h"
26
#include "impl/StreamSocketImpl.h"
legoc's avatar
legoc committed
27
#include "impl/RequestSocketImpl.h"
legoc's avatar
legoc committed
28
29
30
31
32

using namespace std;

namespace cameo {

legoc's avatar
legoc committed
33
Server::Server(const std::string& endpoint, int timeoutMs) :
34
	Services() {
legoc's avatar
legoc committed
35

36
	Services::init();
legoc's avatar
legoc committed
37
38
39
40

	vector<string> tokens = split(endpoint);

	if (tokens.size() < 3) {
41
		throw InvalidArgumentException(endpoint + " is not a valid endpoint");
legoc's avatar
legoc committed
42
43
44
45
46
47
48
	}

	m_url = tokens[0] + ":" + tokens[1];
	string port = tokens[2];
	istringstream is(port);
	is >> m_port;
	m_serverEndpoint = m_url + ":" + port;
49

legoc's avatar
legoc committed
50
51
52
	// Set the timeout.
	Services::setTimeout(timeoutMs);

53
54
55
	// Create the request socket. The server endpoint has been defined.
	Services::initRequestSocket();

legoc's avatar
legoc committed
56
57
58
59
60
61
62
63
64
65
66
67
68
69
	// Manage the ConnectionTimeout exception that can occur.
	try {
		// Start the event thread.
		unique_ptr<EventStreamSocket> socket = openEventStream();
		m_eventThread.reset(new EventThread(this, socket));
		m_eventThread->start();
	}
	catch (const std::exception& e) {
		cout << "event error " << e.what() << endl;
	}
	catch (...) {
		// ...
	}

legoc's avatar
legoc committed
70
71
72
}

Server::~Server() {
73
74
75
76
	// Stop the event thread.
	if (m_eventThread.get() != nullptr) {
		m_eventThread->cancel();
	}
legoc's avatar
legoc committed
77
78
}

legoc's avatar
legoc committed
79
void Server::setTimeout(int timeoutMs) {
80
	Services::setTimeout(timeoutMs);
legoc's avatar
legoc committed
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
}

int Server::getTimeout() const {
	return Services::getTimeout();
}

const std::string& Server::getEndpoint() const {
	return Services::getEndpoint();
}

const std::string& Server::getUrl() const {
	return Services::getUrl();
}

int Server::getPort() const {
	return Services::getPort();
}

bool Server::isAvailable(int timeout) const {
	return Services::isAvailable(timeout);
}

legoc's avatar
legoc committed
103
bool Server::isAvailable() const {
104
105
	return isAvailable(getAvailableTimeout());
}
legoc's avatar
legoc committed
106

107
int Server::getAvailableTimeout() const {
legoc's avatar
legoc committed
108
109
	int timeout = getTimeout();
	if (timeout > 0) {
110
111
112
113
		return timeout;
	}
	else {
		return 10000;
legoc's avatar
legoc committed
114
115
116
	}
}

legoc's avatar
legoc committed
117
std::unique_ptr<application::Instance> Server::makeInstance() {
118
	return unique_ptr<application::Instance>(new application::Instance(this));
legoc's avatar
legoc committed
119
120
}

legoc's avatar
legoc committed
121
std::unique_ptr<application::Instance> Server::start(const std::string& name, Option options) {
legoc's avatar
legoc committed
122
123
124
	return start(name, vector<string>(), options);
}

legoc's avatar
legoc committed
125
126
int Server::getStreamPort(const std::string& name) {

legoc's avatar
legoc committed
127
	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRequestType(PROTO_OUTPUT), m_impl->createOutputRequest(name));
legoc's avatar
legoc committed
128
129
130
131
132
133
134

	proto::RequestResponse requestResponse;
	requestResponse.ParseFromArray((*reply).data(), (*reply).size());

	return requestResponse.value();
}

legoc's avatar
legoc committed
135
std::unique_ptr<application::Instance> Server::start(const std::string& name, const std::vector<std::string> & args, Option options) {
legoc's avatar
legoc committed
136

legoc's avatar
legoc committed
137
138
	bool outputStream = ((options & OUTPUTSTREAM) != 0);

legoc's avatar
legoc committed
139
	unique_ptr<application::Instance> instance = makeInstance();
legoc's avatar
legoc committed
140

141
	// Set the name and register the instance as event listener.
legoc's avatar
legoc committed
142
	instance->setName(name);
143
	registerEventListener(instance.get());
legoc's avatar
legoc committed
144
145

	try {
legoc's avatar
legoc committed
146
		if (outputStream) {
legoc's avatar
legoc committed
147
148
			// We connect to the stream port before starting the application
			// so that we are sure that the ENDSTREAM message will be received even if the application terminates rapidly.
legoc's avatar
legoc committed
149
150
151
152
			unique_ptr<OutputStreamSocket> socket = createOutputStreamSocket(getStreamPort(name));
			instance->setOutputStreamSocket(socket);
		}

legoc's avatar
legoc committed
153
		unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRequestType(PROTO_START), m_impl->createStartRequest(name, args, application::This::getReference()));
legoc's avatar
legoc committed
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172

		proto::RequestResponse requestResponse;
		requestResponse.ParseFromArray((*reply).data(), (*reply).size());

		if (requestResponse.value() == -1) {
			instance->setErrorMessage(requestResponse.message());
		} else {
			instance->setId(requestResponse.value());
		}

	} catch (const ConnectionTimeout& e) {
		instance->setErrorMessage(e.what());
	}

	return instance;
}

Response Server::stopApplicationAsynchronously(int id, bool immediately) const {

legoc's avatar
legoc committed
173
174
	string requestTypePart;
	string requestDataPart;
legoc's avatar
legoc committed
175
176

	if (immediately) {
legoc's avatar
legoc committed
177
178
		requestTypePart = m_impl->createRequestType(PROTO_KILL);
		requestDataPart = m_impl->createKillRequest(id);
legoc's avatar
legoc committed
179
	} else {
legoc's avatar
legoc committed
180
181
		requestTypePart = m_impl->createRequestType(PROTO_STOP);
		requestDataPart = m_impl->createStopRequest(id);
legoc's avatar
legoc committed
182
183
	}

legoc's avatar
legoc committed
184
	unique_ptr<zmq::message_t> reply = m_requestSocket->request(requestTypePart, requestDataPart);
legoc's avatar
legoc committed
185
186
187
188
189
190
191

	proto::RequestResponse requestResponse;
	requestResponse.ParseFromArray((*reply).data(), (*reply).size());

	return Response(requestResponse.value(), requestResponse.message());
}

legoc's avatar
legoc committed
192
193
194
application::InstanceArray Server::connectAll(const std::string& name, Option options) {

	bool outputStream = ((options & OUTPUTSTREAM) != 0);
legoc's avatar
legoc committed
195
196
197

	application::InstanceArray instances;

legoc's avatar
legoc committed
198
	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRequestType(PROTO_CONNECT), m_impl->createConnectRequest(name));
legoc's avatar
legoc committed
199
200
201
202
203
204
205
206
207
208
209
210

	proto::ApplicationInfoListResponse response;
	response.ParseFromArray((*reply).data(), (*reply).size());

	// allocate the array
	instances.allocate(response.applicationinfo_size());

	int aliveInstancesCount = 0;

	for (int i = 0; i < response.applicationinfo_size(); ++i) {
		proto::ApplicationInfo info = response.applicationinfo(i);

legoc's avatar
legoc committed
211
		unique_ptr<application::Instance> instance = makeInstance();
legoc's avatar
legoc committed
212

213
		// Set the name and register the instance as event listener.
legoc's avatar
legoc committed
214
		instance->setName(info.name());
215
216
		registerEventListener(instance.get());

legoc's avatar
legoc committed
217
218
219
220
221
222
		int applicationId = info.id();

		// test if the application is still alive otherwise we could have missed a status message
		if (isAlive(applicationId)) {
			aliveInstancesCount++;

legoc's avatar
legoc committed
223
224
225
226
227
228
229
			// we connect to the stream port before starting the application
			// so that we are sure that the ENDSTREAM message will be received even if the application terminates rapidly
			if (outputStream) {
				unique_ptr<OutputStreamSocket> socket = createOutputStreamSocket(getStreamPort(info.name()));
				instance->setOutputStreamSocket(socket);
			}

legoc's avatar
legoc committed
230
231
232
233
			instance->setId(applicationId);
			instance->setInitialState(info.applicationstate());
			instance->setPastStates(info.pastapplicationstates());

legoc's avatar
legoc committed
234
			instances.m_array[i] = std::move(instance);
legoc's avatar
legoc committed
235
236
237
238
239
240
241
242
243
244
245
		}
	}

	// Copy the instances alive
	application::InstanceArray aliveInstances;
	aliveInstances.allocate(aliveInstancesCount);

	int j = 0;
	for (int i = 0; i < response.applicationinfo_size(); ++i) {

		if (instances.m_array[i].get() != 0) {
legoc's avatar
legoc committed
246
			aliveInstances[j] = std::move(instances.m_array[i]);
legoc's avatar
legoc committed
247
248
249
250
251
252
253
			j++;
		}
	}

	return aliveInstances;
}

legoc's avatar
legoc committed
254
std::unique_ptr<application::Instance> Server::connect(const std::string& name, Option options) {
legoc's avatar
legoc committed
255

legoc's avatar
legoc committed
256
	application::InstanceArray instances = connectAll(name, options);
legoc's avatar
legoc committed
257
258

	if (instances.size() == 0) {
legoc's avatar
legoc committed
259
		unique_ptr<application::Instance> instance = makeInstance();
legoc's avatar
legoc committed
260
261
262
		return instance;
	}

legoc's avatar
legoc committed
263
	return std::move(instances[0]);
legoc's avatar
legoc committed
264
265
266
267
268
269
270
271
272
273
274
275
276
277
}

void Server::killAllAndWaitFor(const std::string& name) {

	application::InstanceArray instances = connectAll(name);

	for (int i = 0; i < instances.size(); ++i) {
		instances[i]->kill();
		instances[i]->waitFor();
	}
}

bool Server::isAlive(int id) const {

legoc's avatar
legoc committed
278
	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRequestType(PROTO_ISALIVE), m_impl->createIsAliveRequest(id));
legoc's avatar
legoc committed
279
280
281
282
283
284
285
286
287
288
289

	proto::IsAliveResponse isAliveResponse;
	isAliveResponse.ParseFromArray((*reply).data(), (*reply).size());

	return isAliveResponse.isalive();
}

std::vector<application::Configuration> Server::getApplicationConfigurations() const {

	vector<application::Configuration> configVector;

legoc's avatar
legoc committed
290
	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRequestType(PROTO_ALLAVAILABLE), m_impl->createAllAvailableRequest());
legoc's avatar
legoc committed
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313

	proto::AllAvailableResponse allAvailableResponse;
	allAvailableResponse.ParseFromArray((*reply).data(), (*reply).size());

	for (int i = 0; i < allAvailableResponse.applicationconfig_size(); ++i) {
		proto::ApplicationConfig config = allAvailableResponse.applicationconfig(i);

		application::Configuration applicationConfig(config.name(),
				config.description(),
				config.runssingle(),
				config.restart(),
				config.startingtime(),
				config.retries(),
				config.stoppingtime());

		configVector.push_back(applicationConfig);
	}

	return configVector;
}

std::vector<application::Info> Server::getApplicationInfos() const {

legoc's avatar
legoc committed
314
	vector<application::Info> infos;
legoc's avatar
legoc committed
315

legoc's avatar
legoc committed
316
	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRequestType(PROTO_SHOWALL), m_impl->createShowAllRequest());
legoc's avatar
legoc committed
317
318
319
320
321
322
323
324
325

	proto::ApplicationInfoListResponse response;
	response.ParseFromArray((*reply).data(), (*reply).size());

	for (int i = 0; i < response.applicationinfo_size(); ++i) {
		proto::ApplicationInfo info = response.applicationinfo(i);

		application::Info applicationInfo(info.name(),
						info.id(),
legoc's avatar
legoc committed
326
						info.pid(),
legoc's avatar
legoc committed
327
328
329
330
						info.applicationstate(),
						info.pastapplicationstates(),
						info.args());

legoc's avatar
legoc committed
331
		infos.push_back(applicationInfo);
legoc's avatar
legoc committed
332
333
	}

legoc's avatar
legoc committed
334
	return infos;
legoc's avatar
legoc committed
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
}

std::vector<application::Info> Server::getApplicationInfos(const std::string& name) const {

	vector<application::Info> allInfoVector = getApplicationInfos();
	vector<application::Info> infoVector;

	for (vector<application::Info>::const_iterator i = allInfoVector.begin(); i != allInfoVector.end(); ++i) {
		application::Info const & info = *i;
		if (info.getName() == name) {
			infoVector.push_back(info);
		}
	}

	return infoVector;
}

legoc's avatar
legoc committed
352
std::unique_ptr<EventStreamSocket> Server::openEventStream() {
legoc's avatar
legoc committed
353
354
355
	return Services::openEventStream();
}

356
std::unique_ptr<application::Subscriber> Server::createSubscriber(int id, const std::string& publisherName, const std::string& instanceName) {
legoc's avatar
legoc committed
357

legoc's avatar
legoc committed
358
	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRequestType(PROTO_CONNECTPUBLISHER), m_impl->createConnectPublisherRequest(id, publisherName));
legoc's avatar
legoc committed
359
360
361
362
363
364
365
366
367
368
369
370

	proto::PublisherResponse requestResponse;
	requestResponse.ParseFromArray((*reply).data(), (*reply).size());

	int publisherPort = requestResponse.publisherport();
	if (publisherPort == -1) {
		throw SubscriberCreationException(requestResponse.message());
	}

	int synchronizerPort = requestResponse.synchronizerport();
	int numberOfSubscribers = requestResponse.numberofsubscribers();

legoc's avatar
legoc committed
371
	unique_ptr<application::Subscriber> subscriber(new application::Subscriber(this, getUrl(), publisherPort, synchronizerPort, publisherName, numberOfSubscribers, instanceName, id, m_serverEndpoint, m_serverStatusEndpoint));
legoc's avatar
legoc committed
372
373
374
375
376
	subscriber->init();

	return subscriber;
}

legoc's avatar
legoc committed
377
std::unique_ptr<ConnectionChecker> Server::createConnectionChecker(ConnectionCheckerType handler, int pollingTimeMs) {
378

legoc's avatar
legoc committed
379
	unique_ptr<ConnectionChecker> connectionChecker(new ConnectionChecker(this, handler));
380
	connectionChecker->startThread(getAvailableTimeout(), pollingTimeMs);
legoc's avatar
legoc committed
381

382
	return connectionChecker;
legoc's avatar
legoc committed
383
384
}

385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
std::vector<EventListener *> Server::getEventListeners() {
	std::unique_lock<std::mutex> lock(m_eventListenersMutex);
	return m_eventListeners;
}

void Server::registerEventListener(EventListener * listener) {
	std::unique_lock<std::mutex> lock(m_eventListenersMutex);
	m_eventListeners.push_back(listener);
}

void Server::unregisterEventListener(EventListener * listener) {
	std::unique_lock<std::mutex> lock(m_eventListenersMutex);

	// Iterate to find the listener.
	for (auto it = m_eventListeners.begin(); it != m_eventListeners.end(); ++it) {
		if (*it == listener) {
			m_eventListeners.erase(it);
			break;
		}
	}
}

legoc's avatar
legoc committed
407
408
409
410
411
412
413
414
std::ostream& operator<<(std::ostream& os, const cameo::Server& server) {

	os << "server@" << server.m_serverEndpoint;

	return os;
}

}