/*
 * Copyright 2015 Institut Laue-Langevin
 *
 * Licensed under the EUPL, Version 1.1 only (the "License");
 * You may not use this work except in compliance with the Licence.
 * You may obtain a copy of the Licence at:
 *
 * http://joinup.ec.europa.eu/software/page/eupl
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the Licence is distributed on an "AS IS" basis,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the Licence for the specific language governing permissions and
 * limitations under the Licence.
 */

#ifndef CAMEO_APPLICATION_H_
#define CAMEO_APPLICATION_H_

#include <cstdint>
#include <functional>
#include <iosfwd>
#include <memory>
#include <set>
#include <string>
#include <vector>

#include "InvalidArgumentException.h"
#include "UnmanagedApplicationException.h"
#include "SocketException.h"
#include "ConnectionTimeout.h"
#include "PublisherCreationException.h"
#include "RequesterCreationException.h"
#include "ResponderCreationException.h"
#include "UndefinedApplicationException.h"
#include "UndefinedKeyException.h"
#include "Response.h"
#include "Serializer.h"
#include "Services.h"
#include "TimeCondition.h"
#include "EventListener.h"

namespace cameo {

/**
 * Option values.
 * NOTE(review): OUTPUTSTREAM presumably asks for the output stream of an
 * application (see Instance::getOutputStreamSocket()) - confirm against the
 * code that consumes these values.
 */
enum Option {
	NONE = 0,
	OUTPUTSTREAM = 1
};

// Forward declarations of types defined outside this header. Within this header
// they are only used through pointers, references, smart pointers and friend
// declarations, so their full definitions are not required here.
class Server;
class EventStreamSocket;
class OutputStreamSocket;
class PublisherImpl;
class SubscriberImpl;
class RequestImpl;
class ResponderImpl;
class RequesterImpl;
class WaitingImpl;
class SocketWaitingImpl;
class GenericWaitingImpl;
class WaitingImplSet;
class HandlerImpl;

namespace application {

// Forward declarations, so that the classes of this namespace can be named
// (e.g. in friend declarations) before their definitions are seen.
class Publisher;
class Subscriber;
class Responder;
class Requester;
class Instance;
class Waiting;

/**
 * Application state value.
 *
 * Apart from UNKNOWN (0), every constant below is a distinct power of two, so
 * several states can be combined into one State value with bitwise OR (see for
 * instance the "states" and "pastStates" parameters further down in this header).
 */
typedef int32_t State;

// Removes any macro named ERROR that a previously included header may have defined.
// NOTE(review): presumably a guard against platform headers defining ERROR - confirm
// which identifier in this code base it protects.
#undef ERROR

const State UNKNOWN = 0;
const State STARTING = 1;
const State RUNNING = 2;
const State STOPPING = 4;
const State KILLING = 8;
const State PROCESSING_ERROR = 16;
const State FAILURE = 32;
const State SUCCESS = 64;
const State STOPPED = 128;
const State KILLED = 256;


86
class This : private Services, private EventListener {
legoc's avatar
legoc committed
87
88
89
90
91
92
93
94
95
96
97
98
99

	friend class cameo::application::Publisher;
	friend class cameo::application::Responder;
	friend class cameo::application::Requester;
	friend class cameo::PublisherImpl;
	friend class cameo::RequestImpl;
	friend class cameo::ResponderImpl;
	friend class cameo::RequesterImpl;
	friend class cameo::SocketWaitingImpl;
	friend class cameo::GenericWaitingImpl;
	friend class cameo::Server;
	friend std::ostream& operator<<(std::ostream&, const cameo::application::This&);

legoc's avatar
legoc committed
100
	typedef std::function<void ()> StopFunctionType;
legoc's avatar
legoc committed
101
102

public:
103
104
105
	This();
	~This();

legoc's avatar
legoc committed
106
	static void init(int argc, char *argv[]);
107
108
109
110

	/**
	 * The terminate call is not necessary unless the static instance of This is not destroyed automatically.
	 */
legoc's avatar
legoc committed
111
112
113
114
115
116
	static void terminate();

	static const std::string& getName();
	static int getId();
	static void setTimeout(int timeout);
	static int getTimeout();
117
	static const std::string& getEndpoint();
legoc's avatar
legoc committed
118
119
120
121
122
123
124
125
126
127
128
129
	static Server& getServer();

	/**
	 * throws StarterServerException.
	 */
	static Server& getStarterServer();
	static const std::string& getUrl();
	static bool isAvailable(int timeout = 10000);
	static bool isStopping();

	template<typename Type>
	static void handleStop(Type function) {
130
		m_instance.handleStopImpl(function);
legoc's avatar
legoc committed
131
132
133
134
	}

	static void cancelWaitings();

135
	static bool setRunning();
legoc's avatar
legoc committed
136
137
138
139
140
141
142
143
144
145

	/**
	 * Sets the result.
	 */
	static void setBinaryResult(const std::string& data);
	static void setResult(const std::string& data);

	/**
	 * Connects to the starter application, i.e. the application which started this application.
	 */
legoc's avatar
legoc committed
146
	static std::unique_ptr<Instance> connectToStarter();
legoc's avatar
legoc committed
147

148
149
150
151
	static void storeKeyValue(const std::string& key, const std::string& value);
	static std::string getKeyValue(const std::string& key);
	static void removeKey(const std::string& key);

legoc's avatar
legoc committed
152
private:
153
	void initApplication(int argc, char *argv[]);
legoc's avatar
legoc committed
154

legoc's avatar
legoc committed
155
156
157
	static std::string getReference();
	static State parseState(const std::string& value);
	State getState(int id) const;
legoc's avatar
legoc committed
158
159
160
161

	int initUnmanagedApplication();
	void terminateUnmanagedApplication();

legoc's avatar
legoc committed
162
163
164
	bool destroyPublisher(const std::string& name) const;
	bool removePort(const std::string& name) const;
	State waitForStop();
165
166

	void stoppingFunction(StopFunctionType stop);
legoc's avatar
legoc committed
167
168
169
170
	void handleStopImpl(StopFunctionType function);

	std::string m_name;
	int m_id;
legoc's avatar
legoc committed
171
	bool m_managed;
legoc's avatar
legoc committed
172
173
174
175
176

	std::string m_starterEndpoint;
	std::string m_starterName;
	int m_starterId;

legoc's avatar
legoc committed
177
178
	std::unique_ptr<Server> m_server;
	std::unique_ptr<Server> m_starterServer;
legoc's avatar
legoc committed
179

legoc's avatar
legoc committed
180
	std::unique_ptr<WaitingImplSet> m_waitingSet;
181
	std::unique_ptr<HandlerImpl> m_stopHandler;
legoc's avatar
legoc committed
182

183
	static This m_instance;
legoc's avatar
legoc committed
184
185
186
187
	static const std::string RUNNING_STATE;
};


188
class Instance : private EventListener {
legoc's avatar
legoc committed
189
190
191
192
193
194

	friend class cameo::Server;
	friend class cameo::application::Subscriber;
	friend std::ostream& operator<<(std::ostream&, const Instance&);

public:
legoc's avatar
legoc committed
195
	typedef std::function<void (State)> StateHandlerType;
legoc's avatar
legoc committed
196

legoc's avatar
legoc committed
197
198
	~Instance();

legoc's avatar
legoc committed
199
	const std::string& getName() const;
legoc's avatar
legoc committed
200
201
202
203
204
205
206
207
208
	int getId() const;
	const std::string& getUrl() const;
	const std::string& getEndpoint() const;
	std::string getNameId() const;
	bool hasResult() const;
	bool exists() const;
	const std::string& getErrorMessage() const;
	bool stop();
	bool kill();
legoc's avatar
legoc committed
209

legoc's avatar
legoc committed
210
211
	State waitFor(StateHandlerType handler = nullptr);
	State waitFor(int states, StateHandlerType handler = nullptr);
212
	State waitFor(const std::string& eventName);
legoc's avatar
legoc committed
213

legoc's avatar
legoc committed
214
215
	void cancelWaitFor();

216
217
218
219
	/**
	 * Deprecated.
	 * TODO remove in next version.
	 */
220
221
	State now();

222
223
224
225
226
227
228
229
230
231
	/**
	 * Gets the last state.
	 */
	State getLastState();

	/**
	 * Returns the actual state and UNKNOWN if the instance does not exist anymore.
	 */
	State getActualState() const;

232
233
234
235
236
	/**
	 * Returns the past states.
	 */
	std::set<State> getPastStates() const;

237
238
239
240
241
	/**
	 * Returns the exit code.
	 */
	int getExitCode() const;

legoc's avatar
legoc committed
242
243
244
	bool getBinaryResult(std::string& result);
	bool getResult(std::string& result);

legoc's avatar
legoc committed
245
246
	std::shared_ptr<OutputStreamSocket> getOutputStreamSocket();

247
248
	std::string getKeyValue(const std::string& key);

legoc's avatar
legoc committed
249
private:
250
	Instance(Server * server);
legoc's avatar
legoc committed
251
252
253

	void setId(int id);
	void setErrorMessage(const std::string& message);
legoc's avatar
legoc committed
254
	void setOutputStreamSocket(std::unique_ptr<OutputStreamSocket>& socket);
legoc's avatar
legoc committed
255
256
	void setPastStates(State pastStates);
	void setInitialState(State state);
257
	State waitFor(int states, const std::string& eventName, StateHandlerType handler, bool blocking);
legoc's avatar
legoc committed
258

259
	Server * m_server;
legoc's avatar
legoc committed
260
	std::shared_ptr<OutputStreamSocket> m_outputStreamSocket;
legoc's avatar
legoc committed
261
262
263
264
265
266
267
	int m_id;
	std::string m_errorMessage;
	int m_pastStates;
	State m_initialState;
	State m_lastState;
	bool m_hasResult;
	std::string m_resultData;
268
	int m_exitCode;
269
	std::unique_ptr<WaitingImpl> m_waiting;
legoc's avatar
legoc committed
270
271
272
273
274
275
276
277
278
279
280
281
282
283
};

///////////////////////////////////////////////////////////////////////////
// InstanceArray

class InstanceArray {

	friend class cameo::Server;

public:
	InstanceArray(const InstanceArray& array);
	~InstanceArray();

	std::size_t size() const;
legoc's avatar
legoc committed
284
	std::unique_ptr<Instance>& operator[](std::size_t index);
legoc's avatar
legoc committed
285
286
287
288
289
290

private:
	InstanceArray();
	void allocate(std::size_t size);

	std::size_t m_size;
legoc's avatar
legoc committed
291
	std::unique_ptr<Instance>* m_array;
legoc's avatar
legoc committed
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
};

///////////////////////////////////////////////////////////////////////////
// Publisher

class Publisher {

	friend class cameo::application::This;
	friend std::ostream& operator<<(std::ostream&, const Publisher&);

public:
	~Publisher();

	/**
	 * Returns the publisher with name.
	 * throws PublisherCreationException.
	 */
legoc's avatar
legoc committed
309
	static std::unique_ptr<Publisher> create(const std::string& name, int numberOfSubscribers = 0);
legoc's avatar
legoc committed
310
311
312
313
314
315
316
317
318
319
320
321
322
323

	const std::string& getName() const;
	const std::string& getApplicationName() const;
	int getApplicationId() const;
	const std::string& getApplicationEndpoint() const;

	/**
	 * Returns true if the wait succeeds or false if it was canceled.
	 */
	bool waitForSubscribers() const;
	void cancelWaitForSubscribers();

	void sendBinary(const std::string& data) const;
	void send(const std::string& data) const;
324
	void sendTwoBinaryParts(const std::string& data1, const std::string& data2) const;
legoc's avatar
legoc committed
325
	void sendEnd() const;
326
327
328
329
330

	/**
	 * Deprecated.
	 * TODO remove in next version.
	 */
legoc's avatar
legoc committed
331
332
	bool hasEnded() const;

333
334
	bool isEnded() const;

legoc's avatar
legoc committed
335
private:
legoc's avatar
legoc committed
336
	Publisher(application::This * application, int publisherPort, int synchronizerPort, const std::string& name, int numberOfSubscribers);
legoc's avatar
legoc committed
337

legoc's avatar
legoc committed
338
339
	std::unique_ptr<PublisherImpl> m_impl;
	std::unique_ptr<WaitingImpl> m_waiting;
legoc's avatar
legoc committed
340
341
342
343
344
345
346
347
348
349
350
351
352
353
};

///////////////////////////////////////////////////////////////////////////
// Subscriber

class Subscriber {

	friend class cameo::Server;
	friend class cameo::application::Instance;
	friend std::ostream& operator<<(std::ostream&, const Subscriber&);

public:
	~Subscriber();

legoc's avatar
legoc committed
354
	static std::unique_ptr<Subscriber> create(Instance & instance, const std::string& publisherName);
legoc's avatar
legoc committed
355
356
357
358
359
360

	const std::string& getPublisherName() const;
	const std::string& getInstanceName() const;
	int getInstanceId() const;
	const std::string& getInstanceEndpoint() const;

361
362
363
364
	/**
	 * Deprecated.
	 * TODO remove in next version.
	 */
legoc's avatar
legoc committed
365
366
	bool hasEnded() const;

367
368
369
	bool isEnded() const;
	bool isCanceled() const;

legoc's avatar
legoc committed
370
371
372
373
374
	/**
	 * Returns false if the stream finishes.
	 */
	bool receiveBinary(std::string& data) const;
	bool receive(std::string& data) const;
375
	bool receiveTwoBinaryParts(std::string& data1, std::string& data2) const;
legoc's avatar
legoc committed
376
377
378
379

	void cancel();

private:
380
	Subscriber(Server * server, const std::string& url, int publisherPort, int synchronizerPort, const std::string& publisherName, int numberOfSubscribers, const std::string& instanceName, int instanceId, const std::string& instanceEndpoint, const std::string& statusEndpoint);
legoc's avatar
legoc committed
381
382
	void init();

legoc's avatar
legoc committed
383
384
	std::unique_ptr<SubscriberImpl> m_impl;
	std::unique_ptr<WaitingImpl> m_waiting;
legoc's avatar
legoc committed
385
386
387
388
389
390
391
392
393
394
395
396
397
};

///////////////////////////////////////////////////////////////////////////
// Request

class Request {

	friend class cameo::application::Responder;
	friend std::ostream& operator<<(std::ostream&, const Request&);

public:
	~Request();

legoc's avatar
legoc committed
398
	std::string getObjectId() const;
399
	std::string getRequesterEndpoint() const;
legoc's avatar
legoc committed
400

401
402
403
	const std::string& getBinary() const;
	std::string get() const;
	const std::string& getSecondBinaryPart() const;
legoc's avatar
legoc committed
404

legoc's avatar
legoc committed
405
	void setTimeout(int value);
406

legoc's avatar
legoc committed
407
408
	bool replyBinary(const std::string& response);
	bool reply(const std::string& response);
legoc's avatar
legoc committed
409

legoc's avatar
legoc committed
410
	std::unique_ptr<Instance> connectToRequester();
411

legoc's avatar
legoc committed
412
413
414
	/**
	 * Transfers the ownership of the requester server.
	 */
legoc's avatar
legoc committed
415
	std::unique_ptr<Server> getServer();
legoc's avatar
legoc committed
416

legoc's avatar
legoc committed
417
private:
legoc's avatar
legoc committed
418
	Request(std::unique_ptr<RequestImpl> & impl);
legoc's avatar
legoc committed
419

legoc's avatar
legoc committed
420
421
	std::unique_ptr<RequestImpl> m_impl;
	std::unique_ptr<Server> m_requesterServer;
legoc's avatar
legoc committed
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
};

///////////////////////////////////////////////////////////////////////////
// Responder

class Responder {

	friend std::ostream& operator<<(std::ostream&, const Responder&);

public:
	~Responder();

	/**
	 * Returns the responder with name.
	 * throws ResponderCreationException.
	 */
legoc's avatar
legoc committed
438
	static std::unique_ptr<Responder> create(const std::string& name);
legoc's avatar
legoc committed
439
440
441
442

	const std::string& getName() const;

	void cancel();
legoc's avatar
legoc committed
443
	std::unique_ptr<Request> receive();
444
445

	bool isCanceled() const;
legoc's avatar
legoc committed
446
447

private:
legoc's avatar
legoc committed
448
	Responder(application::This * application, int responderPort, const std::string& name);
legoc's avatar
legoc committed
449

legoc's avatar
legoc committed
450
451
	std::unique_ptr<ResponderImpl> m_impl;
	std::unique_ptr<WaitingImpl> m_waiting;
legoc's avatar
legoc committed
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
};

///////////////////////////////////////////////////////////////////////////
// Requester

class Requester {

friend std::ostream& operator<<(std::ostream&, const Requester&);

public:
	~Requester();

	/**
	 * Returns the responder with name.
	 * throws RequesterCreationException.
	 */
legoc's avatar
legoc committed
468
	static std::unique_ptr<Requester> create(Instance & instance, const std::string& name);
legoc's avatar
legoc committed
469
470
471
472
473

	const std::string& getName() const;

	void sendBinary(const std::string& request);
	void send(const std::string& request);
474
	void sendTwoBinaryParts(const std::string& request1, const std::string& request2);
legoc's avatar
legoc committed
475
476
477
478
479
480

	bool receiveBinary(std::string& response);
	bool receive(std::string& response);

	void cancel();

481
482
	bool isCanceled() const;

legoc's avatar
legoc committed
483
private:
legoc's avatar
legoc committed
484
	Requester(application::This * application, const std::string& url, int requesterPort, int responderPort, const std::string& name, int responderId, int requesterId);
legoc's avatar
legoc committed
485

legoc's avatar
legoc committed
486
487
	std::unique_ptr<RequesterImpl> m_impl;
	std::unique_ptr<WaitingImpl> m_waiting;
legoc's avatar
legoc committed
488
489
490
491
492
493
494
495
496
497
};

///////////////////////////////////////////////////////////////////////////
// Configuration

/**
 * Read-only description of an application configuration: a name, a description,
 * two flags (single instance, restart) and two times (starting, stopping).
 * NOTE(review): the unit of the two times is not visible in this header - confirm.
 */
class Configuration {

	friend std::ostream& operator<<(std::ostream&, const Configuration&);

public:
	// The third parameter is named after the member m_singleInstance and the getter hasSingleInstance().
	Configuration(const std::string& name, const std::string& description, bool singleInstance, bool restart, int startingTime, int stoppingTime);

	const std::string& getName() const;
	const std::string& getDescription() const;
	bool hasSingleInstance() const;
	bool canRestart() const;
	int getStartingTime() const;
	int getStoppingTime() const;

private:
	std::string m_name;
	std::string m_description;
	bool m_singleInstance;
	bool m_restart;
	int m_startingTime;
	int m_stoppingTime;
};

///////////////////////////////////////////////////////////////////////////
// Info

class Info {

	friend std::ostream& operator<<(std::ostream&, const Info&);

public:
legoc's avatar
legoc committed
524
	Info(const std::string& name, int id, int pid, State applicationState, State pastApplicationStates, const std::string& args);
legoc's avatar
legoc committed
525
526
527
528
529
530

	int getId() const;
	State getState() const;
	State getPastStates() const;
	const std::string& getArgs() const;
	const std::string& getName() const;
legoc's avatar
legoc committed
531
	int getPid() const;
legoc's avatar
legoc committed
532
533
534

private:
	int m_id;
legoc's avatar
legoc committed
535
	int m_pid;
legoc's avatar
legoc committed
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
	State m_applicationState;
	State m_pastApplicationStates;
	std::string m_processState;
	std::string m_args;
	std::string m_name;
};

std::string toString(cameo::application::State applicationStates);
std::ostream& operator<<(std::ostream&, const cameo::application::This&);
std::ostream& operator<<(std::ostream&, const cameo::application::Instance&);
std::ostream& operator<<(std::ostream&, const cameo::application::Publisher&);
std::ostream& operator<<(std::ostream&, const cameo::application::Subscriber&);
std::ostream& operator<<(std::ostream&, const cameo::application::Request&);
std::ostream& operator<<(std::ostream&, const cameo::application::Responder&);
std::ostream& operator<<(std::ostream&, const cameo::application::Requester&);
std::ostream& operator<<(std::ostream&, const cameo::application::Configuration&);
std::ostream& operator<<(std::ostream&, const cameo::application::Info&);

}
}


#endif