Server.cpp 19.2 KB
Newer Older
legoc's avatar
legoc committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/*
 * Copyright 2015 Institut Laue-Langevin
 *
 * Licensed under the EUPL, Version 1.1 only (the "License");
 * You may not use this work except in compliance with the Licence.
 * You may obtain a copy of the Licence at:
 *
 * http://joinup.ec.europa.eu/software/page/eupl
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the Licence is distributed on an "AS IS" basis,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the Licence for the specific language governing permissions and
 * limitations under the Licence.
 */

17
18
#include "Server.h"
#include "Application.h"
19
#include "ConnectionChecker.h"
20
#include "impl/ServicesImpl.h"
21
#include "EventThread.h"
22
#include "impl/StreamSocketImpl.h"
legoc's avatar
legoc committed
23
#include "impl/RequestSocketImpl.h"
24
#include "message/Message.h"
25
26
#include "UndefinedApplicationException.h"
#include "UndefinedKeyException.h"
27
28
#include <iostream>
#include <sstream>
29
#include "JSON.h"
legoc's avatar
legoc committed
30
31
32
33
34

using namespace std;

namespace cameo {

35
void Server::initServer(const Endpoint& endpoint, int timeoutMs) {
legoc's avatar
legoc committed
36

37
	Services::init();
legoc's avatar
legoc committed
38

39
40
41
	m_serverEndpoint = endpoint;
	m_url = endpoint.getProtocol() + "://" + endpoint.getAddress();
	m_port = endpoint.getPort();
42

legoc's avatar
legoc committed
43
44
45
	// Set the timeout.
	Services::setTimeout(timeoutMs);

46
47
48
	// Create the request socket. The server endpoint has been defined.
	Services::initRequestSocket();

49
50
51
	// Retrieve the server version.
	Services::retrieveServerVersion();

legoc's avatar
legoc committed
52
53
54
55
56
57
58
59
60
61
	// Manage the ConnectionTimeout exception that can occur.
	try {
		// Start the event thread.
		unique_ptr<EventStreamSocket> socket = openEventStream();
		m_eventThread.reset(new EventThread(this, socket));
		m_eventThread->start();
	}
	catch (...) {
		// ...
	}
legoc's avatar
legoc committed
62
63
}

64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
Server::Server(const Endpoint& endpoint, int timeoutMs) :
	Services() {

	Services::init();

	initServer(endpoint, timeoutMs);
}

Server::Server(const std::string& endpoint, int timeoutMs) :
	Services() {

	Services::init();

	try {
		initServer(Endpoint::parse(endpoint), timeoutMs);
	}
	catch (...) {
		throw InvalidArgumentException(endpoint + " is not a valid endpoint");
	}
}

legoc's avatar
legoc committed
85
Server::~Server() {
86
87
88
89
	// Stop the event thread.
	if (m_eventThread.get() != nullptr) {
		m_eventThread->cancel();
	}
legoc's avatar
legoc committed
90
91
}

legoc's avatar
legoc committed
92
void Server::setTimeout(int timeoutMs) {
93
	Services::setTimeout(timeoutMs);
legoc's avatar
legoc committed
94
95
96
97
98
99
}

int Server::getTimeout() const {
	return Services::getTimeout();
}

100
const Endpoint& Server::getEndpoint() const {
legoc's avatar
legoc committed
101
102
103
104
105
106
107
	return Services::getEndpoint();
}

const std::string& Server::getUrl() const {
	return Services::getUrl();
}

legoc's avatar
legoc committed
108
109
110
111
std::array<int, 3> Server::getVersion() const {
	return Services::getVersion();
}

legoc's avatar
legoc committed
112
113
114
115
116
117
118
119
int Server::getPort() const {
	return Services::getPort();
}

bool Server::isAvailable(int timeout) const {
	return Services::isAvailable(timeout);
}

legoc's avatar
legoc committed
120
bool Server::isAvailable() const {
121
122
	return isAvailable(getAvailableTimeout());
}
legoc's avatar
legoc committed
123

124
int Server::getAvailableTimeout() const {
legoc's avatar
legoc committed
125
126
	int timeout = getTimeout();
	if (timeout > 0) {
127
128
129
130
		return timeout;
	}
	else {
		return 10000;
legoc's avatar
legoc committed
131
132
133
	}
}

legoc's avatar
legoc committed
134
std::unique_ptr<application::Instance> Server::makeInstance() {
135
	return unique_ptr<application::Instance>(new application::Instance(this));
legoc's avatar
legoc committed
136
137
}

legoc's avatar
legoc committed
138
std::unique_ptr<application::Instance> Server::start(const std::string& name, Option options) {
legoc's avatar
legoc committed
139
140
141
	return start(name, vector<string>(), options);
}

legoc's avatar
legoc committed
142
std::unique_ptr<application::Instance> Server::start(const std::string& name, const std::vector<std::string> & args, Option options) {
legoc's avatar
legoc committed
143

legoc's avatar
legoc committed
144
145
	bool outputStream = ((options & OUTPUTSTREAM) != 0);

legoc's avatar
legoc committed
146
	unique_ptr<application::Instance> instance = makeInstance();
legoc's avatar
legoc committed
147

148
	// Set the name and register the instance as event listener.
legoc's avatar
legoc committed
149
	instance->setName(name);
150
	registerEventListener(instance.get());
legoc's avatar
legoc committed
151
152

	try {
153
154
		unique_ptr<OutputStreamSocket> streamSocket;

legoc's avatar
legoc committed
155
		if (outputStream) {
legoc's avatar
legoc committed
156
157
			// Connect to the stream port. A sync is made to ensure that the subscriber is connected.
			streamSocket = createOutputStreamSocket(name);
legoc's avatar
legoc committed
158
159
		}

160
		unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createStartRequest(name, args, application::This::getReference()));
legoc's avatar
legoc committed
161

162
163
164
		// Get the JSON response.
		json::Object response;
		json::parse(response, reply.get());
legoc's avatar
legoc committed
165

166
167
168
		int value = response[message::RequestResponse::VALUE].GetInt();
		if (value == -1) {
			instance->setErrorMessage(response[message::RequestResponse::MESSAGE].GetString());
legoc's avatar
legoc committed
169
		}
170
171
		else {
			instance->setId(value);
172
173
174
175

			if (outputStream) {
				instance->setOutputStreamSocket(streamSocket);
			}
176
177
178
		}
	}
	catch (const ConnectionTimeout& e) {
legoc's avatar
legoc committed
179
180
181
182
183
184
185
186
		instance->setErrorMessage(e.what());
	}

	return instance;
}

Response Server::stopApplicationAsynchronously(int id, bool immediately) const {

187
	string request;
legoc's avatar
legoc committed
188
189

	if (immediately) {
190
191
192
193
		request = m_impl->createKillRequest(id);
	}
	else {
		request = m_impl->createStopRequest(id);
legoc's avatar
legoc committed
194
195
	}

196
	unique_ptr<zmq::message_t> reply = m_requestSocket->request(request);
legoc's avatar
legoc committed
197

198
199
200
	// Get the JSON response.
	json::Object response;
	json::parse(response, reply.get());
legoc's avatar
legoc committed
201

202
203
204
205
	int value = response[message::RequestResponse::VALUE].GetInt();
	string message = response[message::RequestResponse::MESSAGE].GetString();

	return Response(value, message);
legoc's avatar
legoc committed
206
207
}

legoc's avatar
legoc committed
208
209
210
application::InstanceArray Server::connectAll(const std::string& name, Option options) {

	bool outputStream = ((options & OUTPUTSTREAM) != 0);
legoc's avatar
legoc committed
211

212
213
214
215
216
	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createConnectRequest(name));

	// Get the JSON response.
	json::Object response;
	json::parse(response, reply.get());
legoc's avatar
legoc committed
217

218
219
	application::InstanceArray instances;

220
221
222
	json::Value& applicationInfo = response[message::ApplicationInfoListResponse::APPLICATION_INFO];
	json::Value::Array array = applicationInfo.GetArray();
	size_t size = array.Size();
legoc's avatar
legoc committed
223

224
225
	// Allocate the array.
	instances.allocate(size);
legoc's avatar
legoc committed
226
227
228

	int aliveInstancesCount = 0;

229
230
	for (int i = 0; i < size; ++i) {
		json::Value::Object info = array[i].GetObject();
legoc's avatar
legoc committed
231

legoc's avatar
legoc committed
232
		unique_ptr<application::Instance> instance = makeInstance();
legoc's avatar
legoc committed
233

234
		// Set the name and register the instance as event listener.
235
236
		string name = info[message::ApplicationInfo::NAME].GetString();
		instance->setName(name);
237
238
		registerEventListener(instance.get());

239
		int applicationId = info[message::ApplicationInfo::ID].GetInt();
legoc's avatar
legoc committed
240
241
242
243
244
245

		// test if the application is still alive otherwise we could have missed a status message
		if (isAlive(applicationId)) {
			aliveInstancesCount++;

			instance->setId(applicationId);
246
247
			instance->setInitialState(info[message::ApplicationInfo::APPLICATION_STATE].GetInt());
			instance->setPastStates(info[message::ApplicationInfo::PAST_APPLICATION_STATES].GetInt());
legoc's avatar
legoc committed
248

249
			if (outputStream) {
legoc's avatar
legoc committed
250
				unique_ptr<OutputStreamSocket> streamSocket = createOutputStreamSocket(name);
251
252
253
				instance->setOutputStreamSocket(streamSocket);
			}

legoc's avatar
legoc committed
254
			instances.m_array[i] = std::move(instance);
legoc's avatar
legoc committed
255
256
257
		}
	}

258
	// Copy the alive instances.
legoc's avatar
legoc committed
259
260
261
262
	application::InstanceArray aliveInstances;
	aliveInstances.allocate(aliveInstancesCount);

	int j = 0;
263
	for (int i = 0; i < size; ++i) {
legoc's avatar
legoc committed
264
265

		if (instances.m_array[i].get() != 0) {
legoc's avatar
legoc committed
266
			aliveInstances[j] = std::move(instances.m_array[i]);
legoc's avatar
legoc committed
267
268
269
270
271
272
273
			j++;
		}
	}

	return aliveInstances;
}

legoc's avatar
legoc committed
274
std::unique_ptr<application::Instance> Server::connect(const std::string& name, Option options) {
legoc's avatar
legoc committed
275

legoc's avatar
legoc committed
276
	application::InstanceArray instances = connectAll(name, options);
legoc's avatar
legoc committed
277
278

	if (instances.size() == 0) {
legoc's avatar
legoc committed
279
		unique_ptr<application::Instance> instance = makeInstance();
legoc's avatar
legoc committed
280
281
282
		return instance;
	}

legoc's avatar
legoc committed
283
	return std::move(instances[0]);
legoc's avatar
legoc committed
284
285
}

286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
std::unique_ptr<application::Instance> Server::connect(int id, Option options) {

	bool outputStream = ((options & OUTPUTSTREAM) != 0);

	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createConnectWithIdRequest(id));

	// Get the JSON response.
	json::Object response;
	json::parse(response, reply.get());

	json::Value& applicationInfo = response[message::ApplicationInfoListResponse::APPLICATION_INFO];
	json::Value::Array array = applicationInfo.GetArray();
	size_t size = array.Size();

	if (size > 0) {
		json::Value::Object info = array[0].GetObject();

		unique_ptr<application::Instance> instance = makeInstance();

		// Set the name and register the instance as event listener.
		string name = info[message::ApplicationInfo::NAME].GetString();
		instance->setName(name);
		registerEventListener(instance.get());

		int applicationId = info[message::ApplicationInfo::ID].GetInt();

		// test if the application is still alive otherwise we could have missed a status message
		if (isAlive(applicationId)) {

			instance->setId(applicationId);
			instance->setInitialState(info[message::ApplicationInfo::APPLICATION_STATE].GetInt());
			instance->setPastStates(info[message::ApplicationInfo::PAST_APPLICATION_STATES].GetInt());

			if (outputStream) {
legoc's avatar
legoc committed
320
				unique_ptr<OutputStreamSocket> streamSocket = createOutputStreamSocket(name);
321
322
323
324
325
326
327
328
329
330
				instance->setOutputStreamSocket(streamSocket);
			}

			return instance;
		}
	}

	return makeInstance();
}

legoc's avatar
legoc committed
331
332
333
334
335
336
337
338
339
340
341
342
void Server::killAllAndWaitFor(const std::string& name) {

	application::InstanceArray instances = connectAll(name);

	for (int i = 0; i < instances.size(); ++i) {
		instances[i]->kill();
		instances[i]->waitFor();
	}
}

bool Server::isAlive(int id) const {

343
	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createIsAliveRequest(id));
legoc's avatar
legoc committed
344

345
346
347
	// Get the JSON response.
	json::Object response;
	json::parse(response, reply.get());
legoc's avatar
legoc committed
348

349
	return response[message::IsAliveResponse::IS_ALIVE].GetBool();
legoc's avatar
legoc committed
350
351
352
353
}

std::vector<application::Configuration> Server::getApplicationConfigurations() const {

354
355
	vector<application::Configuration> configs;

356
	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createListRequest());
legoc's avatar
legoc committed
357

358
359
360
	// Get the JSON response.
	json::Object response;
	json::parse(response, reply.get());
legoc's avatar
legoc committed
361

362
	json::Value& applicationConfigs = response[message::ApplicationConfigListResponse::APPLICATION_CONFIG];
363
364
	json::Value::Array array = applicationConfigs.GetArray();
	size_t size = array.Size();
legoc's avatar
legoc committed
365

366
367
	for (int i = 0; i < size; ++i) {
		json::Value::Object config = array[i].GetObject();
legoc's avatar
legoc committed
368

369
370
371
372
373
374
		string name = config[message::ApplicationConfig::NAME].GetString();
		string description = config[message::ApplicationConfig::DESCRIPTION].GetString();
		bool runsSingle = config[message::ApplicationConfig::RUNS_SINGLE].GetBool();
		bool restart = config[message::ApplicationConfig::RESTART].GetBool();
		int startingTime = config[message::ApplicationConfig::STARTING_TIME].GetInt();
		int stoppingTime = config[message::ApplicationConfig::STOPPING_TIME].GetInt();
legoc's avatar
legoc committed
375

376
377
378
379
380
381
382
383
		application::Configuration applicationConfig(name,
				description,
				runsSingle,
				restart,
				startingTime,
				stoppingTime);

		configs.push_back(applicationConfig);
legoc's avatar
legoc committed
384
385
	}

386
	return configs;
legoc's avatar
legoc committed
387
388
389
390
}

std::vector<application::Info> Server::getApplicationInfos() const {

legoc's avatar
legoc committed
391
	vector<application::Info> infos;
legoc's avatar
legoc committed
392

legoc's avatar
legoc committed
393
	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createAppsRequest());
394
395
396
397
398

	// Get the JSON response.
	json::Object response;
	json::parse(response, reply.get());

399
400
	json::Value& applicationInfos = response[message::ApplicationInfoListResponse::APPLICATION_INFO];
	json::Value::Array array = applicationInfos.GetArray();
401
	size_t size = array.Size();
legoc's avatar
legoc committed
402

403
404
	for (int i = 0; i < size; ++i) {
		json::Value::Object info = array[i].GetObject();
legoc's avatar
legoc committed
405

406
407
408
409
410
411
		string name = info[message::ApplicationInfo::NAME].GetString();
		int id = info[message::ApplicationInfo::ID].GetInt();
		int pid = info[message::ApplicationInfo::PID].GetInt();
		application::State state = info[message::ApplicationInfo::APPLICATION_STATE].GetInt();
		application::State pastStates = info[message::ApplicationInfo::PAST_APPLICATION_STATES].GetInt();
		string args = info[message::ApplicationInfo::ARGS].GetString();
legoc's avatar
legoc committed
412

413
414
415
416
417
418
		application::Info applicationInfo(name,
						id,
						pid,
						state,
						pastStates,
						args);
legoc's avatar
legoc committed
419

legoc's avatar
legoc committed
420
		infos.push_back(applicationInfo);
legoc's avatar
legoc committed
421
422
	}

legoc's avatar
legoc committed
423
	return infos;
legoc's avatar
legoc committed
424
425
426
427
}

std::vector<application::Info> Server::getApplicationInfos(const std::string& name) const {

428
429
	vector<application::Info> allInfos = getApplicationInfos();
	vector<application::Info> infos;
legoc's avatar
legoc committed
430

431
	for (vector<application::Info>::const_iterator i = allInfos.begin(); i != allInfos.end(); ++i) {
legoc's avatar
legoc committed
432
433
		application::Info const & info = *i;
		if (info.getName() == name) {
434
			infos.push_back(info);
legoc's avatar
legoc committed
435
436
437
		}
	}

438
	return infos;
legoc's avatar
legoc committed
439
440
}

legoc's avatar
legoc committed
441
std::vector<application::Port> Server::getPorts() const {
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459

	vector<application::Port> ports;

	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createPortsRequest());

	// Get the JSON response.
	json::Object response;
	json::parse(response, reply.get());

	json::Value& portInfos = response[message::PortInfoListResponse::PORT_INFO];
	json::Value::Array array = portInfos.GetArray();
	size_t size = array.Size();

	for (int i = 0; i < size; ++i) {
		json::Value::Object info = array[i].GetObject();

		int port = info[message::PortInfo::PORT].GetInt();
		string status = info[message::PortInfo::STATUS].GetString();
legoc's avatar
legoc committed
460
		string owner = info[message::PortInfo::OWNER].GetString();
461

legoc's avatar
legoc committed
462
		application::Port portInfo(port, status, owner);
463
464
465
466
467
468
469

		ports.push_back(portInfo);
	}

	return ports;
}

470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
application::State Server::getActualState(int id) const {

	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createGetStatusRequest(id));

	// Get the JSON response.
	json::Object response;
	json::parse(response, reply.get());

	return response[message::StatusEvent::APPLICATION_STATE].GetInt();
}

std::set<application::State> Server::getPastStates(int id) const {

	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createGetStatusRequest(id));

	// Get the JSON response.
	json::Object response;
	json::parse(response, reply.get());

	application::State applicationStates = response[message::StatusEvent::PAST_APPLICATION_STATES].GetInt();

	set<application::State> result;

	if ((applicationStates & application::STARTING) != 0) {
		result.insert(application::STARTING);
	}

	if ((applicationStates & application::RUNNING) != 0) {
		result.insert(application::RUNNING);
	}

	if ((applicationStates & application::STOPPING) != 0) {
		result.insert(application::STOPPING);
	}

	if ((applicationStates & application::KILLING) != 0) {
		result.insert(application::KILLING);
	}

	if ((applicationStates & application::PROCESSING_ERROR) != 0) {
		result.insert(application::PROCESSING_ERROR);
	}

	if ((applicationStates & application::FAILURE) != 0) {
		result.insert(application::FAILURE);
	}

	if ((applicationStates & application::SUCCESS) != 0) {
		result.insert(application::SUCCESS);
	}

	if ((applicationStates & application::STOPPED) != 0) {
		result.insert(application::STOPPED);
	}

	if ((applicationStates & application::KILLED) != 0) {
		result.insert(application::KILLED);
	}

	return result;
}

legoc's avatar
legoc committed
532
std::unique_ptr<EventStreamSocket> Server::openEventStream() {
legoc's avatar
legoc committed
533
534
535
	return Services::openEventStream();
}

536
std::unique_ptr<application::Subscriber> Server::createSubscriber(int id, const std::string& publisherName, const std::string& instanceName) {
legoc's avatar
legoc committed
537

538
	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createConnectPublisherRequest(id, publisherName));
legoc's avatar
legoc committed
539

540
541
542
	// Get the JSON response.
	json::Object response;
	json::parse(response, reply.get());
legoc's avatar
legoc committed
543

544
	int publisherPort = response[message::PublisherResponse::PUBLISHER_PORT].GetInt();
legoc's avatar
legoc committed
545
	if (publisherPort == -1) {
546
		throw SubscriberCreationException(response[message::PublisherResponse::MESSAGE].GetString());
legoc's avatar
legoc committed
547
548
	}

549
550
	int synchronizerPort = response[message::PublisherResponse::SYNCHRONIZER_PORT].GetInt();
	int numberOfSubscribers = response[message::PublisherResponse::NUMBER_OF_SUBSCRIBERS].GetInt();
legoc's avatar
legoc committed
551

552
	unique_ptr<application::Subscriber> subscriber(new application::Subscriber(this, getUrl(), publisherPort, synchronizerPort, publisherName, numberOfSubscribers, instanceName, id, m_serverEndpoint.toString(), m_serverStatusEndpoint));
legoc's avatar
legoc committed
553
554
555
556
557
	subscriber->init();

	return subscriber;
}

legoc's avatar
legoc committed
558
std::unique_ptr<ConnectionChecker> Server::createConnectionChecker(ConnectionCheckerType handler, int pollingTimeMs) {
559

legoc's avatar
legoc committed
560
	unique_ptr<ConnectionChecker> connectionChecker(new ConnectionChecker(this, handler));
561
	connectionChecker->startThread(getAvailableTimeout(), pollingTimeMs);
legoc's avatar
legoc committed
562

563
	return connectionChecker;
legoc's avatar
legoc committed
564
565
}

566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
void Server::storeKeyValue(int id, const std::string& key, const std::string& value) {

	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createStoreKeyValueRequest(id, key, value));

	// Get the JSON response.
	json::Object response;
	json::parse(response, reply.get());
}

std::string Server::getKeyValue(int id, const std::string& key) {

	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createGetKeyValueRequest(id, key));

	// Get the JSON response.
	json::Object response;
	json::parse(response, reply.get());

	int value = response[message::RequestResponse::VALUE].GetInt();
	if (value == 0) {
		return response[message::RequestResponse::MESSAGE].GetString();
	}
	else if (value == -1) {
		throw UndefinedApplicationException(response[message::RequestResponse::MESSAGE].GetString());
	}
	else if (value == -2) {
		throw UndefinedKeyException(response[message::RequestResponse::MESSAGE].GetString());
	}

	return "";
}

void Server::removeKey(int id, const std::string& key) {

	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRemoveKeyRequest(id, key));

	// Get the JSON response.
	json::Object response;
	json::parse(response, reply.get());

	int value = response[message::RequestResponse::VALUE].GetInt();
	if (value == -1) {
		throw UndefinedApplicationException(response[message::RequestResponse::MESSAGE].GetString());
	}
	else if (value == -2) {
		throw UndefinedKeyException(response[message::RequestResponse::MESSAGE].GetString());
	}
}

614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
int Server::requestPort(int id) {

	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRequestPortRequest(id));

	// Get the JSON response.
	json::Object response;
	json::parse(response, reply.get());

	int value = response[message::RequestResponse::VALUE].GetInt();
	if (value == -1) {
		throw UndefinedApplicationException(response[message::RequestResponse::MESSAGE].GetString());
	}

	return value;
}

void Server::setPortUnavailable(int id, int port) {

	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createPortUnavailableRequest(id, port));

	// Get the JSON response.
	json::Object response;
	json::parse(response, reply.get());

	int value = response[message::RequestResponse::VALUE].GetInt();
	if (value == -1) {
		throw UndefinedApplicationException(response[message::RequestResponse::MESSAGE].GetString());
	}
}

void Server::releasePort(int id, int port) {

	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createReleasePortRequest(id, port));

	// Get the JSON response.
	json::Object response;
	json::parse(response, reply.get());

	int value = response[message::RequestResponse::VALUE].GetInt();
	if (value == -1) {
		throw UndefinedApplicationException(response[message::RequestResponse::MESSAGE].GetString());
	}
}

658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
std::vector<EventListener *> Server::getEventListeners() {
	std::unique_lock<std::mutex> lock(m_eventListenersMutex);
	return m_eventListeners;
}

void Server::registerEventListener(EventListener * listener) {
	std::unique_lock<std::mutex> lock(m_eventListenersMutex);
	m_eventListeners.push_back(listener);
}

void Server::unregisterEventListener(EventListener * listener) {
	std::unique_lock<std::mutex> lock(m_eventListenersMutex);

	// Iterate to find the listener.
	for (auto it = m_eventListeners.begin(); it != m_eventListeners.end(); ++it) {
		if (*it == listener) {
			m_eventListeners.erase(it);
			break;
		}
	}
}

legoc's avatar
legoc committed
680
681
std::ostream& operator<<(std::ostream& os, const cameo::Server& server) {

682
	os << "server@" << server.m_serverEndpoint.toString();
legoc's avatar
legoc committed
683
684
685
686
687

	return os;
}

}