/*
 * Copyright 2015 Institut Laue-Langevin
 *
 * Licensed under the EUPL, Version 1.1 only (the "License");
 * You may not use this work except in compliance with the Licence.
 * You may obtain a copy of the Licence at:
 *
 * http://joinup.ec.europa.eu/software/page/eupl
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the Licence is distributed on an "AS IS" basis,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the Licence for the specific language governing permissions and
 * limitations under the Licence.
 */

17
18
#include "Server.h"
#include "Application.h"
19
#include "ConnectionChecker.h"
20
#include "impl/ServicesImpl.h"
21
#include "EventThread.h"
22
#include "impl/StreamSocketImpl.h"
legoc's avatar
legoc committed
23
#include "impl/RequestSocketImpl.h"
24
#include "message/Message.h"
25
26
#include "UndefinedApplicationException.h"
#include "UndefinedKeyException.h"
27
28
#include <iostream>
#include <sstream>
29
#include "JSON.h"
legoc's avatar
legoc committed
30
31
32
33
34

using namespace std;

namespace cameo {

legoc's avatar
legoc committed
35
/**
 * Builds a server object from an endpoint string.
 *
 * The endpoint is cut into tokens by split(): the first two tokens joined by
 * ':' become the URL and the third token is parsed as the port. The request
 * socket is then created, the server version retrieved and the event thread
 * started.
 *
 * \param endpoint the server endpoint; it must yield at least three tokens.
 * \param timeoutMs the timeout forwarded to Services::setTimeout().
 * \throws InvalidArgumentException if the endpoint has fewer than three
 *         tokens or if its port token cannot be parsed as an integer.
 */
Server::Server(const std::string& endpoint, int timeoutMs) :
	Services() {

	Services::init();

	const std::vector<std::string> tokens = split(endpoint);

	if (tokens.size() < 3) {
		throw InvalidArgumentException(endpoint + " is not a valid endpoint");
	}

	m_url = tokens[0] + ":" + tokens[1];
	const std::string& port = tokens[2];

	// Refuse a port that is not an integer rather than silently carrying on
	// with a meaningless port value.
	std::istringstream is(port);
	if (!(is >> m_port)) {
		throw InvalidArgumentException(endpoint + " is not a valid endpoint");
	}

	m_serverEndpoint = m_url + ":" + port;

	// Set the timeout.
	Services::setTimeout(timeoutMs);

	// Create the request socket. The server endpoint has been defined.
	Services::initRequestSocket();

	// Retrieve the server version.
	Services::retrieveServerVersion();

	// Starting the event thread is best effort: a failure (the original
	// intent is a ConnectionTimeout) leaves m_eventThread empty, which the
	// destructor copes with.
	try {
		std::unique_ptr<EventStreamSocket> socket = openEventStream();
		m_eventThread.reset(new EventThread(this, socket));
		m_eventThread->start();
	}
	catch (...) {
		// Deliberately ignored, see above.
	}
}

/**
 * Cancels the event thread if one was created.
 */
Server::~Server() {
	// m_eventThread stays empty when the constructor could not start the thread.
	if (m_eventThread) {
		m_eventThread->cancel();
	}
}

legoc's avatar
legoc committed
80
void Server::setTimeout(int timeoutMs) {
81
	Services::setTimeout(timeoutMs);
legoc's avatar
legoc committed
82
83
84
85
86
87
88
89
90
91
92
93
94
95
}

/**
 * Returns the timeout reported by Services::getTimeout().
 */
int Server::getTimeout() const {
	const int timeout = Services::getTimeout();
	return timeout;
}

const std::string& Server::getEndpoint() const {
	return Services::getEndpoint();
}

const std::string& Server::getUrl() const {
	return Services::getUrl();
}

legoc's avatar
legoc committed
96
97
98
99
std::array<int, 3> Server::getVersion() const {
	return Services::getVersion();
}

legoc's avatar
legoc committed
100
101
102
103
104
105
106
107
/**
 * Returns the port reported by Services::getPort().
 */
int Server::getPort() const {
	const int port = Services::getPort();
	return port;
}

bool Server::isAvailable(int timeout) const {
	return Services::isAvailable(timeout);
}

legoc's avatar
legoc committed
108
bool Server::isAvailable() const {
109
110
	return isAvailable(getAvailableTimeout());
}
legoc's avatar
legoc committed
111

112
int Server::getAvailableTimeout() const {
legoc's avatar
legoc committed
113
114
	int timeout = getTimeout();
	if (timeout > 0) {
115
116
117
118
		return timeout;
	}
	else {
		return 10000;
legoc's avatar
legoc committed
119
120
121
	}
}

legoc's avatar
legoc committed
122
std::unique_ptr<application::Instance> Server::makeInstance() {
123
	return unique_ptr<application::Instance>(new application::Instance(this));
legoc's avatar
legoc committed
124
125
}

legoc's avatar
legoc committed
126
std::unique_ptr<application::Instance> Server::start(const std::string& name, Option options) {
legoc's avatar
legoc committed
127
128
129
	return start(name, vector<string>(), options);
}

legoc's avatar
legoc committed
130
131
int Server::getStreamPort(const std::string& name) {

132
	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createOutputPortRequest(name));
legoc's avatar
legoc committed
133

134
135
136
	// Get the JSON response.
	json::Object response;
	json::parse(response, reply.get());
legoc's avatar
legoc committed
137

138
	return response[message::RequestResponse::VALUE].GetInt();
legoc's avatar
legoc committed
139
140
}

legoc's avatar
legoc committed
141
std::unique_ptr<application::Instance> Server::start(const std::string& name, const std::vector<std::string> & args, Option options) {
legoc's avatar
legoc committed
142

legoc's avatar
legoc committed
143
144
	bool outputStream = ((options & OUTPUTSTREAM) != 0);

legoc's avatar
legoc committed
145
	unique_ptr<application::Instance> instance = makeInstance();
legoc's avatar
legoc committed
146

147
	// Set the name and register the instance as event listener.
legoc's avatar
legoc committed
148
	instance->setName(name);
149
	registerEventListener(instance.get());
legoc's avatar
legoc committed
150
151

	try {
legoc's avatar
legoc committed
152
		if (outputStream) {
legoc's avatar
legoc committed
153
154
			// We connect to the stream port before starting the application
			// so that we are sure that the ENDSTREAM message will be received even if the application terminates rapidly.
legoc's avatar
legoc committed
155
156
157
158
			unique_ptr<OutputStreamSocket> socket = createOutputStreamSocket(getStreamPort(name));
			instance->setOutputStreamSocket(socket);
		}

159
		unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createStartRequest(name, args, application::This::getReference()));
legoc's avatar
legoc committed
160

161
162
163
		// Get the JSON response.
		json::Object response;
		json::parse(response, reply.get());
legoc's avatar
legoc committed
164

165
166
167
		int value = response[message::RequestResponse::VALUE].GetInt();
		if (value == -1) {
			instance->setErrorMessage(response[message::RequestResponse::MESSAGE].GetString());
legoc's avatar
legoc committed
168
		}
169
170
171
172
173
		else {
			instance->setId(value);
		}
	}
	catch (const ConnectionTimeout& e) {
legoc's avatar
legoc committed
174
175
176
177
178
179
180
181
		instance->setErrorMessage(e.what());
	}

	return instance;
}

/**
 * Sends a stop or kill request for the application and returns the answer.
 *
 * \param id the application id.
 * \param immediately true to send a kill request, false to send a stop request.
 * \return a Response built from the VALUE and MESSAGE fields of the answer.
 */
Response Server::stopApplicationAsynchronously(int id, bool immediately) const {

	std::string payload;
	if (!immediately) {
		payload = m_impl->createStopRequest(id);
	}
	else {
		payload = m_impl->createKillRequest(id);
	}

	std::unique_ptr<zmq::message_t> answer = m_requestSocket->request(payload);

	// Decode the JSON answer.
	json::Object decoded;
	json::parse(decoded, answer.get());

	const int code = decoded[message::RequestResponse::VALUE].GetInt();
	const std::string text = decoded[message::RequestResponse::MESSAGE].GetString();

	return Response(code, text);
}

legoc's avatar
legoc committed
203
204
205
/**
 * Connects to the applications listed in the answer to a connect request.
 *
 * An instance is created and registered as event listener for every listed
 * application, but only those for which isAlive() returns true are kept in
 * the result.
 *
 * \param name the application name put in the connect request.
 * \param options when OUTPUTSTREAM is set, an output stream socket is attached
 *        to every alive instance.
 * \return an array sized to the number of alive instances, in answer order.
 */
application::InstanceArray Server::connectAll(const std::string& name, Option options) {

	const bool outputStream = ((options & OUTPUTSTREAM) != 0);

	std::unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createConnectRequest(name));

	// Get the JSON response.
	json::Object response;
	json::parse(response, reply.get());

	json::Value& applicationInfo = response[message::ApplicationInfoListResponse::APPLICATION_INFO];
	json::Value::Array array = applicationInfo.GetArray();
	const size_t size = array.Size();

	// Alive instances are gathered first because their number is only known after the loop.
	std::vector<std::unique_ptr<application::Instance>> alive;
	alive.reserve(size);

	for (size_t i = 0; i < size; ++i) {
		json::Value::Object info = array[i].GetObject();

		std::unique_ptr<application::Instance> instance = makeInstance();

		// Set the name and register the instance as event listener.
		const std::string instanceName = info[message::ApplicationInfo::NAME].GetString();
		instance->setName(instanceName);
		registerEventListener(instance.get());

		const int applicationId = info[message::ApplicationInfo::ID].GetInt();

		// Test if the application is still alive otherwise we could have missed a status message.
		if (!isAlive(applicationId)) {
			continue;
		}

		// Attach the output stream of the running application when requested.
		if (outputStream) {
			std::unique_ptr<OutputStreamSocket> socket = createOutputStreamSocket(getStreamPort(instanceName));
			instance->setOutputStreamSocket(socket);
		}

		instance->setId(applicationId);
		instance->setInitialState(info[message::ApplicationInfo::APPLICATION_STATE].GetInt());
		instance->setPastStates(info[message::ApplicationInfo::PAST_APPLICATION_STATES].GetInt());

		alive.push_back(std::move(instance));
	}

	// Transfer the alive instances to the result array.
	application::InstanceArray aliveInstances;
	aliveInstances.allocate(alive.size());

	for (size_t j = 0; j < alive.size(); ++j) {
		aliveInstances[j] = std::move(alive[j]);
	}

	return aliveInstances;
}

legoc's avatar
legoc committed
271
std::unique_ptr<application::Instance> Server::connect(const std::string& name, Option options) {
legoc's avatar
legoc committed
272

legoc's avatar
legoc committed
273
	application::InstanceArray instances = connectAll(name, options);
legoc's avatar
legoc committed
274
275

	if (instances.size() == 0) {
legoc's avatar
legoc committed
276
		unique_ptr<application::Instance> instance = makeInstance();
legoc's avatar
legoc committed
277
278
279
		return instance;
	}

legoc's avatar
legoc committed
280
	return std::move(instances[0]);
legoc's avatar
legoc committed
281
282
283
284
285
286
287
288
289
290
291
292
293
294
}

void Server::killAllAndWaitFor(const std::string& name) {

	application::InstanceArray instances = connectAll(name);

	for (int i = 0; i < instances.size(); ++i) {
		instances[i]->kill();
		instances[i]->waitFor();
	}
}

bool Server::isAlive(int id) const {

295
	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createIsAliveRequest(id));
legoc's avatar
legoc committed
296

297
298
299
	// Get the JSON response.
	json::Object response;
	json::parse(response, reply.get());
legoc's avatar
legoc committed
300

301
	return response[message::IsAliveResponse::IS_ALIVE].GetBool();
legoc's avatar
legoc committed
302
303
304
305
}

std::vector<application::Configuration> Server::getApplicationConfigurations() const {

306
307
	vector<application::Configuration> configs;

308
	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createListRequest());
legoc's avatar
legoc committed
309

310
311
312
	// Get the JSON response.
	json::Object response;
	json::parse(response, reply.get());
legoc's avatar
legoc committed
313

legoc's avatar
legoc committed
314
	json::Value& applicationConfigs = response[message::ListResponse::APPLICATION_CONFIG];
315
316
	json::Value::Array array = applicationConfigs.GetArray();
	size_t size = array.Size();
legoc's avatar
legoc committed
317

318
319
	for (int i = 0; i < size; ++i) {
		json::Value::Object config = array[i].GetObject();
legoc's avatar
legoc committed
320

321
322
323
324
325
326
		string name = config[message::ApplicationConfig::NAME].GetString();
		string description = config[message::ApplicationConfig::DESCRIPTION].GetString();
		bool runsSingle = config[message::ApplicationConfig::RUNS_SINGLE].GetBool();
		bool restart = config[message::ApplicationConfig::RESTART].GetBool();
		int startingTime = config[message::ApplicationConfig::STARTING_TIME].GetInt();
		int stoppingTime = config[message::ApplicationConfig::STOPPING_TIME].GetInt();
legoc's avatar
legoc committed
327

328
329
330
331
332
333
334
335
		application::Configuration applicationConfig(name,
				description,
				runsSingle,
				restart,
				startingTime,
				stoppingTime);

		configs.push_back(applicationConfig);
legoc's avatar
legoc committed
336
337
	}

338
	return configs;
legoc's avatar
legoc committed
339
340
341
342
}

std::vector<application::Info> Server::getApplicationInfos() const {

legoc's avatar
legoc committed
343
	vector<application::Info> infos;
legoc's avatar
legoc committed
344

legoc's avatar
legoc committed
345
	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createAppsRequest());
346
347
348
349
350
351
352
353

	// Get the JSON response.
	json::Object response;
	json::parse(response, reply.get());

	json::Value& applicationInfo = response[message::ApplicationInfoListResponse::APPLICATION_INFO];
	json::Value::Array array = applicationInfo.GetArray();
	size_t size = array.Size();
legoc's avatar
legoc committed
354

355
356
	for (int i = 0; i < size; ++i) {
		json::Value::Object info = array[i].GetObject();
legoc's avatar
legoc committed
357

358
359
360
361
362
363
		string name = info[message::ApplicationInfo::NAME].GetString();
		int id = info[message::ApplicationInfo::ID].GetInt();
		int pid = info[message::ApplicationInfo::PID].GetInt();
		application::State state = info[message::ApplicationInfo::APPLICATION_STATE].GetInt();
		application::State pastStates = info[message::ApplicationInfo::PAST_APPLICATION_STATES].GetInt();
		string args = info[message::ApplicationInfo::ARGS].GetString();
legoc's avatar
legoc committed
364

365
366
367
368
369
370
		application::Info applicationInfo(name,
						id,
						pid,
						state,
						pastStates,
						args);
legoc's avatar
legoc committed
371

legoc's avatar
legoc committed
372
		infos.push_back(applicationInfo);
legoc's avatar
legoc committed
373
374
	}

legoc's avatar
legoc committed
375
	return infos;
legoc's avatar
legoc committed
376
377
378
379
}

std::vector<application::Info> Server::getApplicationInfos(const std::string& name) const {

380
381
	vector<application::Info> allInfos = getApplicationInfos();
	vector<application::Info> infos;
legoc's avatar
legoc committed
382

383
	for (vector<application::Info>::const_iterator i = allInfos.begin(); i != allInfos.end(); ++i) {
legoc's avatar
legoc committed
384
385
		application::Info const & info = *i;
		if (info.getName() == name) {
386
			infos.push_back(info);
legoc's avatar
legoc committed
387
388
389
		}
	}

390
	return infos;
legoc's avatar
legoc committed
391
392
}

393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
/**
 * Requests the status of the application and returns its state.
 *
 * \param id the application id put in the status request.
 * \return the integer found in the APPLICATION_STATE field of the answer.
 */
application::State Server::getActualState(int id) const {

	std::unique_ptr<zmq::message_t> answer = m_requestSocket->request(m_impl->createGetStatusRequest(id));

	// Decode the JSON answer.
	json::Object decoded;
	json::parse(decoded, answer.get());

	const application::State state = decoded[message::StatusEvent::APPLICATION_STATE].GetInt();
	return state;
}

std::set<application::State> Server::getPastStates(int id) const {

	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createGetStatusRequest(id));

	// Get the JSON response.
	json::Object response;
	json::parse(response, reply.get());

	application::State applicationStates = response[message::StatusEvent::PAST_APPLICATION_STATES].GetInt();

	set<application::State> result;

	if ((applicationStates & application::STARTING) != 0) {
		result.insert(application::STARTING);
	}

	if ((applicationStates & application::RUNNING) != 0) {
		result.insert(application::RUNNING);
	}

	if ((applicationStates & application::STOPPING) != 0) {
		result.insert(application::STOPPING);
	}

	if ((applicationStates & application::KILLING) != 0) {
		result.insert(application::KILLING);
	}

	if ((applicationStates & application::PROCESSING_ERROR) != 0) {
		result.insert(application::PROCESSING_ERROR);
	}

	if ((applicationStates & application::FAILURE) != 0) {
		result.insert(application::FAILURE);
	}

	if ((applicationStates & application::SUCCESS) != 0) {
		result.insert(application::SUCCESS);
	}

	if ((applicationStates & application::STOPPED) != 0) {
		result.insert(application::STOPPED);
	}

	if ((applicationStates & application::KILLED) != 0) {
		result.insert(application::KILLED);
	}

	return result;
}

legoc's avatar
legoc committed
455
std::unique_ptr<EventStreamSocket> Server::openEventStream() {
legoc's avatar
legoc committed
456
457
458
	return Services::openEventStream();
}

459
std::unique_ptr<application::Subscriber> Server::createSubscriber(int id, const std::string& publisherName, const std::string& instanceName) {
legoc's avatar
legoc committed
460

461
	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createConnectPublisherRequest(id, publisherName));
legoc's avatar
legoc committed
462

463
464
465
	// Get the JSON response.
	json::Object response;
	json::parse(response, reply.get());
legoc's avatar
legoc committed
466

467
	int publisherPort = response[message::PublisherResponse::PUBLISHER_PORT].GetInt();
legoc's avatar
legoc committed
468
	if (publisherPort == -1) {
469
		throw SubscriberCreationException(response[message::PublisherResponse::MESSAGE].GetString());
legoc's avatar
legoc committed
470
471
	}

472
473
	int synchronizerPort = response[message::PublisherResponse::SYNCHRONIZER_PORT].GetInt();
	int numberOfSubscribers = response[message::PublisherResponse::NUMBER_OF_SUBSCRIBERS].GetInt();
legoc's avatar
legoc committed
474

legoc's avatar
legoc committed
475
	unique_ptr<application::Subscriber> subscriber(new application::Subscriber(this, getUrl(), publisherPort, synchronizerPort, publisherName, numberOfSubscribers, instanceName, id, m_serverEndpoint, m_serverStatusEndpoint));
legoc's avatar
legoc committed
476
477
478
479
480
	subscriber->init();

	return subscriber;
}

legoc's avatar
legoc committed
481
/**
 * Creates a connection checker for this server and starts its thread.
 *
 * \param handler the handler handed to the checker.
 * \param pollingTimeMs the polling time handed to the checker thread.
 * \return the started connection checker.
 */
std::unique_ptr<ConnectionChecker> Server::createConnectionChecker(ConnectionCheckerType handler, int pollingTimeMs) {

	std::unique_ptr<ConnectionChecker> checker(new ConnectionChecker(this, handler));

	// The thread is started with the timeout returned by getAvailableTimeout().
	const int availableTimeout = getAvailableTimeout();
	checker->startThread(availableTimeout, pollingTimeMs);

	return checker;
}

489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
void Server::storeKeyValue(int id, const std::string& key, const std::string& value) {

	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createStoreKeyValueRequest(id, key, value));

	// Get the JSON response.
	json::Object response;
	json::parse(response, reply.get());
}

std::string Server::getKeyValue(int id, const std::string& key) {

	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createGetKeyValueRequest(id, key));

	// Get the JSON response.
	json::Object response;
	json::parse(response, reply.get());

	int value = response[message::RequestResponse::VALUE].GetInt();
	if (value == 0) {
		return response[message::RequestResponse::MESSAGE].GetString();
	}
	else if (value == -1) {
		throw UndefinedApplicationException(response[message::RequestResponse::MESSAGE].GetString());
	}
	else if (value == -2) {
		throw UndefinedKeyException(response[message::RequestResponse::MESSAGE].GetString());
	}

	return "";
}

void Server::removeKey(int id, const std::string& key) {

	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRemoveKeyRequest(id, key));

	// Get the JSON response.
	json::Object response;
	json::parse(response, reply.get());

	int value = response[message::RequestResponse::VALUE].GetInt();
	if (value == -1) {
		throw UndefinedApplicationException(response[message::RequestResponse::MESSAGE].GetString());
	}
	else if (value == -2) {
		throw UndefinedKeyException(response[message::RequestResponse::MESSAGE].GetString());
	}
}

537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
std::vector<EventListener *> Server::getEventListeners() {
	std::unique_lock<std::mutex> lock(m_eventListenersMutex);
	return m_eventListeners;
}

/**
 * Appends the listener to the registered event listeners while holding
 * m_eventListenersMutex.
 *
 * \param listener the listener to add; no check for duplicates is made.
 */
void Server::registerEventListener(EventListener * listener) {
	std::lock_guard<std::mutex> guard(m_eventListenersMutex);
	m_eventListeners.push_back(listener);
}

/**
 * Removes the first occurrence of the listener from the registered event
 * listeners while holding m_eventListenersMutex.
 *
 * \param listener the listener to remove; nothing happens when it is absent.
 */
void Server::unregisterEventListener(EventListener * listener) {
	std::lock_guard<std::mutex> guard(m_eventListenersMutex);

	// Advance to the first matching entry, or to the end when there is none.
	auto position = m_eventListeners.begin();
	while (position != m_eventListeners.end() && *position != listener) {
		++position;
	}

	if (position != m_eventListeners.end()) {
		m_eventListeners.erase(position);
	}
}

legoc's avatar
legoc committed
559
560
561
562
563
564
565
566
/**
 * Writes "server@" followed by the server endpoint to the stream.
 *
 * \param os the output stream.
 * \param server the server to print.
 * \return the stream passed in.
 */
std::ostream& operator<<(std::ostream& os, const cameo::Server& server) {
	return os << "server@" << server.m_serverEndpoint;
}

}