Application.h 16.1 KB
Newer Older
legoc's avatar
legoc committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/*
 * Copyright 2015 Institut Laue-Langevin
 *
 * Licensed under the EUPL, Version 1.1 only (the "License");
 * You may not use this work except in compliance with the Licence.
 * You may obtain a copy of the Licence at:
 *
 * http://joinup.ec.europa.eu/software/page/eupl
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the Licence is distributed on an "AS IS" basis,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the Licence for the specific language governing permissions and
 * limitations under the Licence.
 */

#ifndef CAMEO_APPLICATION_H_
#define CAMEO_APPLICATION_H_

legoc's avatar
legoc committed
20
#include <cstdint>
#include <functional>
#include <iosfwd>
#include <memory>
#include <optional>
#include <set>
#include <string>
#include <tuple>
#include <vector>

#include "InvalidArgumentException.h"
#include "UnmanagedApplicationException.h"
#include "SocketException.h"
#include "ConnectionTimeout.h"
#include "PublisherCreationException.h"
#include "RequesterCreationException.h"
#include "ResponderCreationException.h"
#include "UndefinedApplicationException.h"
#include "UndefinedKeyException.h"
#include "Response.h"
#include "Serializer.h"
#include "Services.h"
#include "TimeCondition.h"
#include "EventListener.h"
#include "KeyValue.h"
legoc's avatar
legoc committed
40
41
42

namespace cameo {

43
44
45
46
/**
 * Value of the "output stream" option.
 * NOTE(review): presumably passed as an option flag when starting an
 * application to request its output stream - confirm against the callers.
 */
constexpr int OUTPUTSTREAM = 1;
legoc's avatar
legoc committed
47
48
49

// Forward declarations of cameo classes that this header only uses through
// pointers, references, smart pointers or friend declarations.
class Server;
class EventStreamSocket;
class OutputStreamSocket;
class PublisherImpl;
class SubscriberImpl;
class RequestImpl;
class ResponderImpl;
class RequesterImpl;
class WaitingImpl;
class SocketWaitingImpl;
class GenericWaitingImpl;
class WaitingImplSet;
class HandlerImpl;
legoc's avatar
legoc committed
61
62
63
64
65
66
67
68
69
70
71
72
73

namespace application {

// Classes of the application namespace that are referenced before their
// definition (listed alphabetically).
class Instance;
class Publisher;
class Requester;
class Responder;
class Subscriber;
class Waiting;

/**
 * Application state, stored in a 32-bit signed integer.
 *
 * Apart from UNKNOWN (0), every state constant below is a distinct power of
 * two, so several states can be OR-ed into a single integer value.
 */
using State = int32_t;

// NOTE(review): presumably removes an ERROR macro leaked by platform headers
// (e.g. on Windows) - confirm before removing, no constant named ERROR is
// declared below.
#undef ERROR

constexpr State UNKNOWN          = 0;
constexpr State STARTING         = 1;
constexpr State RUNNING          = 2;
constexpr State STOPPING         = 4;
constexpr State KILLING          = 8;
constexpr State PROCESSING_ERROR = 16;
constexpr State FAILURE          = 32;
constexpr State SUCCESS          = 64;
constexpr State STOPPED          = 128;
constexpr State KILLED           = 256;

/** \class This
 * \brief class managing the current CAMEO application.
 *
 * \details The application has to be launched by CAMEO command line or another CAMEO app
 * \todo why this does not inherit from the Instance class?
 */
93
class This : private Services, private EventListener {
legoc's avatar
legoc committed
94
95
96
97
98
99
100
101
102
103
104
105
106

	friend class cameo::application::Publisher;
	friend class cameo::application::Responder;
	friend class cameo::application::Requester;
	friend class cameo::PublisherImpl;
	friend class cameo::RequestImpl;
	friend class cameo::ResponderImpl;
	friend class cameo::RequesterImpl;
	friend class cameo::SocketWaitingImpl;
	friend class cameo::GenericWaitingImpl;
	friend class cameo::Server;
	friend std::ostream& operator<<(std::ostream&, const cameo::application::This&);

107
	typedef std::function<void()> StopFunctionType;
legoc's avatar
legoc committed
108
109

public:
110
111
112
113
114
115
116
117
118
119
120
121
	/**
	 * Class defining the Communication Operations Manager (COM).
	 */
	class Com {

		friend class This;

	public:
		void storeKeyValue(const std::string& key, const std::string& value) const;
		std::string getKeyValue(const std::string& key) const;
		void removeKey(const std::string& key) const;

122
123
124
125
		int requestPort() const;
		void setPortUnavailable(int port) const;
		void releasePort(int port) const;

126
	private:
127
		Com(Server* server, int applicationId);
128

129
		Server* m_server;
130
131
132
		int m_applicationId;
	};

133
134
135
	This();
	~This();

136
	static void init(int argc, char* argv[]);
137
	static void init(const std::string& name, const std::string& endpoint);
138
139

	/**
140
141
	 * The terminate call is not necessary unless the static instance of This is not destroyed
	 * automatically.
142
	 */
legoc's avatar
legoc committed
143
144
	static void terminate();

145
146
147
	/// \brief returns the name of the CAMEO application corresponding to this instance
	static const std::string& getName(); 
	static int getId(); ///< returns the ID number of the instance
legoc's avatar
legoc committed
148
149
	static void setTimeout(int timeout);
	static int getTimeout();
150
	static const Endpoint& getEndpoint(); ///< returns the TCP address of this instance
legoc's avatar
legoc committed
151
	static Server& getServer();
152
	static const Com& getCom();
legoc's avatar
legoc committed
153
154
155
156
157
158
159
160

	/**
	 * throws StarterServerException.
	 */
	static Server& getStarterServer();
	static bool isAvailable(int timeout = 10000);
	static bool isStopping();

161
	/**
162
163
	 * Sets the stop handler with stopping time that overrides the one that may be defined in the
	 * configuration of the server.
164
	 */
165
	template <typename Type>
166
167
	static void handleStop(Type function, int stoppingTime = -1) {
		m_instance.handleStopImpl(function, stoppingTime);
legoc's avatar
legoc committed
168
169
170
171
	}

	static void cancelWaitings();

172
	static bool setRunning(); ///< sets the current instance in RUNNING state
legoc's avatar
legoc committed
173
174
175
176
177
178
179
180
181
182

	/**
	 * Sets the result.
	 */
	static void setBinaryResult(const std::string& data);
	static void setResult(const std::string& data);

	/**
	 * Connects to the starter application, i.e. the application which started this application.
	 */
legoc's avatar
legoc committed
183
	static std::unique_ptr<Instance> connectToStarter();
legoc's avatar
legoc committed
184
185

private:
186
	void initApplication(int argc, char* argv[]);
187
	void initApplication(const std::string& name, const std::string& endpoint);
legoc's avatar
legoc committed
188

legoc's avatar
legoc committed
189
190
	static State parseState(const std::string& value);
	State getState(int id) const;
legoc's avatar
legoc committed
191
192
193
194

	int initUnmanagedApplication();
	void terminateUnmanagedApplication();

legoc's avatar
legoc committed
195
196
197
	bool destroyPublisher(const std::string& name) const;
	bool removePort(const std::string& name) const;
	State waitForStop();
198
199

	void stoppingFunction(StopFunctionType stop);
200
	void handleStopImpl(StopFunctionType function, int stoppingTime);
legoc's avatar
legoc committed
201
202
203

	std::string m_name;
	int m_id;
legoc's avatar
legoc committed
204
	bool m_managed;
legoc's avatar
legoc committed
205

legoc's avatar
legoc committed
206
	Endpoint m_starterEndpoint;
legoc's avatar
legoc committed
207
208
209
	std::string m_starterName;
	int m_starterId;

legoc's avatar
legoc committed
210
211
	std::unique_ptr<Server> m_server;
	std::unique_ptr<Server> m_starterServer;
212
	std::unique_ptr<Com> m_com;
legoc's avatar
legoc committed
213

legoc's avatar
legoc committed
214
	std::unique_ptr<WaitingImplSet> m_waitingSet;
215
	std::unique_ptr<HandlerImpl> m_stopHandler;
legoc's avatar
legoc committed
216

217
	static This m_instance;
legoc's avatar
legoc committed
218
219
220
221
	static const std::string RUNNING_STATE;
};


222
class Instance : private EventListener {
legoc's avatar
legoc committed
223
224
225
226
227
228

	friend class cameo::Server;
	friend class cameo::application::Subscriber;
	friend std::ostream& operator<<(std::ostream&, const Instance&);

public:
229
	typedef std::function<void(State)> StateHandlerType;
legoc's avatar
legoc committed
230

231
232
233
234
235
236
237
238
	class Com {

		friend class Instance;

	public:
		std::string getKeyValue(const std::string& key) const;

	private:
239
		Com(Server* server);
240

241
		Server* m_server;
242
243
244
		int m_applicationId;
	};

legoc's avatar
legoc committed
245
246
	~Instance();

legoc's avatar
legoc committed
247
	const std::string& getName() const;
legoc's avatar
legoc committed
248
	int getId() const;
249
	const Endpoint& getEndpoint() const;
legoc's avatar
legoc committed
250
	std::string getNameId() const;
251
252
	const Com& getCom() const;

legoc's avatar
legoc committed
253
254
255
256
257
	bool hasResult() const;
	bool exists() const;
	const std::string& getErrorMessage() const;
	bool stop();
	bool kill();
legoc's avatar
legoc committed
258

259
260
261
262
	State waitFor(int states, StateHandlerType handler);
	State waitFor(int states);
	State waitFor(StateHandlerType handler);
	State waitFor();
263
	State waitFor(const std::string& eventName);
264
	State waitFor(KeyValue& keyValue);
legoc's avatar
legoc committed
265

266
	void cancelWaitFor(); // to unblock another instance
legoc's avatar
legoc committed
267

268
269
270
271
	/**
	 * Deprecated.
	 * TODO remove in next version.
	 */
272
273
	State now();

274
275
276
277
278
279
280
281
282
283
	/**
	 * Gets the last state.
	 */
	State getLastState();

	/**
	 * Returns the actual state and UNKNOWN if the instance does not exist anymore.
	 */
	State getActualState() const;

284
285
286
287
288
	/**
	 * Returns the past states.
	 */
	std::set<State> getPastStates() const;

289
290
291
292
293
	/**
	 * Returns the exit code.
	 */
	int getExitCode() const;

legoc's avatar
legoc committed
294
295
296
	bool getBinaryResult(std::string& result);
	bool getResult(std::string& result);

legoc's avatar
legoc committed
297
298
	std::shared_ptr<OutputStreamSocket> getOutputStreamSocket();

legoc's avatar
legoc committed
299
private:
300
	Instance(Server* server);
legoc's avatar
legoc committed
301
302
303

	void setId(int id);
	void setErrorMessage(const std::string& message);
legoc's avatar
legoc committed
304
	void setOutputStreamSocket(std::unique_ptr<OutputStreamSocket>& socket);
legoc's avatar
legoc committed
305
306
	void setPastStates(State pastStates);
	void setInitialState(State state);
307
	State waitFor(int states, const std::string& eventName, KeyValue& keyValue, StateHandlerType handler, bool blocking);
legoc's avatar
legoc committed
308

309
	Server* m_server;
legoc's avatar
legoc committed
310
	std::shared_ptr<OutputStreamSocket> m_outputStreamSocket;
legoc's avatar
legoc committed
311
312
	int m_id;
	std::string m_errorMessage;
313
314
	Com m_com;

legoc's avatar
legoc committed
315
316
317
318
319
	int m_pastStates;
	State m_initialState;
	State m_lastState;
	bool m_hasResult;
	std::string m_resultData;
320
	int m_exitCode;
321
	std::unique_ptr<WaitingImpl> m_waiting;
legoc's avatar
legoc committed
322
323
324
325
326
};

///////////////////////////////////////////////////////////////////////////
// InstanceArray

327
typedef std::vector<std::unique_ptr<Instance>> InstanceArray;
legoc's avatar
legoc committed
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343

///////////////////////////////////////////////////////////////////////////
// Publisher

class Publisher {

	friend class cameo::application::This;
	friend std::ostream& operator<<(std::ostream&, const Publisher&);

public:
	~Publisher();

	/**
	 * Returns the publisher with name.
	 * throws PublisherCreationException.
	 */
legoc's avatar
legoc committed
344
	static std::unique_ptr<Publisher> create(const std::string& name, int numberOfSubscribers = 0);
legoc's avatar
legoc committed
345
346
347
348

	const std::string& getName() const;
	const std::string& getApplicationName() const;
	int getApplicationId() const;
349
	std::string getApplicationEndpoint() const;
legoc's avatar
legoc committed
350
351
352
353
354
355
356
357
358

	/**
	 * Returns true if the wait succeeds or false if it was canceled.
	 */
	bool waitForSubscribers() const;
	void cancelWaitForSubscribers();

	void sendBinary(const std::string& data) const;
	void send(const std::string& data) const;
359
	void sendTwoBinaryParts(const std::string& data1, const std::string& data2) const;
legoc's avatar
legoc committed
360
	void sendEnd() const;
361
362
363
364
365

	/**
	 * Deprecated.
	 * TODO remove in next version.
	 */
legoc's avatar
legoc committed
366
367
	bool hasEnded() const;

368
369
	bool isEnded() const;

legoc's avatar
legoc committed
370
private:
371
372
	Publisher(application::This* application, int publisherPort, int synchronizerPort,
		  const std::string& name, int numberOfSubscribers);
legoc's avatar
legoc committed
373

legoc's avatar
legoc committed
374
375
	std::unique_ptr<PublisherImpl> m_impl;
	std::unique_ptr<WaitingImpl> m_waiting;
legoc's avatar
legoc committed
376
377
378
379
380
381
382
383
384
385
386
387
388
389
};

///////////////////////////////////////////////////////////////////////////
// Subscriber

class Subscriber {

	friend class cameo::Server;
	friend class cameo::application::Instance;
	friend std::ostream& operator<<(std::ostream&, const Subscriber&);

public:
	~Subscriber();

390
	static std::unique_ptr<Subscriber> create(Instance& instance, const std::string& publisherName);
legoc's avatar
legoc committed
391
392
393
394
395
396

	const std::string& getPublisherName() const;
	const std::string& getInstanceName() const;
	int getInstanceId() const;
	const std::string& getInstanceEndpoint() const;

397
398
399
400
	/**
	 * Deprecated.
	 * TODO remove in next version.
	 */
legoc's avatar
legoc committed
401
402
	bool hasEnded() const;

403
404
405
	bool isEnded() const;
	bool isCanceled() const;

legoc's avatar
legoc committed
406
	/**
407
	 * Returns a string or nothing if the stream has finished.
legoc's avatar
legoc committed
408
	 */
409
410
411
412
413
414
415
416
417
418
419
	std::optional<std::string> receiveBinary() const;

	/**
	 * Returns a string or nothing if the stream has finished.
	 */
	std::optional<std::string> receive() const;

	/**
	 * Returns a tuple of strings or nothing if the stream has finished.
	 */
	std::optional<std::tuple<std::string, std::string>> receiveTwoBinaryParts() const;
legoc's avatar
legoc committed
420
421
422
423

	void cancel();

private:
424
425
426
	Subscriber(Server* server, int publisherPort, int synchronizerPort, const std::string& publisherName,
		   int numberOfSubscribers, const std::string& instanceName, int instanceId,
		   const std::string& instanceEndpoint, const std::string& statusEndpoint);
legoc's avatar
legoc committed
427
428
	void init();

legoc's avatar
legoc committed
429
430
	std::unique_ptr<SubscriberImpl> m_impl;
	std::unique_ptr<WaitingImpl> m_waiting;
legoc's avatar
legoc committed
431
432
433
434
435
436
437
438
439
440
441
442
443
};

///////////////////////////////////////////////////////////////////////////
// Request

class Request {

	friend class cameo::application::Responder;
	friend std::ostream& operator<<(std::ostream&, const Request&);

public:
	~Request();

legoc's avatar
legoc committed
444
	std::string getObjectId() const;
445
	std::string getRequesterEndpoint() const;
legoc's avatar
legoc committed
446

447
448
449
	const std::string& getBinary() const;
	std::string get() const;
	const std::string& getSecondBinaryPart() const;
legoc's avatar
legoc committed
450

legoc's avatar
legoc committed
451
	void setTimeout(int value);
452

legoc's avatar
legoc committed
453
454
	bool replyBinary(const std::string& response);
	bool reply(const std::string& response);
legoc's avatar
legoc committed
455

legoc's avatar
legoc committed
456
	std::unique_ptr<Instance> connectToRequester();
457

legoc's avatar
legoc committed
458
459
460
	/**
	 * Transfers the ownership of the requester server.
	 */
legoc's avatar
legoc committed
461
	std::unique_ptr<Server> getServer();
legoc's avatar
legoc committed
462

legoc's avatar
legoc committed
463
private:
464
	Request(std::unique_ptr<RequestImpl>& impl);
legoc's avatar
legoc committed
465

legoc's avatar
legoc committed
466
467
	std::unique_ptr<RequestImpl> m_impl;
	std::unique_ptr<Server> m_requesterServer;
legoc's avatar
legoc committed
468
469
470
471
472
473
474
475
476
477
478
479
};

///////////////////////////////////////////////////////////////////////////
// Responder

class Responder {

	friend std::ostream& operator<<(std::ostream&, const Responder&);

public:
	~Responder();

480
	/** \brief Returns the responder with name.
legoc's avatar
legoc committed
481
482
	 * throws ResponderCreationException.
	 */
legoc's avatar
legoc committed
483
	static std::unique_ptr<Responder> create(const std::string& name);
legoc's avatar
legoc committed
484

485
	/// Returns the name of the responder
legoc's avatar
legoc committed
486
487
488
	const std::string& getName() const;

	void cancel();
489

490
	/** \brief Receive a request
491
492
	 * blocking command
	 */
legoc's avatar
legoc committed
493
	std::unique_ptr<Request> receive();
494

495
	/** check if it has been canceled */
496
	bool isCanceled() const;
legoc's avatar
legoc committed
497
498

private:
499
	Responder(application::This* application, int responderPort, const std::string& name);
legoc's avatar
legoc committed
500

legoc's avatar
legoc committed
501
502
	std::unique_ptr<ResponderImpl> m_impl;
	std::unique_ptr<WaitingImpl> m_waiting;
legoc's avatar
legoc committed
503
504
505
506
507
508
509
};

///////////////////////////////////////////////////////////////////////////
// Requester

class Requester {

510
	friend std::ostream& operator<<(std::ostream&, const Requester&);
legoc's avatar
legoc committed
511
512
513
514
515
516
517
518

public:
	~Requester();

	/**
	 * Returns the responder with name.
	 * throws RequesterCreationException.
	 */
519
	static std::unique_ptr<Requester> create(Instance& instance, const std::string& name);
legoc's avatar
legoc committed
520
521
522
523
524

	const std::string& getName() const;

	void sendBinary(const std::string& request);
	void send(const std::string& request);
525
	void sendTwoBinaryParts(const std::string& request1, const std::string& request2);
legoc's avatar
legoc committed
526

527
528
529
530
531
532
533
534
535
	/**
	 * Returns a string or nothing if the requester is canceled.
	 */
	std::optional<std::string> receiveBinary();

	/**
	 * Returns a string or nothing if the requester is canceled.
	 */
	std::optional<std::string> receive();
legoc's avatar
legoc committed
536
537
538

	void cancel();

539
540
	bool isCanceled() const;

legoc's avatar
legoc committed
541
private:
542
543
	Requester(application::This* application, const std::string& url, int requesterPort,
		  int responderPort, const std::string& name, int responderId, int requesterId);
legoc's avatar
legoc committed
544

legoc's avatar
legoc committed
545
546
	std::unique_ptr<RequesterImpl> m_impl;
	std::unique_ptr<WaitingImpl> m_waiting;
legoc's avatar
legoc committed
547
548
549
550
551
552
553
554
555
556
};

///////////////////////////////////////////////////////////////////////////
// Configuration

/**
 * Value class describing the configuration of an application: name,
 * description, single-instance and restart flags, starting and stopping times.
 */
class Configuration {

	friend std::ostream& operator<<(std::ostream&, const Configuration&);

public:
	// The third parameter is named after the m_singleInstance member and the
	// hasSingleInstance() accessor.
	// NOTE(review): the units of startingTime/stoppingTime are not visible here.
	Configuration(const std::string& name, const std::string& description, bool singleInstance, bool restart,
		      int startingTime, int stoppingTime);

	const std::string& getName() const;
	const std::string& getDescription() const;
	bool hasSingleInstance() const;
	bool canRestart() const;
	int getStartingTime() const;
	int getStoppingTime() const;

private:
	std::string m_name;
	std::string m_description;
	bool m_singleInstance;
	bool m_restart;
	int m_startingTime;
	int m_stoppingTime;
};

///////////////////////////////////////////////////////////////////////////
// Info

class Info {

	friend std::ostream& operator<<(std::ostream&, const Info&);

public:
584
585
	Info(const std::string& name, int id, int pid, State applicationState, State pastApplicationStates,
	     const std::string& args);
legoc's avatar
legoc committed
586
587
588
589
590
591

	int getId() const;
	State getState() const;
	State getPastStates() const;
	const std::string& getArgs() const;
	const std::string& getName() const;
legoc's avatar
legoc committed
592
	int getPid() const;
legoc's avatar
legoc committed
593
594
595

private:
	int m_id;
legoc's avatar
legoc committed
596
	int m_pid;
legoc's avatar
legoc committed
597
598
599
600
601
602
603
	State m_applicationState;
	State m_pastApplicationStates;
	std::string m_processState;
	std::string m_args;
	std::string m_name;
};

604
605
606
607
608
609
610
611
///////////////////////////////////////////////////////////////////////////
// Port

/**
 * Value class describing a port: its number, status and owner.
 */
class Port {

	friend std::ostream& operator<<(std::ostream&, const Port&);

public:
	Port(int port, const std::string& status, const std::string& owner);

	int getPort() const;
	const std::string& getStatus() const;
	const std::string& getOwner() const;

private:
	int m_port;
	std::string m_status;
	std::string m_owner;
};

legoc's avatar
legoc committed
624
625
626
627
628
629
630
631
632
633
/// Returns a textual representation of the given application state(s).
std::string toString(cameo::application::State applicationStates);

// Stream insertion operators for the classes declared in this header.
std::ostream& operator<<(std::ostream&, const cameo::application::This&);
std::ostream& operator<<(std::ostream&, const cameo::application::Instance&);
std::ostream& operator<<(std::ostream&, const cameo::application::Publisher&);
std::ostream& operator<<(std::ostream&, const cameo::application::Subscriber&);
std::ostream& operator<<(std::ostream&, const cameo::application::Request&);
std::ostream& operator<<(std::ostream&, const cameo::application::Responder&);
std::ostream& operator<<(std::ostream&, const cameo::application::Requester&);
std::ostream& operator<<(std::ostream&, const cameo::application::Configuration&);
std::ostream& operator<<(std::ostream&, const cameo::application::Info&);
std::ostream& operator<<(std::ostream&, const cameo::application::Port&);
legoc's avatar
legoc committed
635

636
637
} // namespace application
} // namespace cameo
legoc's avatar
legoc committed
638
639

#endif