EventStreamSocket.cpp 3.83 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/*
 * Copyright 2015 Institut Laue-Langevin
 *
 * Licensed under the EUPL, Version 1.1 only (the "License");
 * You may not use this work except in compliance with the Licence.
 * You may obtain a copy of the Licence at:
 *
 * http://joinup.ec.europa.eu/software/page/eupl
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the Licence is distributed on an "AS IS" basis,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the Licence for the specific language governing permissions and
 * limitations under the Licence.
 */

#include "EventStreamSocket.h"

19
#include "JSON.h"
20
21
22
23
24
25
#include "impl/SocketWaitingImpl.h"
#include "PortEvent.h"
#include "PublisherEvent.h"
#include "ResultEvent.h"
#include "StatusEvent.h"
#include "impl/StreamSocketImpl.h"
26
#include "message/Message.h"
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48

using namespace std;

namespace cameo {

EventStreamSocket::EventStreamSocket(StreamSocketImpl * impl) : m_impl(impl) {
}

EventStreamSocket::~EventStreamSocket() {
}

std::unique_ptr<Event> EventStreamSocket::receive(bool blocking) {

	unique_ptr<zmq::message_t> message(m_impl->receive(blocking));

	// In case of non-blocking call, the message can be null.
	if (message == nullptr) {
		return unique_ptr<Event>(nullptr);
	}

	string response(static_cast<char*>(message->data()), message->size());

49
	if (response == message::Event::STATUS) {
50
51
52

		message = m_impl->receive();

53
54
55
		// Get the JSON event.
		json::Object event;
		json::parse(event, message.get());
56

57
58
59
60
		int id = event[message::StatusEvent::ID].GetInt();
		string name = event[message::StatusEvent::NAME].GetString();
		application::State state = event[message::StatusEvent::APPLICATION_STATE].GetInt();
		application::State pastStates = event[message::StatusEvent::PAST_APPLICATION_STATES].GetInt();
61

62
63
64
		if (event.HasMember(message::StatusEvent::EXIT_CODE)) {
			return unique_ptr<Event>(new StatusEvent(id, name, state, pastStates, event[message::StatusEvent::EXIT_CODE].GetInt()));
		}
65
66
		return unique_ptr<Event>(new StatusEvent(id, name, state, pastStates));
	}
67
	else if (response == message::Event::RESULT) {
68
69
70

		message = m_impl->receive();

71
72
73
74
75
76
		// Get the JSON event.
		json::Object event;
		json::parse(event, message.get());

		int id = event[message::ResultEvent::ID].GetInt();
		string name = event[message::ResultEvent::NAME].GetString();
77

78
79
80
		// Get the data in the next part.
		message = m_impl->receive();
		string data(message->data<char>(), message->size());
81

82
83
		return unique_ptr<Event>(new ResultEvent(id, name, data));
	}
84
	else if (response == message::Event::PUBLISHER) {
85
86
87

		message = m_impl->receive();

88
89
90
		// Get the JSON event.
		json::Object event;
		json::parse(event, message.get());
91

92
93
94
		int id = event[message::PublisherEvent::ID].GetInt();
		string name = event[message::PublisherEvent::NAME].GetString();
		string publisherName = event[message::PublisherEvent::PUBLISHER_NAME].GetString();
95

96
97
		return unique_ptr<Event>(new PublisherEvent(id, name, publisherName));
	}
98
	else if (response == message::Event::PORT) {
99
100
101

		message = m_impl->receive();

102
103
104
		// Get the JSON event.
		json::Object event;
		json::parse(event, message.get());
105

106
107
108
		int id = event[message::PortEvent::ID].GetInt();
		string name = event[message::PortEvent::NAME].GetString();
		string portName = event[message::PortEvent::PORT_NAME].GetString();
109

110
111
		return unique_ptr<Event>(new PortEvent(id, name, portName));
	}
112
	else if (response == message::Event::CANCEL) {
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129

		message = m_impl->receive();

		// Exit with a null event.
		return unique_ptr<Event>(nullptr);
	}

	cerr << "Cannot process '" << response << "' event" << endl;
	return unique_ptr<Event>(nullptr);
}

void EventStreamSocket::cancel() {
	m_impl->cancel();
}

WaitingImpl * EventStreamSocket::waiting() {
	// We transfer the ownership of cancel socket to WaitingImpl
130
	return new SocketWaitingImpl(m_impl->m_cancelSocket.get(), message::Event::CANCEL);
131
132
133
}

}