/*
 * Copyright 2015 Institut Laue-Langevin
 *
 * Licensed under the EUPL, Version 1.1 only (the "License");
 * You may not use this work except in compliance with the Licence.
 * You may obtain a copy of the Licence at:
 *
 * http://joinup.ec.europa.eu/software/page/eupl
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the Licence is distributed on an "AS IS" basis,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the Licence for the specific language governing permissions and
 * limitations under the Licence.
 */

17
#include "EventStreamSocket.h"
legoc's avatar
legoc committed
18

legoc's avatar
legoc committed
19
#include "ImplFactory.h"
legoc's avatar
legoc committed
20
#include "KeyEvent.h"
21
22
#include "ResultEvent.h"
#include "StatusEvent.h"
legoc's avatar
legoc committed
23
#include "JSON.h"
legoc's avatar
legoc committed
24
#include "Messages.h"
legoc's avatar
legoc committed
25
#include "impl/zmq/EventStreamSocketZmq.h"
legoc's avatar
legoc committed
26
27
28

namespace cameo {

legoc's avatar
legoc committed
29
/**
 * Builds the socket wrapper.
 *
 * The implementation object is obtained from
 * ImplFactory::createEventStreamSocket() and stored in m_impl.
 */
EventStreamSocket::EventStreamSocket() :
	m_impl(ImplFactory::createEventStreamSocket()) {
}

/**
 * Destructor. Does no explicit work of its own.
 */
EventStreamSocket::~EventStreamSocket() = default;

legoc's avatar
legoc committed
36
37
38
39
/**
 * Resets m_impl, dropping this object's hold on the implementation.
 *
 * NOTE(review): init(), receive() and cancel() dereference m_impl without a
 * check, so they presumably must not be called after terminate() - confirm.
 */
void EventStreamSocket::terminate() {
	m_impl = nullptr;
}

legoc's avatar
legoc committed
40
41
42
43
/**
 * Initialises the socket by forwarding all arguments, unchanged, to the
 * implementation's init().
 *
 * @param context passed through to the implementation.
 * @param endpoint passed through to the implementation.
 * @param requestSocket passed through to the implementation.
 */
void EventStreamSocket::init(Context * context, const Endpoint& endpoint, RequestSocket * requestSocket) {
	auto& impl = *m_impl;
	impl.init(context, endpoint, requestSocket);
}

legoc's avatar
legoc committed
44
std::unique_ptr<Event> EventStreamSocket::receive(bool blocking) {
legoc's avatar
legoc committed
45

46
	std::string message {m_impl->receive(blocking)};
47
48

	// In case of non-blocking call, the message can be null.
49
50
	if (message.empty()) {
		return {};
51
	}
legoc's avatar
legoc committed
52

legoc's avatar
legoc committed
53
	if (message == message::Event::STATUS) {
legoc's avatar
legoc committed
54
55
56

		message = m_impl->receive();

legoc's avatar
legoc committed
57
58
		// Get the JSON event.
		json::Object event;
legoc's avatar
legoc committed
59
		json::parse(event, message);
legoc's avatar
legoc committed
60

61
62
63
64
		int id {event[message::StatusEvent::ID].GetInt()};
		std::string name {event[message::StatusEvent::NAME].GetString()};
		State state {event[message::StatusEvent::APPLICATION_STATE].GetInt()};
		State pastStates {event[message::StatusEvent::PAST_APPLICATION_STATES].GetInt()};
legoc's avatar
legoc committed
65

66
		if (event.HasMember(message::StatusEvent::EXIT_CODE)) {
67
			return std::make_unique<StatusEvent>(id, name, state, pastStates, event[message::StatusEvent::EXIT_CODE].GetInt());
68
		}
69
		return std::make_unique<StatusEvent>(id, name, state, pastStates);
legoc's avatar
legoc committed
70
	}
legoc's avatar
legoc committed
71
	else if (message == message::Event::RESULT) {
legoc's avatar
legoc committed
72
73
74

		message = m_impl->receive();

legoc's avatar
legoc committed
75
76
		// Get the JSON event.
		json::Object event;
legoc's avatar
legoc committed
77
		json::parse(event, message);
legoc's avatar
legoc committed
78

79
80
		int id {event[message::ResultEvent::ID].GetInt()};
		std::string name {event[message::ResultEvent::NAME].GetString()};
legoc's avatar
legoc committed
81

legoc's avatar
legoc committed
82
83
		// Get the data in the next part.
		message = m_impl->receive();
legoc's avatar
legoc committed
84

legoc's avatar
legoc committed
85
		return std::make_unique<ResultEvent>(id, name, message);
legoc's avatar
legoc committed
86
	}
legoc's avatar
legoc committed
87
	else if (message == message::Event::KEYVALUE) {
88
89
90
91
92

		message = m_impl->receive();

		// Get the JSON event.
		json::Object event;
legoc's avatar
legoc committed
93
		json::parse(event, message);
94

95
96
		int id {event[message::KeyEvent::ID].GetInt()};
		std::string name {event[message::KeyEvent::NAME].GetString()};
97
		long status {event[message::KeyEvent::STATUS].GetInt()};
98
99
		std::string key {event[message::KeyEvent::KEY].GetString()};
		std::string value {event[message::KeyEvent::VALUE].GetString()};
100

101
		if (status == message::STORE_KEY_VALUE) {
102
			return std::make_unique<KeyEvent>(id, name, KeyEvent::Status::STORED, key, value);
103
104
		}
		else {
105
			return std::make_unique<KeyEvent>(id, name, KeyEvent::Status::REMOVED, key, value);
106
		}
107
	}
legoc's avatar
legoc committed
108
	else if (message == message::Event::CANCEL) {
legoc's avatar
legoc committed
109
110
111
112

		message = m_impl->receive();

		// Exit with a null event.
113
		return {};
legoc's avatar
legoc committed
114
115
	}

116
	return {};
legoc's avatar
legoc committed
117
118
119
120
121
122
123
}

void EventStreamSocket::cancel() {
	m_impl->cancel();
}

}