EventStreamSocket.cpp 4.92 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/*
 * Copyright 2015 Institut Laue-Langevin
 *
 * Licensed under the EUPL, Version 1.1 only (the "License");
 * You may not use this work except in compliance with the Licence.
 * You may obtain a copy of the Licence at:
 *
 * http://joinup.ec.europa.eu/software/page/eupl
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the Licence is distributed on an "AS IS" basis,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the Licence for the specific language governing permissions and
 * limitations under the Licence.
 */

#include "EventStreamSocket.h"

19
#include "JSON.h"
20
21
22
23
24
#include "impl/SocketWaitingImpl.h"
#include "PortEvent.h"
#include "PublisherEvent.h"
#include "ResultEvent.h"
#include "StatusEvent.h"
25
#include "StoreKeyValueEvent.h"
26
#include "impl/StreamSocketImpl.h"
27
#include "message/Message.h"
28
#include "RemoveKeyValueEvent.h"
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50

using namespace std;

namespace cameo {

EventStreamSocket::EventStreamSocket(StreamSocketImpl * impl) : m_impl(impl) {
}

EventStreamSocket::~EventStreamSocket() {
}

std::unique_ptr<Event> EventStreamSocket::receive(bool blocking) {

	unique_ptr<zmq::message_t> message(m_impl->receive(blocking));

	// In case of non-blocking call, the message can be null.
	if (message == nullptr) {
		return unique_ptr<Event>(nullptr);
	}

	string response(static_cast<char*>(message->data()), message->size());

51
	if (response == message::Event::STATUS) {
52
53
54

		message = m_impl->receive();

55
56
57
		// Get the JSON event.
		json::Object event;
		json::parse(event, message.get());
58

59
60
61
62
		int id = event[message::StatusEvent::ID].GetInt();
		string name = event[message::StatusEvent::NAME].GetString();
		application::State state = event[message::StatusEvent::APPLICATION_STATE].GetInt();
		application::State pastStates = event[message::StatusEvent::PAST_APPLICATION_STATES].GetInt();
63

64
65
66
		if (event.HasMember(message::StatusEvent::EXIT_CODE)) {
			return unique_ptr<Event>(new StatusEvent(id, name, state, pastStates, event[message::StatusEvent::EXIT_CODE].GetInt()));
		}
67
68
		return unique_ptr<Event>(new StatusEvent(id, name, state, pastStates));
	}
69
	else if (response == message::Event::RESULT) {
70
71
72

		message = m_impl->receive();

73
74
75
76
77
78
		// Get the JSON event.
		json::Object event;
		json::parse(event, message.get());

		int id = event[message::ResultEvent::ID].GetInt();
		string name = event[message::ResultEvent::NAME].GetString();
79

80
81
82
		// Get the data in the next part.
		message = m_impl->receive();
		string data(message->data<char>(), message->size());
83

84
85
		return unique_ptr<Event>(new ResultEvent(id, name, data));
	}
86
	else if (response == message::Event::PUBLISHER) {
87
88
89

		message = m_impl->receive();

90
91
92
		// Get the JSON event.
		json::Object event;
		json::parse(event, message.get());
93

94
95
96
		int id = event[message::PublisherEvent::ID].GetInt();
		string name = event[message::PublisherEvent::NAME].GetString();
		string publisherName = event[message::PublisherEvent::PUBLISHER_NAME].GetString();
97

98
99
		return unique_ptr<Event>(new PublisherEvent(id, name, publisherName));
	}
100
	else if (response == message::Event::PORT) {
101
102
103

		message = m_impl->receive();

104
105
106
		// Get the JSON event.
		json::Object event;
		json::parse(event, message.get());
107

108
109
110
		int id = event[message::PortEvent::ID].GetInt();
		string name = event[message::PortEvent::NAME].GetString();
		string portName = event[message::PortEvent::PORT_NAME].GetString();
111

112
113
		return unique_ptr<Event>(new PortEvent(id, name, portName));
	}
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
	else if (response == message::Event::STOREKEYVALUE) {

		message = m_impl->receive();

		// Get the JSON event.
		json::Object event;
		json::parse(event, message.get());

		int id = event[message::StoreKeyValueEvent::ID].GetInt();
		string name = event[message::StoreKeyValueEvent::NAME].GetString();
		string key = event[message::StoreKeyValueEvent::KEY].GetString();
		string value = event[message::StoreKeyValueEvent::VALUE].GetString();

		return unique_ptr<Event>(new StoreKeyValueEvent(id, name, key, value));
	}
129
	else if (response == message::Event::REMOVEKEYVALUE) {
130
131
132
133
134
135
136

		message = m_impl->receive();

		// Get the JSON event.
		json::Object event;
		json::parse(event, message.get());

137
138
139
140
		int id = event[message::RemoveKeyValueEvent::ID].GetInt();
		string name = event[message::RemoveKeyValueEvent::NAME].GetString();
		string key = event[message::RemoveKeyValueEvent::KEY].GetString();
		string value = event[message::RemoveKeyValueEvent::VALUE].GetString();
141

142
		return unique_ptr<Event>(new RemoveKeyValueEvent(id, name, key, value));
143
	}
144
	else if (response == message::Event::CANCEL) {
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161

		message = m_impl->receive();

		// Exit with a null event.
		return unique_ptr<Event>(nullptr);
	}

	cerr << "Cannot process '" << response << "' event" << endl;
	return unique_ptr<Event>(nullptr);
}

void EventStreamSocket::cancel() {
	m_impl->cancel();
}

WaitingImpl * EventStreamSocket::waiting() {
	// We transfer the ownership of cancel socket to WaitingImpl
162
	return new SocketWaitingImpl(m_impl->m_cancelSocket.get(), message::Event::CANCEL);
163
164
165
}

}