EventStreamSocket.cpp 3.45 KB
Newer Older
legoc's avatar
legoc committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/*
 * Copyright 2015 Institut Laue-Langevin
 *
 * Licensed under the EUPL, Version 1.1 only (the "License");
 * You may not use this work except in compliance with the Licence.
 * You may obtain a copy of the Licence at:
 *
 * http://joinup.ec.europa.eu/software/page/eupl
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the Licence is distributed on an "AS IS" basis,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the Licence for the specific language governing permissions and
 * limitations under the Licence.
 */

17
#include "EventStreamSocket.h"
legoc's avatar
legoc committed
18

legoc's avatar
legoc committed
19
#include "ImplFactory.h"
legoc's avatar
legoc committed
20
#include "KeyEvent.h"
21
22
#include "ResultEvent.h"
#include "StatusEvent.h"
legoc's avatar
legoc committed
23
#include "JSON.h"
legoc's avatar
legoc committed
24
#include "Messages.h"
legoc's avatar
legoc committed
25
#include "impl/zmq/EventStreamSocketZmq.h"
legoc's avatar
legoc committed
26
27
28

namespace cameo {

legoc's avatar
legoc committed
29
// Builds the socket facade; the underlying implementation object is obtained
// from ImplFactory.
EventStreamSocket::EventStreamSocket() :
	m_impl(ImplFactory::createEventStreamSocket()) {
}

// No explicit cleanup is performed here; the definition is kept in this
// translation unit.
EventStreamSocket::~EventStreamSocket() = default;

legoc's avatar
legoc committed
36
37
38
39
// Initializes the socket by handing the three arguments, unchanged, to the
// implementation object.
void EventStreamSocket::init(
		Context * context,
		const Endpoint& endpoint,
		RequestSocket * requestSocket) {

	m_impl->init(context, endpoint, requestSocket);
}

legoc's avatar
legoc committed
40
std::unique_ptr<Event> EventStreamSocket::receive(bool blocking) {
legoc's avatar
legoc committed
41

legoc's avatar
legoc committed
42
	std::string message(m_impl->receive(blocking));
43
44

	// In case of non-blocking call, the message can be null.
legoc's avatar
legoc committed
45
	if (message == "") {
46
		return std::unique_ptr<Event>(nullptr);
47
	}
legoc's avatar
legoc committed
48

legoc's avatar
legoc committed
49
	if (message == message::Event::STATUS) {
legoc's avatar
legoc committed
50
51
52

		message = m_impl->receive();

legoc's avatar
legoc committed
53
54
		// Get the JSON event.
		json::Object event;
legoc's avatar
legoc committed
55
		json::parse(event, message);
legoc's avatar
legoc committed
56

legoc's avatar
legoc committed
57
		int id = event[message::StatusEvent::ID].GetInt();
58
		std::string name = event[message::StatusEvent::NAME].GetString();
legoc's avatar
legoc committed
59
60
		application::State state = event[message::StatusEvent::APPLICATION_STATE].GetInt();
		application::State pastStates = event[message::StatusEvent::PAST_APPLICATION_STATES].GetInt();
legoc's avatar
legoc committed
61

62
		if (event.HasMember(message::StatusEvent::EXIT_CODE)) {
63
			return std::make_unique<StatusEvent>(id, name, state, pastStates, event[message::StatusEvent::EXIT_CODE].GetInt());
64
		}
65
		return std::make_unique<StatusEvent>(id, name, state, pastStates);
legoc's avatar
legoc committed
66
	}
legoc's avatar
legoc committed
67
	else if (message == message::Event::RESULT) {
legoc's avatar
legoc committed
68
69
70

		message = m_impl->receive();

legoc's avatar
legoc committed
71
72
		// Get the JSON event.
		json::Object event;
legoc's avatar
legoc committed
73
		json::parse(event, message);
legoc's avatar
legoc committed
74
75

		int id = event[message::ResultEvent::ID].GetInt();
76
		std::string name = event[message::ResultEvent::NAME].GetString();
legoc's avatar
legoc committed
77

legoc's avatar
legoc committed
78
79
		// Get the data in the next part.
		message = m_impl->receive();
legoc's avatar
legoc committed
80

legoc's avatar
legoc committed
81
		return std::make_unique<ResultEvent>(id, name, message);
legoc's avatar
legoc committed
82
	}
legoc's avatar
legoc committed
83
	else if (message == message::Event::KEYVALUE) {
84
85
86
87
88

		message = m_impl->receive();

		// Get the JSON event.
		json::Object event;
legoc's avatar
legoc committed
89
		json::parse(event, message);
90

91
		int id = event[message::KeyEvent::ID].GetInt();
92
		std::string name = event[message::KeyEvent::NAME].GetString();
93
		long status = event[message::KeyEvent::STATUS].GetInt64();
94
95
		std::string key = event[message::KeyEvent::KEY].GetString();
		std::string value = event[message::KeyEvent::VALUE].GetString();
96

97
		if (status == message::STORE_KEY_VALUE) {
98
			return std::make_unique<KeyEvent>(id, name, KeyEvent::Status::STORED, key, value);
99
100
		}
		else {
101
			return std::make_unique<KeyEvent>(id, name, KeyEvent::Status::REMOVED, key, value);
102
		}
103
	}
legoc's avatar
legoc committed
104
	else if (message == message::Event::CANCEL) {
legoc's avatar
legoc committed
105
106
107
108

		message = m_impl->receive();

		// Exit with a null event.
109
		return std::unique_ptr<Event>(nullptr);
legoc's avatar
legoc committed
110
111
	}

legoc's avatar
legoc committed
112
	std::cerr << "Cannot process '" << message << "' event" << std::endl;
113
	return std::unique_ptr<Event>(nullptr);
legoc's avatar
legoc committed
114
115
116
117
118
119
120
}

// Asks the implementation object to cancel; the call is forwarded as is.
void EventStreamSocket::cancel() {

	m_impl->cancel();
}

}