Server.cpp 19.1 KB
Newer Older
legoc's avatar
legoc committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/*
 * Copyright 2015 Institut Laue-Langevin
 *
 * Licensed under the EUPL, Version 1.1 only (the "License");
 * You may not use this work except in compliance with the Licence.
 * You may obtain a copy of the Licence at:
 *
 * http://joinup.ec.europa.eu/software/page/eupl
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the Licence is distributed on an "AS IS" basis,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the Licence for the specific language governing permissions and
 * limitations under the Licence.
 */

17
18
#include "Server.h"
#include "Application.h"
19
#include "ConnectionChecker.h"
20
#include "impl/ServicesImpl.h"
21
#include "EventThread.h"
22
#include "impl/StreamSocketImpl.h"
legoc's avatar
legoc committed
23
#include "impl/RequestSocketImpl.h"
24
#include "message/Message.h"
25
26
#include "UndefinedApplicationException.h"
#include "UndefinedKeyException.h"
27
28
#include <iostream>
#include <sstream>
29
#include "JSON.h"
legoc's avatar
legoc committed
30
31
32
33
34

using namespace std;

namespace cameo {

35
void Server::initServer(const Endpoint& endpoint, int timeoutMs) {
legoc's avatar
legoc committed
36

37
	Services::init();
legoc's avatar
legoc committed
38

39
	m_serverEndpoint = endpoint;
40

legoc's avatar
legoc committed
41
42
43
	// Set the timeout.
	Services::setTimeout(timeoutMs);

44
45
46
	// Create the request socket. The server endpoint has been defined.
	Services::initRequestSocket();

legoc's avatar
legoc committed
47
48
	// Manage the ConnectionTimeout exception that can occur.
	try {
49
50
51
		// Retrieve the server version.
		Services::retrieveServerVersion();

legoc's avatar
legoc committed
52
53
54
55
56
57
58
59
		// Start the event thread.
		unique_ptr<EventStreamSocket> socket = openEventStream();
		m_eventThread.reset(new EventThread(this, socket));
		m_eventThread->start();
	}
	catch (...) {
		// ...
	}
legoc's avatar
legoc committed
60
61
}

62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
// Builds a server object from an already parsed endpoint.
//
// endpoint:  endpoint of the server, forwarded to initServer().
// timeoutMs: timeout forwarded to initServer().
Server::Server(const Endpoint& endpoint, int timeoutMs) :
	Services() {

	// NOTE(review): initServer() calls Services::init() as well, so init() runs twice here;
	// confirm that Services::init() tolerates repeated calls.
	Services::init();

	initServer(endpoint, timeoutMs);
}

// Builds a server object from the textual form of an endpoint.
//
// endpoint:  string passed to Endpoint::parse().
// timeoutMs: timeout forwarded to initServer().
//
// Throws InvalidArgumentException when Endpoint::parse() or initServer() throws.
Server::Server(const std::string& endpoint, int timeoutMs) :
	Services() {

	// NOTE(review): initServer() calls Services::init() as well; confirm repeated calls are harmless.
	Services::init();

	try {
		initServer(Endpoint::parse(endpoint), timeoutMs);
	}
	catch (...) {
		// NOTE(review): this handler also covers exceptions escaping initServer(), which are then
		// reported as an invalid endpoint even if parsing succeeded.
		throw InvalidArgumentException(endpoint + " is not a valid endpoint");
	}
}

legoc's avatar
legoc committed
83
// Cancels the event thread, if one has been created.
Server::~Server() {

	// m_eventThread is only set inside the try block of initServer(), so it may be empty.
	if (m_eventThread) {
		m_eventThread->cancel();
	}
}

legoc's avatar
legoc committed
90
void Server::setTimeout(int timeoutMs) {
91
	Services::setTimeout(timeoutMs);
legoc's avatar
legoc committed
92
93
94
95
96
97
}

// Returns the timeout reported by Services::getTimeout().
int Server::getTimeout() const {
	return Services::getTimeout();
}

98
// Returns the endpoint reported by Services::getEndpoint().
const Endpoint& Server::getEndpoint() const {
	return Services::getEndpoint();
}

legoc's avatar
legoc committed
102
103
104
105
// Returns the three-integer version array reported by Services::getVersion().
std::array<int, 3> Server::getVersion() const {
	return Services::getVersion();
}

legoc's avatar
legoc committed
106
107
108
109
// Returns the result of Services::isAvailable() for the given timeout.
bool Server::isAvailable(int timeout) const {
	return Services::isAvailable(timeout);
}

legoc's avatar
legoc committed
110
bool Server::isAvailable() const {
111
112
	return isAvailable(getAvailableTimeout());
}
legoc's avatar
legoc committed
113

114
int Server::getAvailableTimeout() const {
legoc's avatar
legoc committed
115
116
	int timeout = getTimeout();
	if (timeout > 0) {
117
118
119
120
		return timeout;
	}
	else {
		return 10000;
legoc's avatar
legoc committed
121
122
123
	}
}

legoc's avatar
legoc committed
124
std::unique_ptr<application::Instance> Server::makeInstance() {
125
	return unique_ptr<application::Instance>(new application::Instance(this));
legoc's avatar
legoc committed
126
127
}

128
std::unique_ptr<application::Instance> Server::start(const std::string& name, int options) {
legoc's avatar
legoc committed
129
130
131
	return start(name, vector<string>(), options);
}

132
std::unique_ptr<application::Instance> Server::start(const std::string& name, const std::vector<std::string> & args, int options) {
legoc's avatar
legoc committed
133

legoc's avatar
legoc committed
134
135
	bool outputStream = ((options & OUTPUTSTREAM) != 0);

legoc's avatar
legoc committed
136
	unique_ptr<application::Instance> instance = makeInstance();
legoc's avatar
legoc committed
137

138
	// Set the name and register the instance as event listener.
legoc's avatar
legoc committed
139
	instance->setName(name);
140
	registerEventListener(instance.get());
legoc's avatar
legoc committed
141
142

	try {
143
144
		unique_ptr<OutputStreamSocket> streamSocket;

legoc's avatar
legoc committed
145
		if (outputStream) {
legoc's avatar
legoc committed
146
147
			// Connect to the stream port. A sync is made to ensure that the subscriber is connected.
			streamSocket = createOutputStreamSocket(name);
legoc's avatar
legoc committed
148
149
		}

legoc's avatar
legoc committed
150
		unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createStartRequest(name, args, application::This::getName(), application::This::getId(), application::This::getEndpoint().toString()));
legoc's avatar
legoc committed
151

152
153
154
		// Get the JSON response.
		json::Object response;
		json::parse(response, reply.get());
legoc's avatar
legoc committed
155

156
157
158
		int value = response[message::RequestResponse::VALUE].GetInt();
		if (value == -1) {
			instance->setErrorMessage(response[message::RequestResponse::MESSAGE].GetString());
legoc's avatar
legoc committed
159
		}
160
161
		else {
			instance->setId(value);
162
163
164
165

			if (outputStream) {
				instance->setOutputStreamSocket(streamSocket);
			}
166
167
168
		}
	}
	catch (const ConnectionTimeout& e) {
legoc's avatar
legoc committed
169
170
171
172
173
174
175
176
		instance->setErrorMessage(e.what());
	}

	return instance;
}

// Sends a kill request (immediately == true) or a stop request (immediately == false)
// for the application with the given id and returns the value and message of the response.
Response Server::stopApplicationAsynchronously(int id, bool immediately) const {

	const string request = immediately ? m_impl->createKillRequest(id) : m_impl->createStopRequest(id);

	unique_ptr<zmq::message_t> reply = m_requestSocket->request(request);

	// Decode the JSON response.
	json::Object response;
	json::parse(response, reply.get());

	const int value = response[message::RequestResponse::VALUE].GetInt();
	const string message = response[message::RequestResponse::MESSAGE].GetString();

	return Response(value, message);
}

198
// Connects to every application called name that the server reports and that is still alive.
//
// options: when the OUTPUTSTREAM bit is set, an output stream socket is created
//          for each alive instance and attached to it.
//
// Returns the array of alive instances, in the order of the server response.
// The array is empty when no reported application is alive.
application::InstanceArray Server::connectAll(const std::string& name, int options) {

	const bool outputStream = ((options & OUTPUTSTREAM) != 0);

	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createConnectRequest(name));

	// Get the JSON response.
	json::Object response;
	json::parse(response, reply.get());

	json::Value& applicationInfo = response[message::ApplicationInfoListResponse::APPLICATION_INFO];
	json::Value::Array array = applicationInfo.GetArray();
	auto size = array.Size();

	// Only alive instances are stored, so the array can be returned directly.
	// The previous implementation re-scanned it with indices up to the number of
	// reported applications, reading past its end whenever one of them was not alive.
	application::InstanceArray aliveInstances;
	aliveInstances.reserve(size);

	for (decltype(size) i = 0; i < size; ++i) {
		json::Value::Object info = array[i].GetObject();

		unique_ptr<application::Instance> instance = makeInstance();

		// Set the name and register the instance as event listener.
		string instanceName = info[message::ApplicationInfo::NAME].GetString();
		instance->setName(instanceName);
		registerEventListener(instance.get());

		int applicationId = info[message::ApplicationInfo::ID].GetInt();

		// Test if the application is still alive, otherwise we could have missed a status message.
		if (isAlive(applicationId)) {

			instance->setId(applicationId);
			instance->setInitialState(info[message::ApplicationInfo::APPLICATION_STATE].GetInt());
			instance->setPastStates(info[message::ApplicationInfo::PAST_APPLICATION_STATES].GetInt());

			if (outputStream) {
				unique_ptr<OutputStreamSocket> streamSocket = createOutputStreamSocket(instanceName);
				instance->setOutputStreamSocket(streamSocket);
			}

			aliveInstances.push_back(std::move(instance));
		}
	}

	return aliveInstances;
}

264
std::unique_ptr<application::Instance> Server::connect(const std::string& name, int options) {
legoc's avatar
legoc committed
265

legoc's avatar
legoc committed
266
	application::InstanceArray instances = connectAll(name, options);
legoc's avatar
legoc committed
267
268

	if (instances.size() == 0) {
legoc's avatar
legoc committed
269
		unique_ptr<application::Instance> instance = makeInstance();
legoc's avatar
legoc committed
270
271
272
		return instance;
	}

legoc's avatar
legoc committed
273
	return std::move(instances[0]);
legoc's avatar
legoc committed
274
275
}

276
std::unique_ptr<application::Instance> Server::connect(int id, int options) {
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309

	bool outputStream = ((options & OUTPUTSTREAM) != 0);

	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createConnectWithIdRequest(id));

	// Get the JSON response.
	json::Object response;
	json::parse(response, reply.get());

	json::Value& applicationInfo = response[message::ApplicationInfoListResponse::APPLICATION_INFO];
	json::Value::Array array = applicationInfo.GetArray();
	size_t size = array.Size();

	if (size > 0) {
		json::Value::Object info = array[0].GetObject();

		unique_ptr<application::Instance> instance = makeInstance();

		// Set the name and register the instance as event listener.
		string name = info[message::ApplicationInfo::NAME].GetString();
		instance->setName(name);
		registerEventListener(instance.get());

		int applicationId = info[message::ApplicationInfo::ID].GetInt();

		// test if the application is still alive otherwise we could have missed a status message
		if (isAlive(applicationId)) {

			instance->setId(applicationId);
			instance->setInitialState(info[message::ApplicationInfo::APPLICATION_STATE].GetInt());
			instance->setPastStates(info[message::ApplicationInfo::PAST_APPLICATION_STATES].GetInt());

			if (outputStream) {
legoc's avatar
legoc committed
310
				unique_ptr<OutputStreamSocket> streamSocket = createOutputStreamSocket(name);
311
312
313
314
315
316
317
318
319
320
				instance->setOutputStreamSocket(streamSocket);
			}

			return instance;
		}
	}

	return makeInstance();
}

legoc's avatar
legoc committed
321
322
323
324
325
326
327
328
329
330
331
332
void Server::killAllAndWaitFor(const std::string& name) {

	application::InstanceArray instances = connectAll(name);

	for (int i = 0; i < instances.size(); ++i) {
		instances[i]->kill();
		instances[i]->waitFor();
	}
}

bool Server::isAlive(int id) const {

333
	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createIsAliveRequest(id));
legoc's avatar
legoc committed
334

335
336
337
	// Get the JSON response.
	json::Object response;
	json::parse(response, reply.get());
legoc's avatar
legoc committed
338

339
	return response[message::IsAliveResponse::IS_ALIVE].GetBool();
legoc's avatar
legoc committed
340
341
342
343
}

std::vector<application::Configuration> Server::getApplicationConfigurations() const {

344
345
	vector<application::Configuration> configs;

346
	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createListRequest());
legoc's avatar
legoc committed
347

348
349
350
	// Get the JSON response.
	json::Object response;
	json::parse(response, reply.get());
legoc's avatar
legoc committed
351

352
	json::Value& applicationConfigs = response[message::ApplicationConfigListResponse::APPLICATION_CONFIG];
353
354
	json::Value::Array array = applicationConfigs.GetArray();
	size_t size = array.Size();
legoc's avatar
legoc committed
355

356
357
	for (int i = 0; i < size; ++i) {
		json::Value::Object config = array[i].GetObject();
legoc's avatar
legoc committed
358

359
360
361
362
363
364
		string name = config[message::ApplicationConfig::NAME].GetString();
		string description = config[message::ApplicationConfig::DESCRIPTION].GetString();
		bool runsSingle = config[message::ApplicationConfig::RUNS_SINGLE].GetBool();
		bool restart = config[message::ApplicationConfig::RESTART].GetBool();
		int startingTime = config[message::ApplicationConfig::STARTING_TIME].GetInt();
		int stoppingTime = config[message::ApplicationConfig::STOPPING_TIME].GetInt();
legoc's avatar
legoc committed
365

366
367
368
369
370
371
372
373
		application::Configuration applicationConfig(name,
				description,
				runsSingle,
				restart,
				startingTime,
				stoppingTime);

		configs.push_back(applicationConfig);
legoc's avatar
legoc committed
374
375
	}

376
	return configs;
legoc's avatar
legoc committed
377
378
379
380
}

std::vector<application::Info> Server::getApplicationInfos() const {

legoc's avatar
legoc committed
381
	vector<application::Info> infos;
legoc's avatar
legoc committed
382

legoc's avatar
legoc committed
383
	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createAppsRequest());
384
385
386
387
388

	// Get the JSON response.
	json::Object response;
	json::parse(response, reply.get());

389
390
	json::Value& applicationInfos = response[message::ApplicationInfoListResponse::APPLICATION_INFO];
	json::Value::Array array = applicationInfos.GetArray();
391
	size_t size = array.Size();
legoc's avatar
legoc committed
392

393
394
	for (int i = 0; i < size; ++i) {
		json::Value::Object info = array[i].GetObject();
legoc's avatar
legoc committed
395

396
397
398
399
400
401
		string name = info[message::ApplicationInfo::NAME].GetString();
		int id = info[message::ApplicationInfo::ID].GetInt();
		int pid = info[message::ApplicationInfo::PID].GetInt();
		application::State state = info[message::ApplicationInfo::APPLICATION_STATE].GetInt();
		application::State pastStates = info[message::ApplicationInfo::PAST_APPLICATION_STATES].GetInt();
		string args = info[message::ApplicationInfo::ARGS].GetString();
legoc's avatar
legoc committed
402

403
404
405
406
407
408
		application::Info applicationInfo(name,
						id,
						pid,
						state,
						pastStates,
						args);
legoc's avatar
legoc committed
409

legoc's avatar
legoc committed
410
		infos.push_back(applicationInfo);
legoc's avatar
legoc committed
411
412
	}

legoc's avatar
legoc committed
413
	return infos;
legoc's avatar
legoc committed
414
415
416
417
}

std::vector<application::Info> Server::getApplicationInfos(const std::string& name) const {

418
419
	vector<application::Info> allInfos = getApplicationInfos();
	vector<application::Info> infos;
legoc's avatar
legoc committed
420

421
	for (vector<application::Info>::const_iterator i = allInfos.begin(); i != allInfos.end(); ++i) {
legoc's avatar
legoc committed
422
423
		application::Info const & info = *i;
		if (info.getName() == name) {
424
			infos.push_back(info);
legoc's avatar
legoc committed
425
426
427
		}
	}

428
	return infos;
legoc's avatar
legoc committed
429
430
}

legoc's avatar
legoc committed
431
std::vector<application::Port> Server::getPorts() const {
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449

	vector<application::Port> ports;

	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createPortsRequest());

	// Get the JSON response.
	json::Object response;
	json::parse(response, reply.get());

	json::Value& portInfos = response[message::PortInfoListResponse::PORT_INFO];
	json::Value::Array array = portInfos.GetArray();
	size_t size = array.Size();

	for (int i = 0; i < size; ++i) {
		json::Value::Object info = array[i].GetObject();

		int port = info[message::PortInfo::PORT].GetInt();
		string status = info[message::PortInfo::STATUS].GetString();
legoc's avatar
legoc committed
450
		string owner = info[message::PortInfo::OWNER].GetString();
451

legoc's avatar
legoc committed
452
		application::Port portInfo(port, status, owner);
453
454
455
456
457
458
459

		ports.push_back(portInfo);
	}

	return ports;
}

460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
// Sends a status request for the application with the given id and returns the
// APPLICATION_STATE field of the response.
application::State Server::getActualState(int id) const {

	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createGetStatusRequest(id));

	// Decode the JSON response.
	json::Object response;
	json::parse(response, reply.get());

	const application::State currentState = response[message::StatusEvent::APPLICATION_STATE].GetInt();
	return currentState;
}

std::set<application::State> Server::getPastStates(int id) const {

	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createGetStatusRequest(id));

	// Get the JSON response.
	json::Object response;
	json::parse(response, reply.get());

	application::State applicationStates = response[message::StatusEvent::PAST_APPLICATION_STATES].GetInt();

	set<application::State> result;

	if ((applicationStates & application::STARTING) != 0) {
		result.insert(application::STARTING);
	}

	if ((applicationStates & application::RUNNING) != 0) {
		result.insert(application::RUNNING);
	}

	if ((applicationStates & application::STOPPING) != 0) {
		result.insert(application::STOPPING);
	}

	if ((applicationStates & application::KILLING) != 0) {
		result.insert(application::KILLING);
	}

	if ((applicationStates & application::PROCESSING_ERROR) != 0) {
		result.insert(application::PROCESSING_ERROR);
	}

	if ((applicationStates & application::FAILURE) != 0) {
		result.insert(application::FAILURE);
	}

	if ((applicationStates & application::SUCCESS) != 0) {
		result.insert(application::SUCCESS);
	}

	if ((applicationStates & application::STOPPED) != 0) {
		result.insert(application::STOPPED);
	}

	if ((applicationStates & application::KILLED) != 0) {
		result.insert(application::KILLED);
	}

	return result;
}

legoc's avatar
legoc committed
522
std::unique_ptr<EventStreamSocket> Server::openEventStream() {
legoc's avatar
legoc committed
523
524
525
	return Services::openEventStream();
}

526
std::unique_ptr<application::Subscriber> Server::createSubscriber(int id, const std::string& publisherName, const std::string& instanceName) {
legoc's avatar
legoc committed
527

528
	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createConnectPublisherRequest(id, publisherName));
legoc's avatar
legoc committed
529

530
531
532
	// Get the JSON response.
	json::Object response;
	json::parse(response, reply.get());
legoc's avatar
legoc committed
533

534
	int publisherPort = response[message::PublisherResponse::PUBLISHER_PORT].GetInt();
legoc's avatar
legoc committed
535
	if (publisherPort == -1) {
536
		throw SubscriberCreationException(response[message::PublisherResponse::MESSAGE].GetString());
legoc's avatar
legoc committed
537
538
	}

539
540
	int synchronizerPort = response[message::PublisherResponse::SYNCHRONIZER_PORT].GetInt();
	int numberOfSubscribers = response[message::PublisherResponse::NUMBER_OF_SUBSCRIBERS].GetInt();
legoc's avatar
legoc committed
541

legoc's avatar
legoc committed
542
	// TODO simplify the use of some variables: e.g. m_serverEndpoint accessible from this.
543
	unique_ptr<application::Subscriber> subscriber(new application::Subscriber(this, publisherPort, synchronizerPort, publisherName, numberOfSubscribers, instanceName, id, m_serverEndpoint.toString(), m_serverEndpoint.withPort(m_statusPort).toString()));
legoc's avatar
legoc committed
544
545
546
547
548
	subscriber->init();

	return subscriber;
}

legoc's avatar
legoc committed
549
// Creates a ConnectionChecker for this server with the given handler and starts its
// thread with the timeout from getAvailableTimeout() and the polling time pollingTimeMs.
std::unique_ptr<ConnectionChecker> Server::createConnectionChecker(ConnectionCheckerType handler, int pollingTimeMs) {

	unique_ptr<ConnectionChecker> checker(new ConnectionChecker(this, handler));

	const int availableTimeoutMs = getAvailableTimeout();
	checker->startThread(availableTimeoutMs, pollingTimeMs);

	return checker;
}

557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
// Sends a store-key-value request for the application id with the given key and value.
void Server::storeKeyValue(int id, const std::string& key, const std::string& value) {

	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createStoreKeyValueRequest(id, key, value));

	// Get the JSON response.
	// NOTE(review): the response is parsed but none of its fields is read, unlike
	// getKeyValue() and removeKey(); confirm that no error value has to be reported here.
	json::Object response;
	json::parse(response, reply.get());
}

// Sends a get-key-value request for the application id and the given key.
//
// Returns the message of the response when its value is 0, and an empty string for
// any value other than 0, -1 and -2.
// Throws UndefinedApplicationException when the value is -1 and UndefinedKeyException
// when it is -2, both carrying the message of the response.
std::string Server::getKeyValue(int id, const std::string& key) {

	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createGetKeyValueRequest(id, key));

	// Decode the JSON response.
	json::Object response;
	json::parse(response, reply.get());

	switch (response[message::RequestResponse::VALUE].GetInt()) {
	case 0:
		return response[message::RequestResponse::MESSAGE].GetString();
	case -1:
		throw UndefinedApplicationException(response[message::RequestResponse::MESSAGE].GetString());
	case -2:
		throw UndefinedKeyException(response[message::RequestResponse::MESSAGE].GetString());
	default:
		return "";
	}
}

// Sends a remove-key request for the application id and the given key.
//
// Throws UndefinedApplicationException when the value of the response is -1 and
// UndefinedKeyException when it is -2, both carrying the message of the response.
void Server::removeKey(int id, const std::string& key) {

	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRemoveKeyRequest(id, key));

	// Decode the JSON response.
	json::Object response;
	json::parse(response, reply.get());

	switch (response[message::RequestResponse::VALUE].GetInt()) {
	case -1:
		throw UndefinedApplicationException(response[message::RequestResponse::MESSAGE].GetString());
	case -2:
		throw UndefinedKeyException(response[message::RequestResponse::MESSAGE].GetString());
	default:
		break;
	}
}

605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
// Sends a request-port request for the application id and returns the value of the response.
//
// Throws UndefinedApplicationException, with the message of the response, when the value is -1.
int Server::requestPort(int id) {

	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRequestPortRequest(id));

	// Decode the JSON response.
	json::Object response;
	json::parse(response, reply.get());

	const int port = response[message::RequestResponse::VALUE].GetInt();
	if (port != -1) {
		return port;
	}

	throw UndefinedApplicationException(response[message::RequestResponse::MESSAGE].GetString());
}

void Server::setPortUnavailable(int id, int port) {

	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createPortUnavailableRequest(id, port));

	// Get the JSON response.
	json::Object response;
	json::parse(response, reply.get());

	int value = response[message::RequestResponse::VALUE].GetInt();
	if (value == -1) {
		throw UndefinedApplicationException(response[message::RequestResponse::MESSAGE].GetString());
	}
}

void Server::releasePort(int id, int port) {

	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createReleasePortRequest(id, port));

	// Get the JSON response.
	json::Object response;
	json::parse(response, reply.get());

	int value = response[message::RequestResponse::VALUE].GetInt();
	if (value == -1) {
		throw UndefinedApplicationException(response[message::RequestResponse::MESSAGE].GetString());
	}
}

649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
// Returns a copy of the registered event listeners, taken while holding m_eventListenersMutex.
std::vector<EventListener *> Server::getEventListeners() {
	std::lock_guard<std::mutex> guard(m_eventListenersMutex);
	return m_eventListeners;
}

// Appends listener to the registered event listeners while holding m_eventListenersMutex.
// The pointer is stored as is; no check for duplicates is made.
void Server::registerEventListener(EventListener * listener) {
	std::lock_guard<std::mutex> guard(m_eventListenersMutex);
	m_eventListeners.push_back(listener);
}

// Removes the first occurrence of listener from the registered event listeners while
// holding m_eventListenersMutex. Does nothing when listener is not registered.
void Server::unregisterEventListener(EventListener * listener) {
	std::lock_guard<std::mutex> guard(m_eventListenersMutex);

	// Advance to the first matching entry, or to the end when there is none.
	auto position = m_eventListeners.begin();
	while (position != m_eventListeners.end() && *position != listener) {
		++position;
	}

	if (position != m_eventListeners.end()) {
		m_eventListeners.erase(position);
	}
}

legoc's avatar
legoc committed
671
672
// Writes "server@" followed by the textual form of the server endpoint to os and returns os.
std::ostream& operator<<(std::ostream& os, const cameo::Server& server) {
	return os << "server@" << server.m_serverEndpoint.toString();
}

}