Server.cpp 12.8 KB
Newer Older
legoc's avatar
legoc committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/*
 * Copyright 2015 Institut Laue-Langevin
 *
 * Licensed under the EUPL, Version 1.1 only (the "License");
 * You may not use this work except in compliance with the Licence.
 * You may obtain a copy of the Licence at:
 *
 * http://joinup.ec.europa.eu/software/page/eupl
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the Licence is distributed on an "AS IS" basis,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the Licence for the specific language governing permissions and
 * limitations under the Licence.
 */

17
18
#include "Server.h"
#include "Application.h"
19
#include "ConnectionChecker.h"
20
#include "impl/ServicesImpl.h"
21
#include "EventThread.h"
22
#include "impl/StreamSocketImpl.h"
legoc's avatar
legoc committed
23
#include "impl/RequestSocketImpl.h"
24
25
26
27
#include "message/JSON.h"
#include "message/Message.h"
#include <iostream>
#include <sstream>
legoc's avatar
legoc committed
28
29
30
31
32

using namespace std;

namespace cameo {

legoc's avatar
legoc committed
33
/**
 * Builds a server object from an endpoint string and starts its event thread.
 *
 * The endpoint is cut into tokens with split(); the first two tokens joined by ':'
 * form the URL and the third token is parsed as the port.
 *
 * @param endpoint Endpoint string; it must yield at least three tokens and a numeric port.
 * @param timeoutMs Timeout forwarded to Services::setTimeout().
 * @throws InvalidArgumentException if the endpoint has fewer than three tokens or
 *         if the port token cannot be parsed as a number.
 */
Server::Server(const std::string& endpoint, int timeoutMs) :
	Services() {

	Services::init();

	vector<string> tokens = split(endpoint);

	if (tokens.size() < 3) {
		throw InvalidArgumentException(endpoint + " is not a valid endpoint");
	}

	m_url = tokens[0] + ":" + tokens[1];
	string port = tokens[2];

	// Reject a port that is not numeric instead of silently continuing with a meaningless value.
	istringstream is(port);
	if (!(is >> m_port)) {
		throw InvalidArgumentException(endpoint + " is not a valid endpoint");
	}
	m_serverEndpoint = m_url + ":" + port;

	// Set the timeout.
	Services::setTimeout(timeoutMs);

	// Create the request socket. The server endpoint has been defined.
	Services::initRequestSocket();

	// Retrieve the server version.
	Services::retrieveServerVersion();

	// Manage the ConnectionTimeout exception that can occur.
	try {
		// Start the event thread.
		unique_ptr<EventStreamSocket> socket = openEventStream();
		m_eventThread.reset(new EventThread(this, socket));
		m_eventThread->start();
	}
	catch (...) {
		// Best effort: any exception raised while opening the event stream or starting
		// the thread is swallowed so that the construction still succeeds.
	}
}

/**
 * Cancels the event thread if one was created.
 */
Server::~Server() {
	// The event thread pointer may be unset when the constructor failed to start it.
	if (auto* eventThread = m_eventThread.get()) {
		eventThread->cancel();
	}
}

legoc's avatar
legoc committed
78
void Server::setTimeout(int timeoutMs) {
79
	Services::setTimeout(timeoutMs);
legoc's avatar
legoc committed
80
81
82
83
84
85
86
87
88
89
90
91
92
93
}

/**
 * Returns the timeout as reported by Services::getTimeout().
 *
 * @return The current timeout value.
 */
int Server::getTimeout() const {
	return Services::getTimeout();
}

/**
 * Returns the endpoint as reported by Services::getEndpoint().
 *
 * @return A reference to the endpoint string.
 */
const std::string& Server::getEndpoint() const {
	return Services::getEndpoint();
}

/**
 * Returns the URL as reported by Services::getUrl().
 *
 * @return A reference to the URL string.
 */
const std::string& Server::getUrl() const {
	return Services::getUrl();
}

legoc's avatar
legoc committed
94
95
96
97
/**
 * Returns the version as reported by Services::getVersion().
 *
 * @return An array of three integers (presumably major, minor, revision - to be confirmed).
 */
std::array<int, 3> Server::getVersion() const {
	return Services::getVersion();
}

legoc's avatar
legoc committed
98
99
100
101
102
103
104
105
/**
 * Returns the port as reported by Services::getPort().
 *
 * @return The port number.
 */
int Server::getPort() const {
	return Services::getPort();
}

/**
 * Tells whether the server is available by delegating to Services::isAvailable().
 *
 * @param timeout Timeout passed unchanged to Services::isAvailable().
 * @return The value returned by Services::isAvailable().
 */
bool Server::isAvailable(int timeout) const {
	return Services::isAvailable(timeout);
}

legoc's avatar
legoc committed
106
bool Server::isAvailable() const {
107
108
	return isAvailable(getAvailableTimeout());
}
legoc's avatar
legoc committed
109

110
int Server::getAvailableTimeout() const {
legoc's avatar
legoc committed
111
112
	int timeout = getTimeout();
	if (timeout > 0) {
113
114
115
116
		return timeout;
	}
	else {
		return 10000;
legoc's avatar
legoc committed
117
118
119
	}
}

legoc's avatar
legoc committed
120
std::unique_ptr<application::Instance> Server::makeInstance() {
121
	return unique_ptr<application::Instance>(new application::Instance(this));
legoc's avatar
legoc committed
122
123
}

legoc's avatar
legoc committed
124
std::unique_ptr<application::Instance> Server::start(const std::string& name, Option options) {
legoc's avatar
legoc committed
125
126
127
	return start(name, vector<string>(), options);
}

legoc's avatar
legoc committed
128
129
int Server::getStreamPort(const std::string& name) {

130
	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createOutputRequest(name));
legoc's avatar
legoc committed
131

132
133
134
	// Get the JSON response.
	json::Object response;
	json::parse(response, reply.get());
legoc's avatar
legoc committed
135

136
	return response[message::RequestResponse::VALUE].GetInt();
legoc's avatar
legoc committed
137
138
}

legoc's avatar
legoc committed
139
std::unique_ptr<application::Instance> Server::start(const std::string& name, const std::vector<std::string> & args, Option options) {
legoc's avatar
legoc committed
140

legoc's avatar
legoc committed
141
142
	bool outputStream = ((options & OUTPUTSTREAM) != 0);

legoc's avatar
legoc committed
143
	unique_ptr<application::Instance> instance = makeInstance();
legoc's avatar
legoc committed
144

145
	// Set the name and register the instance as event listener.
legoc's avatar
legoc committed
146
	instance->setName(name);
147
	registerEventListener(instance.get());
legoc's avatar
legoc committed
148
149

	try {
legoc's avatar
legoc committed
150
		if (outputStream) {
legoc's avatar
legoc committed
151
152
			// We connect to the stream port before starting the application
			// so that we are sure that the ENDSTREAM message will be received even if the application terminates rapidly.
legoc's avatar
legoc committed
153
154
155
156
			unique_ptr<OutputStreamSocket> socket = createOutputStreamSocket(getStreamPort(name));
			instance->setOutputStreamSocket(socket);
		}

157
		unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createStartRequest(name, args, application::This::getReference()));
legoc's avatar
legoc committed
158

159
160
161
		// Get the JSON response.
		json::Object response;
		json::parse(response, reply.get());
legoc's avatar
legoc committed
162

163
164
165
		int value = response[message::RequestResponse::VALUE].GetInt();
		if (value == -1) {
			instance->setErrorMessage(response[message::RequestResponse::MESSAGE].GetString());
legoc's avatar
legoc committed
166
		}
167
168
169
170
171
		else {
			instance->setId(value);
		}
	}
	catch (const ConnectionTimeout& e) {
legoc's avatar
legoc committed
172
173
174
175
176
177
178
179
		instance->setErrorMessage(e.what());
	}

	return instance;
}

/**
 * Sends a stop or kill request for the application with the given id.
 *
 * @param id Id of the application.
 * @param immediately When true a kill request is sent, otherwise a stop request.
 * @return A Response built from the VALUE and MESSAGE fields of the JSON reply.
 */
Response Server::stopApplicationAsynchronously(int id, bool immediately) const {

	// Select the request matching the requested termination mode.
	const string request = immediately
		? m_impl->createKillRequest(id)
		: m_impl->createStopRequest(id);

	unique_ptr<zmq::message_t> reply = m_requestSocket->request(request);

	// Decode the JSON reply.
	json::Object response;
	json::parse(response, reply.get());

	const int value = response[message::RequestResponse::VALUE].GetInt();
	const string message = response[message::RequestResponse::MESSAGE].GetString();

	return Response(value, message);
}

legoc's avatar
legoc committed
201
202
203
/**
 * Connects to all the instances of the named application that are still alive.
 *
 * Every application info returned by the server yields an instance that is named and
 * registered as event listener. Only the instances for which isAlive() returns true
 * are configured (output stream, id, states) and returned.
 *
 * @param name Name of the application sent in the connect request.
 * @param options Bit set; the OUTPUTSTREAM bit requests an output stream socket per instance.
 * @return An array holding the alive instances, in the order returned by the server.
 */
application::InstanceArray Server::connectAll(const std::string& name, Option options) {

	const bool outputStream = ((options & OUTPUTSTREAM) != 0);

	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createConnectRequest(name));

	// Get the JSON response.
	json::Object response;
	json::parse(response, reply.get());

	json::Value& applicationInfo = response[message::ApplicationInfoListResponse::APPLICATION_INFO];
	json::Value::Array array = applicationInfo.GetArray();
	const size_t size = array.Size();

	// Collect the alive instances directly, no intermediate array is needed.
	vector<unique_ptr<application::Instance>> alive;
	alive.reserve(size);

	for (size_t i = 0; i < size; ++i) {
		json::Value::Object info = array[i].GetObject();

		unique_ptr<application::Instance> instance = makeInstance();

		// Set the name and register the instance as event listener.
		const string instanceName = info[message::ApplicationInfo::NAME].GetString();
		instance->setName(instanceName);
		registerEventListener(instance.get());

		const int applicationId = info[message::ApplicationInfo::ID].GetInt();

		// Test if the application is still alive otherwise we could have missed a status message.
		if (!isAlive(applicationId)) {
			continue;
		}

		// Attach the output stream socket, when requested, before the instance is handed to the caller.
		if (outputStream) {
			unique_ptr<OutputStreamSocket> socket = createOutputStreamSocket(getStreamPort(instanceName));
			instance->setOutputStreamSocket(socket);
		}

		instance->setId(applicationId);
		instance->setInitialState(info[message::ApplicationInfo::APPLICATION_STATE].GetInt());
		instance->setPastStates(info[message::ApplicationInfo::PAST_APPLICATION_STATES].GetInt());

		alive.push_back(std::move(instance));
	}

	// Move the alive instances into the returned array.
	application::InstanceArray aliveInstances;
	aliveInstances.allocate(alive.size());

	for (size_t j = 0; j < alive.size(); ++j) {
		aliveInstances[j] = std::move(alive[j]);
	}

	return aliveInstances;
}

legoc's avatar
legoc committed
269
std::unique_ptr<application::Instance> Server::connect(const std::string& name, Option options) {
legoc's avatar
legoc committed
270

legoc's avatar
legoc committed
271
	application::InstanceArray instances = connectAll(name, options);
legoc's avatar
legoc committed
272
273

	if (instances.size() == 0) {
legoc's avatar
legoc committed
274
		unique_ptr<application::Instance> instance = makeInstance();
legoc's avatar
legoc committed
275
276
277
		return instance;
	}

legoc's avatar
legoc committed
278
	return std::move(instances[0]);
legoc's avatar
legoc committed
279
280
281
282
283
284
285
286
287
288
289
290
291
292
}

void Server::killAllAndWaitFor(const std::string& name) {

	application::InstanceArray instances = connectAll(name);

	for (int i = 0; i < instances.size(); ++i) {
		instances[i]->kill();
		instances[i]->waitFor();
	}
}

bool Server::isAlive(int id) const {

293
	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createIsAliveRequest(id));
legoc's avatar
legoc committed
294

295
296
297
	// Get the JSON response.
	json::Object response;
	json::parse(response, reply.get());
legoc's avatar
legoc committed
298

299
	return response[message::IsAliveResponse::IS_ALIVE].GetBool();
legoc's avatar
legoc committed
300
301
302
303
}

std::vector<application::Configuration> Server::getApplicationConfigurations() const {

304
305
306
	vector<application::Configuration> configs;

	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createAllAvailableRequest());
legoc's avatar
legoc committed
307

308
309
310
	// Get the JSON response.
	json::Object response;
	json::parse(response, reply.get());
legoc's avatar
legoc committed
311

312
313
314
	json::Value& applicationConfigs = response[message::AllAvailableResponse::APPLICATION_CONFIG];
	json::Value::Array array = applicationConfigs.GetArray();
	size_t size = array.Size();
legoc's avatar
legoc committed
315

316
317
	for (int i = 0; i < size; ++i) {
		json::Value::Object config = array[i].GetObject();
legoc's avatar
legoc committed
318

319
320
321
322
323
324
		string name = config[message::ApplicationConfig::NAME].GetString();
		string description = config[message::ApplicationConfig::DESCRIPTION].GetString();
		bool runsSingle = config[message::ApplicationConfig::RUNS_SINGLE].GetBool();
		bool restart = config[message::ApplicationConfig::RESTART].GetBool();
		int startingTime = config[message::ApplicationConfig::STARTING_TIME].GetInt();
		int stoppingTime = config[message::ApplicationConfig::STOPPING_TIME].GetInt();
legoc's avatar
legoc committed
325

326
327
328
329
330
331
332
333
		application::Configuration applicationConfig(name,
				description,
				runsSingle,
				restart,
				startingTime,
				stoppingTime);

		configs.push_back(applicationConfig);
legoc's avatar
legoc committed
334
335
	}

336
	return configs;
legoc's avatar
legoc committed
337
338
339
340
}

std::vector<application::Info> Server::getApplicationInfos() const {

legoc's avatar
legoc committed
341
	vector<application::Info> infos;
legoc's avatar
legoc committed
342

343
344
345
346
347
348
349
350
351
	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createShowAllRequest());

	// Get the JSON response.
	json::Object response;
	json::parse(response, reply.get());

	json::Value& applicationInfo = response[message::ApplicationInfoListResponse::APPLICATION_INFO];
	json::Value::Array array = applicationInfo.GetArray();
	size_t size = array.Size();
legoc's avatar
legoc committed
352

353
354
	for (int i = 0; i < size; ++i) {
		json::Value::Object info = array[i].GetObject();
legoc's avatar
legoc committed
355

356
357
358
359
360
361
		string name = info[message::ApplicationInfo::NAME].GetString();
		int id = info[message::ApplicationInfo::ID].GetInt();
		int pid = info[message::ApplicationInfo::PID].GetInt();
		application::State state = info[message::ApplicationInfo::APPLICATION_STATE].GetInt();
		application::State pastStates = info[message::ApplicationInfo::PAST_APPLICATION_STATES].GetInt();
		string args = info[message::ApplicationInfo::ARGS].GetString();
legoc's avatar
legoc committed
362

363
364
365
366
367
368
		application::Info applicationInfo(name,
						id,
						pid,
						state,
						pastStates,
						args);
legoc's avatar
legoc committed
369

legoc's avatar
legoc committed
370
		infos.push_back(applicationInfo);
legoc's avatar
legoc committed
371
372
	}

legoc's avatar
legoc committed
373
	return infos;
legoc's avatar
legoc committed
374
375
376
377
}

std::vector<application::Info> Server::getApplicationInfos(const std::string& name) const {

378
379
	vector<application::Info> allInfos = getApplicationInfos();
	vector<application::Info> infos;
legoc's avatar
legoc committed
380

381
	for (vector<application::Info>::const_iterator i = allInfos.begin(); i != allInfos.end(); ++i) {
legoc's avatar
legoc committed
382
383
		application::Info const & info = *i;
		if (info.getName() == name) {
384
			infos.push_back(info);
legoc's avatar
legoc committed
385
386
387
		}
	}

388
	return infos;
legoc's avatar
legoc committed
389
390
}

legoc's avatar
legoc committed
391
std::unique_ptr<EventStreamSocket> Server::openEventStream() {
legoc's avatar
legoc committed
392
393
394
	return Services::openEventStream();
}

395
std::unique_ptr<application::Subscriber> Server::createSubscriber(int id, const std::string& publisherName, const std::string& instanceName) {
legoc's avatar
legoc committed
396

397
	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createConnectPublisherRequest(id, publisherName));
legoc's avatar
legoc committed
398

399
400
401
	// Get the JSON response.
	json::Object response;
	json::parse(response, reply.get());
legoc's avatar
legoc committed
402

403
	int publisherPort = response[message::PublisherResponse::PUBLISHER_PORT].GetInt();
legoc's avatar
legoc committed
404
	if (publisherPort == -1) {
405
		throw SubscriberCreationException(response[message::PublisherResponse::MESSAGE].GetString());
legoc's avatar
legoc committed
406
407
	}

408
409
	int synchronizerPort = response[message::PublisherResponse::SYNCHRONIZER_PORT].GetInt();
	int numberOfSubscribers = response[message::PublisherResponse::NUMBER_OF_SUBSCRIBERS].GetInt();
legoc's avatar
legoc committed
410

legoc's avatar
legoc committed
411
	unique_ptr<application::Subscriber> subscriber(new application::Subscriber(this, getUrl(), publisherPort, synchronizerPort, publisherName, numberOfSubscribers, instanceName, id, m_serverEndpoint, m_serverStatusEndpoint));
legoc's avatar
legoc committed
412
413
414
415
416
	subscriber->init();

	return subscriber;
}

legoc's avatar
legoc committed
417
/**
 * Creates a connection checker bound to this server and starts its thread.
 *
 * @param handler Handler passed to the ConnectionChecker constructor.
 * @param pollingTimeMs Polling time passed to ConnectionChecker::startThread().
 * @return The connection checker, started with the timeout given by getAvailableTimeout().
 */
std::unique_ptr<ConnectionChecker> Server::createConnectionChecker(ConnectionCheckerType handler, int pollingTimeMs) {

	unique_ptr<ConnectionChecker> checker(new ConnectionChecker(this, handler));

	const int availableTimeout = getAvailableTimeout();
	checker->startThread(availableTimeout, pollingTimeMs);

	return checker;
}

425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
std::vector<EventListener *> Server::getEventListeners() {
	std::unique_lock<std::mutex> lock(m_eventListenersMutex);
	return m_eventListeners;
}

/**
 * Appends a listener to the list of event listeners.
 *
 * The list is modified while holding m_eventListenersMutex. No check for duplicates is made.
 *
 * @param listener Listener to add; the pointer is stored as is.
 */
void Server::registerEventListener(EventListener * listener) {
	std::lock_guard<std::mutex> guard(m_eventListenersMutex);
	m_eventListeners.push_back(listener);
}

/**
 * Removes the first occurrence of a listener from the list of event listeners.
 *
 * The list is modified while holding m_eventListenersMutex. Nothing happens when the
 * listener is not registered.
 *
 * @param listener Listener to remove.
 */
void Server::unregisterEventListener(EventListener * listener) {
	std::lock_guard<std::mutex> guard(m_eventListenersMutex);

	// Advance to the first matching entry, or to the end when there is none.
	auto position = m_eventListeners.begin();
	while (position != m_eventListeners.end() && *position != listener) {
		++position;
	}

	if (position != m_eventListeners.end()) {
		m_eventListeners.erase(position);
	}
}

legoc's avatar
legoc committed
447
448
449
450
451
452
453
454
/**
 * Writes the server to the stream as "server@" followed by its endpoint.
 *
 * @param os Output stream.
 * @param server Server to print.
 * @return The output stream.
 */
std::ostream& operator<<(std::ostream& os, const cameo::Server& server) {
	return os << "server@" << server.m_serverEndpoint;
}

}