Server.cpp 19.9 KB
Newer Older
legoc's avatar
legoc committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/*
 * Copyright 2015 Institut Laue-Langevin
 *
 * Licensed under the EUPL, Version 1.1 only (the "License");
 * You may not use this work except in compliance with the Licence.
 * You may obtain a copy of the Licence at:
 *
 * http://joinup.ec.europa.eu/software/page/eupl
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the Licence is distributed on an "AS IS" basis,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the Licence for the specific language governing permissions and
 * limitations under the Licence.
 */

17
#include "Server.h"
legoc's avatar
legoc committed
18

19
#include "Application.h"
legoc's avatar
legoc committed
20
#include "CancelIdGenerator.h"
21
#include "ConnectionChecker.h"
22
23
#include "UndefinedApplicationException.h"
#include "UndefinedKeyException.h"
legoc's avatar
legoc committed
24
25
#include "EventThread.h"
#include "impl/StreamSocketImpl.h"
legoc's avatar
legoc committed
26
#include "impl/EventStreamSocketImpl.h"
legoc's avatar
legoc committed
27
#include "impl/zmq/ContextZmq.h"
legoc's avatar
legoc committed
28
#include "JSON.h"
legoc's avatar
legoc committed
29
#include "Messages.h"
30
#include "RequestSocket.h"
31
32
#include <iostream>
#include <sstream>
33
#include <stdexcept>
34

legoc's avatar
legoc committed
35
namespace cameo {
36
37
// Fallback returned by Server::getAvailableTimeout() when the configured timeout is not strictly positive.
// NOTE(review): presumably expressed in milliseconds, like the timeoutMs constructor argument - confirm.
constexpr int defaultTimeout = 10000;
	
38
void Server::initServer(const Endpoint& endpoint, int timeoutMs) {
legoc's avatar
legoc committed
39

40
	initContext();
legoc's avatar
legoc committed
41

42
	m_serverEndpoint = endpoint;
43

legoc's avatar
legoc committed
44
	// Set the timeout.
45
	setTimeout(timeoutMs);
legoc's avatar
legoc committed
46

47
	// Create the request socket. The server endpoint has been defined.
48
	initRequestSocket();
49

legoc's avatar
legoc committed
50
51
	// Manage the ConnectionTimeout exception that can occur.
	try {
52
		// Retrieve the server version.
53
		retrieveServerVersion();
54

legoc's avatar
legoc committed
55
		// Start the event thread.
56
		std::unique_ptr<EventStreamSocket> socket = openEventStream();
legoc's avatar
legoc committed
57
58
59
60
61
62
		m_eventThread.reset(new EventThread(this, socket));
		m_eventThread->start();
	}
	catch (...) {
		// ...
	}
legoc's avatar
legoc committed
63
64
}

65
/**
 * Builds a server object connected to the given endpoint.
 *
 * @param endpoint  endpoint of the server.
 * @param timeoutMs timeout forwarded to initServer().
 */
Server::Server(const Endpoint& endpoint, int timeoutMs) :
	m_statusPort(0),
	m_contextImpl(nullptr) {

	// The version stays 0.0.0 until the server has answered the version request.
	m_serverVersion.fill(0);

	// initServer() creates the context itself: creating one here as well would only
	// build a context that is immediately thrown away and replaced.
	initServer(endpoint, timeoutMs);
}

/**
 * Builds a server object from the string form of an endpoint.
 *
 * @param endpoint  string parsed with Endpoint::parse().
 * @param timeoutMs timeout forwarded to initServer().
 * @throws InvalidArgumentException if parsing the endpoint or the initialisation throws.
 */
Server::Server(const std::string& endpoint, int timeoutMs) :
	m_statusPort(0),
	m_contextImpl(nullptr) {

	// The version stays 0.0.0 until the server has answered the version request.
	m_serverVersion.fill(0);

	// initServer() creates the context itself: creating one here as well would only
	// build a context that is immediately thrown away and replaced.
	try {
		initServer(Endpoint::parse(endpoint), timeoutMs);
	}
	catch (...) {
		// Any exception raised above is reported as an invalid endpoint.
		throw InvalidArgumentException(endpoint + " is not a valid endpoint");
	}
}

legoc's avatar
legoc committed
97
/**
 * Cancels the event thread if there is one, then releases the event thread,
 * the request socket and the context, in that order.
 */
Server::~Server() {

	// Ask the event thread to stop before destroying it.
	if (m_eventThread) {
		m_eventThread->cancel();
	}
	m_eventThread.reset();

	// The request socket must go before the context, otherwise resetting the context will block.
	m_requestSocket.reset();

	// Finally release the context.
	m_contextImpl.reset();
}

112
113
114
115
116
117
118
/**
 * Sets the timeout on the context and, when it already exists, on the request socket.
 *
 * @param timeout the new timeout value.
 */
void Server::setTimeout(int timeout) {

	m_contextImpl->setTimeout(timeout);

	// The request socket may not have been created yet.
	if (m_requestSocket) {
		m_requestSocket->setTimeout(timeout);
	}
}

int Server::getTimeout() const {
122
	return m_contextImpl->getTimeout();
legoc's avatar
legoc committed
123
124
}

125
/**
 * Returns a reference to the endpoint stored by initServer().
 */
const Endpoint& Server::getEndpoint() const {
	return this->m_serverEndpoint;
}

legoc's avatar
legoc committed
129
std::array<int, 3> Server::getVersion() const {
130
	return m_serverVersion;
legoc's avatar
legoc committed
131
132
}

legoc's avatar
legoc committed
133
bool Server::isAvailable(int timeout) const {
134
	return m_contextImpl->isAvailable(m_requestSocket.get(), timeout);
legoc's avatar
legoc committed
135
136
}

legoc's avatar
legoc committed
137
bool Server::isAvailable() const {
138
139
	return isAvailable(getAvailableTimeout());
}
legoc's avatar
legoc committed
140

141
int Server::getAvailableTimeout() const {
legoc's avatar
legoc committed
142
143
	int timeout = getTimeout();
	if (timeout > 0) {
144
145
146
		return timeout;
	}
	else {
147
		return defaultTimeout;
legoc's avatar
legoc committed
148
149
150
	}
}

legoc's avatar
legoc committed
151
std::unique_ptr<application::Instance> Server::makeInstance() {
152
	return std::unique_ptr<application::Instance>(new application::Instance(this));
legoc's avatar
legoc committed
153
154
}

155
std::unique_ptr<application::Instance> Server::start(const std::string& name, int options) {
156
	return start(name, std::vector<std::string>(), options);
legoc's avatar
legoc committed
157
158
}

159
std::unique_ptr<application::Instance> Server::start(const std::string& name, const std::vector<std::string> & args, int options) {
legoc's avatar
legoc committed
160

legoc's avatar
legoc committed
161
162
	bool outputStream = ((options & OUTPUTSTREAM) != 0);

163
	std::unique_ptr<application::Instance> instance = makeInstance();
legoc's avatar
legoc committed
164

165
	// Set the name and register the instance as event listener.
legoc's avatar
legoc committed
166
	instance->setName(name);
167
	registerEventListener(instance.get());
legoc's avatar
legoc committed
168
169

	try {
170
		std::unique_ptr<OutputStreamSocket> streamSocket;
171

legoc's avatar
legoc committed
172
		if (outputStream) {
legoc's avatar
legoc committed
173
174
			// Connect to the stream port. A sync is made to ensure that the subscriber is connected.
			streamSocket = createOutputStreamSocket(name);
legoc's avatar
legoc committed
175
176
		}

177
		json::Object response = m_requestSocket->requestJSON(createStartRequest(name, args, application::This::getName(), application::This::getId(), application::This::getEndpoint().toString()));
legoc's avatar
legoc committed
178

179
180
181
		int value = response[message::RequestResponse::VALUE].GetInt();
		if (value == -1) {
			instance->setErrorMessage(response[message::RequestResponse::MESSAGE].GetString());
legoc's avatar
legoc committed
182
		}
183
184
		else {
			instance->setId(value);
185
186
187
188

			if (outputStream) {
				instance->setOutputStreamSocket(streamSocket);
			}
189
190
191
		}
	}
	catch (const ConnectionTimeout& e) {
legoc's avatar
legoc committed
192
193
194
195
196
197
198
199
		instance->setErrorMessage(e.what());
	}

	return instance;
}

/**
 * Sends a kill or a stop request for an application and returns the server response.
 *
 * @param id          id of the application.
 * @param immediately true to send a kill request, false to send a stop request.
 * @return the value and message of the response.
 */
Response Server::stopApplicationAsynchronously(int id, bool immediately) const {

	// Select the request matching the requested kind of termination.
	const std::string request = immediately ? createKillRequest(id) : createStopRequest(id);

	json::Object reply = m_requestSocket->requestJSON(request);

	const int status = reply[message::RequestResponse::VALUE].GetInt();
	const std::string text = reply[message::RequestResponse::MESSAGE].GetString();

	return Response(status, text);
}

217
/**
 * Connects to every instance of the named application that the server reports
 * and that is still alive.
 *
 * @param name    name of the application sent in the connect request.
 * @param options option flags; OUTPUTSTREAM attaches an output stream socket to each instance.
 * @return the alive instances, in the order returned by the server; possibly empty.
 */
application::InstanceArray Server::connectAll(const std::string& name, int options) {

	const bool outputStream = ((options & OUTPUTSTREAM) != 0);

	json::Object response = m_requestSocket->requestJSON(createConnectRequest(name));

	json::Value& applicationInfo = response[message::ApplicationInfoListResponse::APPLICATION_INFO];
	json::Value::Array array = applicationInfo.GetArray();
	const size_t size = array.Size();

	// Only alive instances are stored, so the array can be returned as is.
	// Reserving does not create elements: indexing up to 'size' afterwards would read past the end.
	application::InstanceArray aliveInstances;
	aliveInstances.reserve(size);

	for (size_t i = 0; i < size; ++i) {
		json::Value::Object info = array[i].GetObject();

		std::unique_ptr<application::Instance> instance = makeInstance();

		// Set the name and register the instance as event listener.
		const std::string instanceName = info[message::ApplicationInfo::NAME].GetString();
		instance->setName(instanceName);
		registerEventListener(instance.get());

		const int applicationId = info[message::ApplicationInfo::ID].GetInt();

		// Test if the application is still alive, otherwise we could have missed a status message.
		if (!isAlive(applicationId)) {
			continue;
		}

		instance->setId(applicationId);
		instance->setInitialState(info[message::ApplicationInfo::APPLICATION_STATE].GetInt());
		instance->setPastStates(info[message::ApplicationInfo::PAST_APPLICATION_STATES].GetInt());

		if (outputStream) {
			std::unique_ptr<OutputStreamSocket> streamSocket = createOutputStreamSocket(instanceName);
			instance->setOutputStreamSocket(streamSocket);
		}

		aliveInstances.push_back(std::move(instance));
	}

	return aliveInstances;
}

279
std::unique_ptr<application::Instance> Server::connect(const std::string& name, int options) {
legoc's avatar
legoc committed
280

legoc's avatar
legoc committed
281
	application::InstanceArray instances = connectAll(name, options);
legoc's avatar
legoc committed
282
283

	if (instances.size() == 0) {
284
		std::unique_ptr<application::Instance> instance = makeInstance();
legoc's avatar
legoc committed
285
286
287
		return instance;
	}

legoc's avatar
legoc committed
288
	return std::move(instances[0]);
legoc's avatar
legoc committed
289
290
}

291
std::unique_ptr<application::Instance> Server::connect(int id, int options) {
292
293
294

	bool outputStream = ((options & OUTPUTSTREAM) != 0);

295
	json::Object response = m_requestSocket->requestJSON(createConnectWithIdRequest(id));
296
297
298
299
300
301
302
303

	json::Value& applicationInfo = response[message::ApplicationInfoListResponse::APPLICATION_INFO];
	json::Value::Array array = applicationInfo.GetArray();
	size_t size = array.Size();

	if (size > 0) {
		json::Value::Object info = array[0].GetObject();

304
		std::unique_ptr<application::Instance> instance = makeInstance();
305
306

		// Set the name and register the instance as event listener.
307
		std::string name = info[message::ApplicationInfo::NAME].GetString();
308
309
310
311
312
313
314
315
316
317
318
319
320
		instance->setName(name);
		registerEventListener(instance.get());

		int applicationId = info[message::ApplicationInfo::ID].GetInt();

		// test if the application is still alive otherwise we could have missed a status message
		if (isAlive(applicationId)) {

			instance->setId(applicationId);
			instance->setInitialState(info[message::ApplicationInfo::APPLICATION_STATE].GetInt());
			instance->setPastStates(info[message::ApplicationInfo::PAST_APPLICATION_STATES].GetInt());

			if (outputStream) {
321
				std::unique_ptr<OutputStreamSocket> streamSocket = createOutputStreamSocket(name);
322
323
324
325
326
327
328
329
330
331
				instance->setOutputStreamSocket(streamSocket);
			}

			return instance;
		}
	}

	return makeInstance();
}

legoc's avatar
legoc committed
332
333
334
335
336
337
338
339
340
341
342
343
void Server::killAllAndWaitFor(const std::string& name) {

	application::InstanceArray instances = connectAll(name);

	for (int i = 0; i < instances.size(); ++i) {
		instances[i]->kill();
		instances[i]->waitFor();
	}
}

bool Server::isAlive(int id) const {

344
	json::Object response = m_requestSocket->requestJSON(createIsAliveRequest(id));
legoc's avatar
legoc committed
345

346
	return response[message::IsAliveResponse::IS_ALIVE].GetBool();
legoc's avatar
legoc committed
347
348
349
350
}

std::vector<application::Configuration> Server::getApplicationConfigurations() const {

351
	std::vector<application::Configuration> configs;
352

353
	json::Object response = m_requestSocket->requestJSON(createListRequest());
legoc's avatar
legoc committed
354

355
	json::Value& applicationConfigs = response[message::ApplicationConfigListResponse::APPLICATION_CONFIG];
356
357
	json::Value::Array array = applicationConfigs.GetArray();
	size_t size = array.Size();
legoc's avatar
legoc committed
358

359
360
	for (int i = 0; i < size; ++i) {
		json::Value::Object config = array[i].GetObject();
legoc's avatar
legoc committed
361

362
363
		std::string name = config[message::ApplicationConfig::NAME].GetString();
		std::string description = config[message::ApplicationConfig::DESCRIPTION].GetString();
364
365
366
367
		bool runsSingle = config[message::ApplicationConfig::RUNS_SINGLE].GetBool();
		bool restart = config[message::ApplicationConfig::RESTART].GetBool();
		int startingTime = config[message::ApplicationConfig::STARTING_TIME].GetInt();
		int stoppingTime = config[message::ApplicationConfig::STOPPING_TIME].GetInt();
legoc's avatar
legoc committed
368

369
370
371
372
373
374
375
376
		application::Configuration applicationConfig(name,
				description,
				runsSingle,
				restart,
				startingTime,
				stoppingTime);

		configs.push_back(applicationConfig);
legoc's avatar
legoc committed
377
378
	}

379
	return configs;
legoc's avatar
legoc committed
380
381
382
383
}

std::vector<application::Info> Server::getApplicationInfos() const {

384
	std::vector<application::Info> infos;
legoc's avatar
legoc committed
385

386
	json::Object response = m_requestSocket->requestJSON(createAppsRequest());
387

388
389
	json::Value& applicationInfos = response[message::ApplicationInfoListResponse::APPLICATION_INFO];
	json::Value::Array array = applicationInfos.GetArray();
390
	size_t size = array.Size();
legoc's avatar
legoc committed
391

392
393
	for (int i = 0; i < size; ++i) {
		json::Value::Object info = array[i].GetObject();
legoc's avatar
legoc committed
394

395
		std::string name = info[message::ApplicationInfo::NAME].GetString();
396
397
398
399
		int id = info[message::ApplicationInfo::ID].GetInt();
		int pid = info[message::ApplicationInfo::PID].GetInt();
		application::State state = info[message::ApplicationInfo::APPLICATION_STATE].GetInt();
		application::State pastStates = info[message::ApplicationInfo::PAST_APPLICATION_STATES].GetInt();
400
		std::string args = info[message::ApplicationInfo::ARGS].GetString();
legoc's avatar
legoc committed
401

402
403
404
405
406
407
		application::Info applicationInfo(name,
						id,
						pid,
						state,
						pastStates,
						args);
legoc's avatar
legoc committed
408

legoc's avatar
legoc committed
409
		infos.push_back(applicationInfo);
legoc's avatar
legoc committed
410
411
	}

legoc's avatar
legoc committed
412
	return infos;
legoc's avatar
legoc committed
413
414
415
416
}

std::vector<application::Info> Server::getApplicationInfos(const std::string& name) const {

417
418
	std::vector<application::Info> allInfos = getApplicationInfos();
	std::vector<application::Info> infos;
legoc's avatar
legoc committed
419

420
	for (std::vector<application::Info>::const_iterator i = allInfos.begin(); i != allInfos.end(); ++i) {
legoc's avatar
legoc committed
421
422
		application::Info const & info = *i;
		if (info.getName() == name) {
423
			infos.push_back(info);
legoc's avatar
legoc committed
424
425
426
		}
	}

427
	return infos;
legoc's avatar
legoc committed
428
429
}

legoc's avatar
legoc committed
430
std::vector<application::Port> Server::getPorts() const {
431

432
	std::vector<application::Port> ports;
433

434
	json::Object response = m_requestSocket->requestJSON(createPortsRequest());
435
436
437
438
439
440
441
442
443

	json::Value& portInfos = response[message::PortInfoListResponse::PORT_INFO];
	json::Value::Array array = portInfos.GetArray();
	size_t size = array.Size();

	for (int i = 0; i < size; ++i) {
		json::Value::Object info = array[i].GetObject();

		int port = info[message::PortInfo::PORT].GetInt();
444
445
		std::string status = info[message::PortInfo::STATUS].GetString();
		std::string owner = info[message::PortInfo::OWNER].GetString();
446

legoc's avatar
legoc committed
447
		application::Port portInfo(port, status, owner);
448
449
450
451
452
453
454

		ports.push_back(portInfo);
	}

	return ports;
}

455
456
/**
 * Requests the status of an application and returns its current state.
 *
 * @param id id of the application.
 * @return the APPLICATION_STATE field of the response.
 */
application::State Server::getActualState(int id) const {

	json::Object reply = m_requestSocket->requestJSON(createGetStatusRequest(id));

	application::State current = reply[message::StatusEvent::APPLICATION_STATE].GetInt();
	return current;
}

std::set<application::State> Server::getPastStates(int id) const {

464
	json::Object response = m_requestSocket->requestJSON(createGetStatusRequest(id));
465
466
467

	application::State applicationStates = response[message::StatusEvent::PAST_APPLICATION_STATES].GetInt();

468
	std::set<application::State> result;
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508

	if ((applicationStates & application::STARTING) != 0) {
		result.insert(application::STARTING);
	}

	if ((applicationStates & application::RUNNING) != 0) {
		result.insert(application::RUNNING);
	}

	if ((applicationStates & application::STOPPING) != 0) {
		result.insert(application::STOPPING);
	}

	if ((applicationStates & application::KILLING) != 0) {
		result.insert(application::KILLING);
	}

	if ((applicationStates & application::PROCESSING_ERROR) != 0) {
		result.insert(application::PROCESSING_ERROR);
	}

	if ((applicationStates & application::FAILURE) != 0) {
		result.insert(application::FAILURE);
	}

	if ((applicationStates & application::SUCCESS) != 0) {
		result.insert(application::SUCCESS);
	}

	if ((applicationStates & application::STOPPED) != 0) {
		result.insert(application::STOPPED);
	}

	if ((applicationStates & application::KILLED) != 0) {
		result.insert(application::KILLED);
	}

	return result;
}

legoc's avatar
legoc committed
509
std::unique_ptr<EventStreamSocket> Server::openEventStream() {
510
511
512
513
514
515
516

	// Init the status port if necessary.
	if (m_statusPort == 0) {
		initStatus();
	}

	// Create the event stream socket.
legoc's avatar
legoc committed
517
	return std::unique_ptr<EventStreamSocket>(new EventStreamSocket(this));
legoc's avatar
legoc committed
518
519
}

legoc's avatar
legoc committed
520
/**
 * Creates a connection checker for this server and starts its thread.
 *
 * @param handler       handler given to the ConnectionChecker constructor.
 * @param pollingTimeMs polling time passed to startThread().
 * @return the started connection checker.
 */
std::unique_ptr<ConnectionChecker> Server::createConnectionChecker(ConnectionCheckerType handler, int pollingTimeMs) {

	std::unique_ptr<ConnectionChecker> checker(new ConnectionChecker(this, handler));

	// The thread is started with the timeout given by getAvailableTimeout().
	const int timeout = getAvailableTimeout();
	checker->startThread(timeout, pollingTimeMs);

	return checker;
}

528
529
void Server::storeKeyValue(int id, const std::string& key, const std::string& value) {

530
	json::Object response = m_requestSocket->requestJSON(createStoreKeyValueRequest(id, key, value));
531
532
533
534
}

std::string Server::getKeyValue(int id, const std::string& key) {

535
	json::Object response = m_requestSocket->requestJSON(createGetKeyValueRequest(id, key));
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552

	int value = response[message::RequestResponse::VALUE].GetInt();
	if (value == 0) {
		return response[message::RequestResponse::MESSAGE].GetString();
	}
	else if (value == -1) {
		throw UndefinedApplicationException(response[message::RequestResponse::MESSAGE].GetString());
	}
	else if (value == -2) {
		throw UndefinedKeyException(response[message::RequestResponse::MESSAGE].GetString());
	}

	return "";
}

void Server::removeKey(int id, const std::string& key) {

553
	json::Object response = m_requestSocket->requestJSON(createRemoveKeyRequest(id, key));
554
555
556
557
558
559
560
561
562
563

	int value = response[message::RequestResponse::VALUE].GetInt();
	if (value == -1) {
		throw UndefinedApplicationException(response[message::RequestResponse::MESSAGE].GetString());
	}
	else if (value == -2) {
		throw UndefinedKeyException(response[message::RequestResponse::MESSAGE].GetString());
	}
}

564
565
int Server::requestPort(int id) {

566
	json::Object response = m_requestSocket->requestJSON(createRequestPortRequest(id));
567
568
569
570
571
572
573
574
575
576
577

	int value = response[message::RequestResponse::VALUE].GetInt();
	if (value == -1) {
		throw UndefinedApplicationException(response[message::RequestResponse::MESSAGE].GetString());
	}

	return value;
}

void Server::setPortUnavailable(int id, int port) {

578
	json::Object response = m_requestSocket->requestJSON(createPortUnavailableRequest(id, port));
579
580
581
582
583
584
585
586
587

	int value = response[message::RequestResponse::VALUE].GetInt();
	if (value == -1) {
		throw UndefinedApplicationException(response[message::RequestResponse::MESSAGE].GetString());
	}
}

void Server::releasePort(int id, int port) {

588
	json::Object response = m_requestSocket->requestJSON(createReleasePortRequest(id, port));
589
590
591
592
593
594
595

	int value = response[message::RequestResponse::VALUE].GetInt();
	if (value == -1) {
		throw UndefinedApplicationException(response[message::RequestResponse::MESSAGE].GetString());
	}
}

596
/**
 * Forwards a single-part request to the request socket.
 *
 * @param request         request to send.
 * @param overrideTimeout timeout passed unchanged to the request socket.
 * @return the response of the request socket.
 */
json::Object Server::requestJSON(const std::string& request, int overrideTimeout) {
	RequestSocket& socket = *m_requestSocket;
	return socket.requestJSON(request, overrideTimeout);
}

600
/**
 * Forwards a two-part request to the request socket.
 *
 * @param requestPart1    first part of the request.
 * @param requestPart2    second part of the request.
 * @param overrideTimeout timeout passed unchanged to the request socket.
 * @return the response of the request socket.
 */
json::Object Server::requestJSON(const std::string& requestPart1, const std::string& requestPart2, int overrideTimeout) {
	RequestSocket& socket = *m_requestSocket;
	return socket.requestJSON(requestPart1, requestPart2, overrideTimeout);
}

604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
std::vector<EventListener *> Server::getEventListeners() {
	std::unique_lock<std::mutex> lock(m_eventListenersMutex);
	return m_eventListeners;
}

/**
 * Appends a listener to the list of event listeners, under the listeners mutex.
 *
 * @param listener listener to add; the pointer is stored as is.
 */
void Server::registerEventListener(EventListener * listener) {
	std::lock_guard<std::mutex> guard(m_eventListenersMutex);
	m_eventListeners.push_back(listener);
}

/**
 * Removes the first occurrence of a listener from the list of event listeners,
 * under the listeners mutex. Does nothing when the listener is not registered.
 *
 * @param listener listener to remove.
 */
void Server::unregisterEventListener(EventListener * listener) {
	std::lock_guard<std::mutex> guard(m_eventListenersMutex);

	// Advance to the first matching entry, or to the end.
	auto position = m_eventListeners.begin();
	while (position != m_eventListeners.end() && *position != listener) {
		++position;
	}

	if (position != m_eventListeners.end()) {
		m_eventListeners.erase(position);
	}
}

626
627
void Server::initContext() {
	// Set the impl.
legoc's avatar
legoc committed
628
	m_contextImpl.reset(new ContextZmq());
629
630
631
632
633
634
635
}

/**
 * Creates the request socket from the stored server endpoint and the context timeout.
 * The server endpoint must have been initialized before this call.
 */
void Server::initRequestSocket() {
	const int timeout = m_contextImpl->getTimeout();
	m_requestSocket = createRequestSocket(m_serverEndpoint.toString(), timeout);
}

legoc's avatar
legoc committed
636
637
638
639
/**
 * Returns a non-owning pointer to the context; null when no context is set.
 */
Context * Server::getContext() {
	Context * context = m_contextImpl.get();
	return context;
}

640
641
642
643
644
645
/**
 * Returns the server endpoint with its port replaced by the status port.
 */
Endpoint Server::getStatusEndpoint() const {
	const int statusPort = m_statusPort;
	return m_serverEndpoint.withPort(statusPort);
}

void Server::retrieveServerVersion() {

646
	json::Object response = m_requestSocket->requestJSON(createVersionRequest());
647
648
649
650
651
652
653
654
655

	m_serverVersion[0] = response[message::VersionResponse::MAJOR].GetInt();
	m_serverVersion[1] = response[message::VersionResponse::MINOR].GetInt();
	m_serverVersion[2] = response[message::VersionResponse::REVISION].GetInt();
}

void Server::initStatus() {

	// Get the status port.
656
	json::Object response = m_requestSocket->requestJSON(createStreamStatusRequest());
657
658
659
660
661
662
663
664
665
666
667
668
669
670

	int value = response[message::RequestResponse::VALUE].GetInt();

	// Check response.
	if (value == -1) {
		return;
	}

	// Get the status port.
	m_statusPort = value;
}

int Server::getStreamPort(const std::string& name) {

671
	json::Object response = m_requestSocket->requestJSON(createOutputPortRequest(name));
672
673
674
675
676

	return response[message::RequestResponse::VALUE].GetInt();
}

std::unique_ptr<OutputStreamSocket> Server::createOutputStreamSocket(const std::string& name) {
legoc's avatar
legoc committed
677
678
	// Create the event stream socket.
	return std::unique_ptr<OutputStreamSocket>(new OutputStreamSocket(this, name));
679
680
}

681
682
std::unique_ptr<RequestSocket> Server::createRequestSocket(const std::string& endpoint) {
	return std::unique_ptr<RequestSocket>(new RequestSocket(m_contextImpl.get(), endpoint, m_contextImpl->getTimeout()));
683
684
}

685
686
/**
 * Creates a request socket for the given endpoint with an explicit timeout.
 *
 * @param endpoint endpoint given to the RequestSocket.
 * @param timeout  timeout given to the RequestSocket.
 */
std::unique_ptr<RequestSocket> Server::createRequestSocket(const std::string& endpoint, int timeout) {
	std::unique_ptr<RequestSocket> socket(new RequestSocket(m_contextImpl.get(), endpoint, timeout));
	return socket;
}

legoc's avatar
legoc committed
689
690
691
692
693
694
void Server::sendSync() {

	try {
		m_requestSocket->requestJSON(createSyncRequest());
	}
	catch (const ConnectionTimeout& e) {
legoc's avatar
legoc committed
695
696
697
698
699
700
701
702
703
704
705
		// The server is not accessible.
	}
}

/**
 * Sends a sync stream request for the named application.
 * A connection timeout is silently ignored.
 *
 * @param name name passed to the sync stream request.
 */
void Server::sendSyncStream(const std::string& name) {

	try {
		m_requestSocket->requestJSON(createSyncStreamRequest(name));
	}
	catch (const ConnectionTimeout&) {
		// The server is not accessible: nothing to do.
	}
}

legoc's avatar
legoc committed
709
710
/**
 * Writes "server@" followed by the string form of the server endpoint.
 *
 * @param os     stream to write to.
 * @param server server to describe.
 * @return the stream, to allow chaining.
 */
std::ostream& operator<<(std::ostream& os, const cameo::Server& server) {

	os << "server@";
	os << server.m_serverEndpoint.toString();

	return os;
}

}