// EventStreamSocket.cpp
/*
 * Copyright 2015 Institut Laue-Langevin
 *
 * Licensed under the EUPL, Version 1.1 only (the "License");
 * You may not use this work except in compliance with the Licence.
 * You may obtain a copy of the Licence at:
 *
 * http://joinup.ec.europa.eu/software/page/eupl
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the Licence is distributed on an "AS IS" basis,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the Licence for the specific language governing permissions and
 * limitations under the Licence.
 */

#include "EventStreamSocket.h"

#include <cstdint>
#include <iostream>
#include <memory>
#include <string>

#include "JSON.h"
#include "impl/SocketWaitingImpl.h"
#include "PortEvent.h"
#include "PublisherEvent.h"
#include "ResultEvent.h"
#include "StatusEvent.h"
#include "KeyEvent.h"
#include "impl/StreamSocketImpl.h"
#include "message/Message.h"

using namespace std;

namespace cameo {

EventStreamSocket::EventStreamSocket(StreamSocketImpl * impl) : m_impl(impl) {
}

/**
 * Destructor. No explicit cleanup is performed here: the members are
 * destroyed by their own destructors.
 */
EventStreamSocket::~EventStreamSocket() = default;

std::unique_ptr<Event> EventStreamSocket::receive(bool blocking) {

	unique_ptr<zmq::message_t> message(m_impl->receive(blocking));

	// In case of non-blocking call, the message can be null.
	if (message == nullptr) {
		return unique_ptr<Event>(nullptr);
	}

	string response(static_cast<char*>(message->data()), message->size());

50
	if (response == message::Event::STATUS) {
51
52
53

		message = m_impl->receive();

54
55
56
		// Get the JSON event.
		json::Object event;
		json::parse(event, message.get());
57

58
59
60
61
		int id = event[message::StatusEvent::ID].GetInt();
		string name = event[message::StatusEvent::NAME].GetString();
		application::State state = event[message::StatusEvent::APPLICATION_STATE].GetInt();
		application::State pastStates = event[message::StatusEvent::PAST_APPLICATION_STATES].GetInt();
62

63
64
65
		if (event.HasMember(message::StatusEvent::EXIT_CODE)) {
			return unique_ptr<Event>(new StatusEvent(id, name, state, pastStates, event[message::StatusEvent::EXIT_CODE].GetInt()));
		}
66
67
		return unique_ptr<Event>(new StatusEvent(id, name, state, pastStates));
	}
68
	else if (response == message::Event::RESULT) {
69
70
71

		message = m_impl->receive();

72
73
74
75
76
77
		// Get the JSON event.
		json::Object event;
		json::parse(event, message.get());

		int id = event[message::ResultEvent::ID].GetInt();
		string name = event[message::ResultEvent::NAME].GetString();
78

79
80
81
		// Get the data in the next part.
		message = m_impl->receive();
		string data(message->data<char>(), message->size());
82

83
84
		return unique_ptr<Event>(new ResultEvent(id, name, data));
	}
85
	else if (response == message::Event::PUBLISHER) {
86
87
88

		message = m_impl->receive();

89
90
91
		// Get the JSON event.
		json::Object event;
		json::parse(event, message.get());
92

93
94
95
		int id = event[message::PublisherEvent::ID].GetInt();
		string name = event[message::PublisherEvent::NAME].GetString();
		string publisherName = event[message::PublisherEvent::PUBLISHER_NAME].GetString();
96

97
98
		return unique_ptr<Event>(new PublisherEvent(id, name, publisherName));
	}
99
	else if (response == message::Event::PORT) {
100
101
102

		message = m_impl->receive();

103
104
105
		// Get the JSON event.
		json::Object event;
		json::parse(event, message.get());
106

107
108
109
		int id = event[message::PortEvent::ID].GetInt();
		string name = event[message::PortEvent::NAME].GetString();
		string portName = event[message::PortEvent::PORT_NAME].GetString();
110

111
112
		return unique_ptr<Event>(new PortEvent(id, name, portName));
	}
113
	else if (response == message::Event::KEYVALUE) {
114
115
116
117
118
119
120

		message = m_impl->receive();

		// Get the JSON event.
		json::Object event;
		json::parse(event, message.get());

121
122
123
124
125
		int id = event[message::KeyEvent::ID].GetInt();
		string name = event[message::KeyEvent::NAME].GetString();
		long status = event[message::KeyEvent::STATUS].GetInt64();
		string key = event[message::KeyEvent::KEY].GetString();
		string value = event[message::KeyEvent::VALUE].GetString();
126

127
128
129
130
131
132
		if (status == message::STORE_KEY_VALUE) {
			return unique_ptr<Event>(new KeyEvent(id, name, KeyEvent::Status::STORED, key, value));
		}
		else {
			return unique_ptr<Event>(new KeyEvent(id, name, KeyEvent::Status::REMOVED, key, value));
		}
133
	}
134
	else if (response == message::Event::CANCEL) {
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151

		message = m_impl->receive();

		// Exit with a null event.
		return unique_ptr<Event>(nullptr);
	}

	cerr << "Cannot process '" << response << "' event" << endl;
	return unique_ptr<Event>(nullptr);
}

/**
 * Cancels the socket by forwarding the request to the underlying stream socket implementation.
 * NOTE(review): presumably this makes a pending receive() return through its CANCEL branch
 * with a null event - confirm in StreamSocketImpl::cancel().
 */
void EventStreamSocket::cancel() {
	m_impl->cancel();
}

/**
 * Creates a waiting object built from the cancel socket of the underlying
 * implementation and the CANCEL event identifier.
 *
 * @return a SocketWaitingImpl allocated with new and returned as a raw pointer.
 *         NOTE(review): the caller presumably has to delete it - confirm at the call sites.
 */
WaitingImpl * EventStreamSocket::waiting() {
	// Only the raw pointer returned by get() is handed over: no ownership is
	// released by this call, m_impl keeps holding m_cancelSocket.
	// NOTE(review): the returned object therefore presumably must not outlive m_impl - confirm.
	return new SocketWaitingImpl(m_impl->m_cancelSocket.get(), message::Event::CANCEL);
}

}