/*
 * Copyright 2015 Institut Laue-Langevin
 *
 * Licensed under the EUPL, Version 1.1 only (the "License");
 * You may not use this work except in compliance with the Licence.
 * You may obtain a copy of the Licence at:
 *
 * http://joinup.ec.europa.eu/software/page/eupl
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the Licence is distributed on an "AS IS" basis,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the Licence for the specific language governing permissions and
 * limitations under the Licence.
 */

17
#include "Server.h"

#include <algorithm>
#include <iostream>
#include <sstream>
#include "Application.h"
#include "ConnectionChecker.h"
#include "impl/ServicesImpl.h"
#include "ProtoType.h"
#include "EventThread.h"
#include "impl/StreamSocketImpl.h"
#include "impl/RequestSocketImpl.h"
legoc's avatar
legoc committed
28
29
30
31
32
33

using namespace std;

namespace cameo {

/**
 * Builds the client-side object bound to the server designated by \p endpoint.
 *
 * The endpoint is cut into tokens with split(): the first two tokens joined by
 * ':' become the url and the third one is parsed as the port. Once the server
 * endpoint is defined, the request socket is initialised and the event thread
 * is started on a freshly opened event stream.
 *
 * \param endpoint Server endpoint. It must yield at least three tokens and the
 *                 third one must start with a number.
 * \throws InvalidArgumentException if the endpoint has fewer than three tokens
 *                 or if its port token cannot be parsed.
 */
Server::Server(const std::string& endpoint) :
	Services() {

	Services::init();

	const vector<string> tokens = split(endpoint);

	if (tokens.size() < 3) {
		throw InvalidArgumentException(endpoint + " is not a valid endpoint");
	}

	m_url = tokens[0] + ":" + tokens[1];
	const string& port = tokens[2];

	// A failed extraction would otherwise go unnoticed and leave a meaningless
	// port, so it is reported like any other malformed endpoint.
	istringstream is(port);
	if (!(is >> m_port)) {
		throw InvalidArgumentException(endpoint + " is not a valid endpoint");
	}
	m_serverEndpoint = m_url + ":" + port;

	// Create the request socket. The server endpoint has been defined.
	Services::initRequestSocket();

	// Start the event thread.
	unique_ptr<EventStreamSocket> socket = openEventStream();
	m_eventThread.reset(new EventThread(this, socket));
	m_eventThread->start();
}

/**
 * Cancels the event thread when one has been created.
 */
Server::~Server() {
	// Nothing to cancel if no event thread object is held.
	EventThread * const eventThread = m_eventThread.get();
	if (eventThread != nullptr) {
		eventThread->cancel();
	}
}

legoc's avatar
legoc committed
66
void Server::setTimeout(int timeoutMs) {
67
	Services::setTimeout(timeoutMs);
legoc's avatar
legoc committed
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
}

int Server::getTimeout() const {
	return Services::getTimeout();
}

const std::string& Server::getEndpoint() const {
	return Services::getEndpoint();
}

const std::string& Server::getUrl() const {
	return Services::getUrl();
}

int Server::getPort() const {
	return Services::getPort();
}

bool Server::isAvailable(int timeout) const {
	return Services::isAvailable(timeout);
}

legoc's avatar
legoc committed
90
bool Server::isAvailable() const {
91
92
	return isAvailable(getAvailableTimeout());
}
legoc's avatar
legoc committed
93

94
int Server::getAvailableTimeout() const {
legoc's avatar
legoc committed
95
96
	int timeout = getTimeout();
	if (timeout > 0) {
97
98
99
100
		return timeout;
	}
	else {
		return 10000;
legoc's avatar
legoc committed
101
102
103
	}
}

legoc's avatar
legoc committed
104
std::unique_ptr<application::Instance> Server::makeInstance() {
105
	return unique_ptr<application::Instance>(new application::Instance(this));
legoc's avatar
legoc committed
106
107
}

legoc's avatar
legoc committed
108
std::unique_ptr<application::Instance> Server::start(const std::string& name, Option options) {
legoc's avatar
legoc committed
109
110
111
	return start(name, vector<string>(), options);
}

legoc's avatar
legoc committed
112
113
int Server::getStreamPort(const std::string& name) {

legoc's avatar
legoc committed
114
	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRequestType(PROTO_OUTPUT), m_impl->createOutputRequest(name));
legoc's avatar
legoc committed
115
116
117
118
119
120
121

	proto::RequestResponse requestResponse;
	requestResponse.ParseFromArray((*reply).data(), (*reply).size());

	return requestResponse.value();
}

legoc's avatar
legoc committed
122
std::unique_ptr<application::Instance> Server::start(const std::string& name, const std::vector<std::string> & args, Option options) {
legoc's avatar
legoc committed
123

legoc's avatar
legoc committed
124
125
	bool outputStream = ((options & OUTPUTSTREAM) != 0);

legoc's avatar
legoc committed
126
	unique_ptr<application::Instance> instance = makeInstance();
legoc's avatar
legoc committed
127

128
	// Set the name and register the instance as event listener.
legoc's avatar
legoc committed
129
	instance->setName(name);
130
	registerEventListener(instance.get());
legoc's avatar
legoc committed
131
132

	try {
legoc's avatar
legoc committed
133
		if (outputStream) {
legoc's avatar
legoc committed
134
135
			// We connect to the stream port before starting the application
			// so that we are sure that the ENDSTREAM message will be received even if the application terminates rapidly.
legoc's avatar
legoc committed
136
137
138
139
			unique_ptr<OutputStreamSocket> socket = createOutputStreamSocket(getStreamPort(name));
			instance->setOutputStreamSocket(socket);
		}

legoc's avatar
legoc committed
140
		unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRequestType(PROTO_START), m_impl->createStartRequest(name, args, application::This::getReference()));
legoc's avatar
legoc committed
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159

		proto::RequestResponse requestResponse;
		requestResponse.ParseFromArray((*reply).data(), (*reply).size());

		if (requestResponse.value() == -1) {
			instance->setErrorMessage(requestResponse.message());
		} else {
			instance->setId(requestResponse.value());
		}

	} catch (const ConnectionTimeout& e) {
		instance->setErrorMessage(e.what());
	}

	return instance;
}

/**
 * Requests the server to stop or kill the application with the id \p id.
 *
 * \param id          Id of the application.
 * \param immediately When true a PROTO_KILL request is sent, otherwise a
 *                    PROTO_STOP request.
 * \return A Response built from the value and the message of the reply.
 */
Response Server::stopApplicationAsynchronously(int id, bool immediately) const {

	// Both parts of the request depend on the kind of termination.
	const string requestTypePart = immediately
			? m_impl->createRequestType(PROTO_KILL)
			: m_impl->createRequestType(PROTO_STOP);
	const string requestDataPart = immediately
			? m_impl->createKillRequest(id)
			: m_impl->createStopRequest(id);

	unique_ptr<zmq::message_t> reply = m_requestSocket->request(requestTypePart, requestDataPart);

	proto::RequestResponse response;
	response.ParseFromArray(reply->data(), reply->size());

	return Response(response.value(), response.message());
}

legoc's avatar
legoc committed
179
180
181
/**
 * Connects to the applications answered by the server for \p name.
 *
 * A PROTO_CONNECT request is sent and, for each application info of the reply,
 * an instance is created, named and registered as event listener. Only the
 * instances whose application is reported alive by isAlive() are kept: they
 * receive their id, initial state and past states, plus an output stream
 * socket when OUTPUTSTREAM is set in \p options.
 *
 * \param name    Name used to build the connect request.
 * \param options Option flags; only OUTPUTSTREAM is examined here.
 * \return The instances found alive, in the order of the server reply.
 */
application::InstanceArray Server::connectAll(const std::string& name, Option options) {

	const bool outputStream = ((options & OUTPUTSTREAM) != 0);

	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRequestType(PROTO_CONNECT), m_impl->createConnectRequest(name));

	proto::ApplicationInfoListResponse response;
	response.ParseFromArray(reply->data(), reply->size());

	// Instances found alive, collected in the order of the reply.
	vector<unique_ptr<application::Instance> > aliveList;

	const int infoCount = response.applicationinfo_size();

	for (int i = 0; i < infoCount; ++i) {
		// Bind by reference: the message is only read, copying it is unnecessary.
		const proto::ApplicationInfo& info = response.applicationinfo(i);

		unique_ptr<application::Instance> instance = makeInstance();

		// Set the name and register the instance as event listener.
		instance->setName(info.name());
		registerEventListener(instance.get());

		const int applicationId = info.id();

		// Test if the application is still alive, otherwise we could have missed a status message.
		// An instance that is not kept is simply destroyed at the end of the iteration.
		if (isAlive(applicationId)) {

			// The output stream socket is only attached to the instances that are kept.
			if (outputStream) {
				unique_ptr<OutputStreamSocket> socket = createOutputStreamSocket(getStreamPort(info.name()));
				instance->setOutputStreamSocket(socket);
			}

			instance->setId(applicationId);
			instance->setInitialState(info.applicationstate());
			instance->setPastStates(info.pastapplicationstates());

			aliveList.push_back(std::move(instance));
		}
	}

	// Transfer the alive instances into an array of exactly the right size.
	application::InstanceArray aliveInstances;
	aliveInstances.allocate(static_cast<int>(aliveList.size()));

	int j = 0;
	for (unique_ptr<application::Instance>& instance : aliveList) {
		aliveInstances[j] = std::move(instance);
		j++;
	}

	return aliveInstances;
}

legoc's avatar
legoc committed
241
std::unique_ptr<application::Instance> Server::connect(const std::string& name, Option options) {
legoc's avatar
legoc committed
242

legoc's avatar
legoc committed
243
	application::InstanceArray instances = connectAll(name, options);
legoc's avatar
legoc committed
244
245

	if (instances.size() == 0) {
legoc's avatar
legoc committed
246
		unique_ptr<application::Instance> instance = makeInstance();
legoc's avatar
legoc committed
247
248
249
		return instance;
	}

legoc's avatar
legoc committed
250
	return std::move(instances[0]);
legoc's avatar
legoc committed
251
252
253
254
255
256
257
258
259
260
261
262
263
264
}

void Server::killAllAndWaitFor(const std::string& name) {

	application::InstanceArray instances = connectAll(name);

	for (int i = 0; i < instances.size(); ++i) {
		instances[i]->kill();
		instances[i]->waitFor();
	}
}

bool Server::isAlive(int id) const {

legoc's avatar
legoc committed
265
	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRequestType(PROTO_ISALIVE), m_impl->createIsAliveRequest(id));
legoc's avatar
legoc committed
266
267
268
269
270
271
272
273
274
275
276

	proto::IsAliveResponse isAliveResponse;
	isAliveResponse.ParseFromArray((*reply).data(), (*reply).size());

	return isAliveResponse.isalive();
}

std::vector<application::Configuration> Server::getApplicationConfigurations() const {

	vector<application::Configuration> configVector;

legoc's avatar
legoc committed
277
	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRequestType(PROTO_ALLAVAILABLE), m_impl->createAllAvailableRequest());
legoc's avatar
legoc committed
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300

	proto::AllAvailableResponse allAvailableResponse;
	allAvailableResponse.ParseFromArray((*reply).data(), (*reply).size());

	for (int i = 0; i < allAvailableResponse.applicationconfig_size(); ++i) {
		proto::ApplicationConfig config = allAvailableResponse.applicationconfig(i);

		application::Configuration applicationConfig(config.name(),
				config.description(),
				config.runssingle(),
				config.restart(),
				config.startingtime(),
				config.retries(),
				config.stoppingtime());

		configVector.push_back(applicationConfig);
	}

	return configVector;
}

std::vector<application::Info> Server::getApplicationInfos() const {

legoc's avatar
legoc committed
301
	vector<application::Info> infos;
legoc's avatar
legoc committed
302

legoc's avatar
legoc committed
303
	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRequestType(PROTO_SHOWALL), m_impl->createShowAllRequest());
legoc's avatar
legoc committed
304
305
306
307
308
309
310
311
312

	proto::ApplicationInfoListResponse response;
	response.ParseFromArray((*reply).data(), (*reply).size());

	for (int i = 0; i < response.applicationinfo_size(); ++i) {
		proto::ApplicationInfo info = response.applicationinfo(i);

		application::Info applicationInfo(info.name(),
						info.id(),
legoc's avatar
legoc committed
313
						info.pid(),
legoc's avatar
legoc committed
314
315
316
317
						info.applicationstate(),
						info.pastapplicationstates(),
						info.args());

legoc's avatar
legoc committed
318
		infos.push_back(applicationInfo);
legoc's avatar
legoc committed
319
320
	}

legoc's avatar
legoc committed
321
	return infos;
legoc's avatar
legoc committed
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
}

std::vector<application::Info> Server::getApplicationInfos(const std::string& name) const {

	vector<application::Info> allInfoVector = getApplicationInfos();
	vector<application::Info> infoVector;

	for (vector<application::Info>::const_iterator i = allInfoVector.begin(); i != allInfoVector.end(); ++i) {
		application::Info const & info = *i;
		if (info.getName() == name) {
			infoVector.push_back(info);
		}
	}

	return infoVector;
}

legoc's avatar
legoc committed
339
std::unique_ptr<EventStreamSocket> Server::openEventStream() {
legoc's avatar
legoc committed
340
341
342
	return Services::openEventStream();
}

343
std::unique_ptr<application::Subscriber> Server::createSubscriber(int id, const std::string& publisherName, const std::string& instanceName) {
legoc's avatar
legoc committed
344

legoc's avatar
legoc committed
345
	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRequestType(PROTO_CONNECTPUBLISHER), m_impl->createConnectPublisherRequest(id, publisherName));
legoc's avatar
legoc committed
346
347
348
349
350
351
352
353
354
355
356
357

	proto::PublisherResponse requestResponse;
	requestResponse.ParseFromArray((*reply).data(), (*reply).size());

	int publisherPort = requestResponse.publisherport();
	if (publisherPort == -1) {
		throw SubscriberCreationException(requestResponse.message());
	}

	int synchronizerPort = requestResponse.synchronizerport();
	int numberOfSubscribers = requestResponse.numberofsubscribers();

legoc's avatar
legoc committed
358
	unique_ptr<application::Subscriber> subscriber(new application::Subscriber(this, getUrl(), publisherPort, synchronizerPort, publisherName, numberOfSubscribers, instanceName, id, m_serverEndpoint, m_serverStatusEndpoint));
legoc's avatar
legoc committed
359
360
361
362
363
	subscriber->init();

	return subscriber;
}

legoc's avatar
legoc committed
364
/**
 * Creates a connection checker on this server and starts its thread.
 *
 * \param handler       Handler given to the ConnectionChecker constructor.
 * \param pollingTimeMs Polling time handed over to startThread()
 *                      (milliseconds, judging by the parameter name).
 * \return The connection checker, with its thread started using the timeout
 *         of getAvailableTimeout().
 */
std::unique_ptr<ConnectionChecker> Server::createConnectionChecker(ConnectionCheckerType handler, int pollingTimeMs) {

	unique_ptr<ConnectionChecker> checker(new ConnectionChecker(this, handler));

	const int availableTimeout = getAvailableTimeout();
	checker->startThread(availableTimeout, pollingTimeMs);

	return checker;
}

372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
std::vector<EventListener *> Server::getEventListeners() {
	std::unique_lock<std::mutex> lock(m_eventListenersMutex);
	return m_eventListeners;
}

/**
 * Appends \p listener to the event listeners while holding the listeners
 * mutex.
 *
 * \param listener Listener to add; the pointer is stored as is.
 */
void Server::registerEventListener(EventListener * listener) {
	std::lock_guard<std::mutex> guard(m_eventListenersMutex);
	m_eventListeners.push_back(listener);
}

/**
 * Removes \p listener from the event listeners while holding the listeners
 * mutex.
 *
 * Only the first matching entry is removed; nothing happens when the listener
 * is not registered.
 *
 * \param listener Listener to remove.
 */
void Server::unregisterEventListener(EventListener * listener) {
	std::unique_lock<std::mutex> lock(m_eventListenersMutex);

	// Locate the first occurrence of the listener.
	auto it = std::find(m_eventListeners.begin(), m_eventListeners.end(), listener);
	if (it != m_eventListeners.end()) {
		m_eventListeners.erase(it);
	}
}

legoc's avatar
legoc committed
394
395
396
397
398
399
400
401
/**
 * Writes "server@" followed by the server endpoint of \p server into \p os.
 *
 * \return The stream \p os.
 */
std::ostream& operator<<(std::ostream& os, const cameo::Server& server) {
	return os << "server@" << server.m_serverEndpoint;
}

}