OutputStreamSocket.cpp 3.03 KB
Newer Older
legoc's avatar
legoc committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/*
 * Copyright 2015 Institut Laue-Langevin
 *
 * Licensed under the EUPL, Version 1.1 only (the "License");
 * You may not use this work except in compliance with the Licence.
 * You may obtain a copy of the Licence at:
 *
 * http://joinup.ec.europa.eu/software/page/eupl
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the Licence is distributed on an "AS IS" basis,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the Licence for the specific language governing permissions and
 * limitations under the Licence.
 */

#include "OutputStreamSocket.h"

legoc's avatar
legoc committed
19
#include "JSON.h"
legoc's avatar
legoc committed
20
#include "Messages.h"
legoc's avatar
legoc committed
21
#include "impl/zmq/OutputStreamSocketZmq.h"
legoc's avatar
legoc committed
22
#include <iostream>
23

legoc's avatar
legoc committed
24
25
namespace cameo {

26
Output::Output() :
27
	m_id(0), m_endOfLine(false) {
legoc's avatar
legoc committed
28
29
30
31
32
33
34
35
36
37
}

int Output::getId() const {
	return m_id;
}

const std::string& Output::getMessage() const {
	return m_message;
}

38
39
40
41
bool Output::isEndOfLine() const {
	return m_endOfLine;
}

legoc's avatar
legoc committed
42
OutputStreamSocket::OutputStreamSocket(Server * server, const std::string& name) :
legoc's avatar
legoc committed
43
	m_applicationId(-1),
44
	m_ended(false),
legoc's avatar
legoc committed
45
46
47
48
	m_canceled(false) {

	//TODO Replace with factory.
	m_impl = std::unique_ptr<EventStreamSocketImpl>(new OutputStreamSocketZmq(server, name));
legoc's avatar
legoc committed
49
50
51
52
53
}

OutputStreamSocket::~OutputStreamSocket() {
}

legoc's avatar
legoc committed
54
55
56
57
void OutputStreamSocket::setApplicationId(int id) {
	m_applicationId = id;
}

58
std::optional<Output> OutputStreamSocket::receive() {
legoc's avatar
legoc committed
59

legoc's avatar
legoc committed
60
61
	// Loop on receive() because in case of configuration multiple=yes, messages can come from different instances.
	while (true) {
legoc's avatar
legoc committed
62
		std::string messageType(m_impl->receive());
legoc's avatar
legoc committed
63

legoc's avatar
legoc committed
64
65
66
		// Cancel can only come from this instance.
		if (messageType == message::Event::CANCEL) {
			m_canceled = true;
67
			return {};
legoc's avatar
legoc committed
68
		}
legoc's avatar
legoc committed
69

legoc's avatar
legoc committed
70
		// Get the second part of the message.
legoc's avatar
legoc committed
71
		std::string message = m_impl->receive();
legoc's avatar
legoc committed
72

legoc's avatar
legoc committed
73
74
75
76
77
		// Continue if type of message is SYNCSTREAM. Theses messages are only used for the poller.
		if (messageType == message::Event::SYNCSTREAM) {
			continue;
		}

legoc's avatar
legoc committed
78
79
		// Get the JSON event.
		json::Object event;
legoc's avatar
legoc committed
80
		json::parse(event, message);
legoc's avatar
legoc committed
81

legoc's avatar
legoc committed
82
		int id = event[message::ApplicationStream::ID].GetInt();
legoc's avatar
legoc committed
83

legoc's avatar
legoc committed
84
85
86
		// Filter on the application id so that only the messages concerning the instance applicationId are processed.
		// Others are ignored.
		if (m_applicationId == -1 || m_applicationId == id) {
legoc's avatar
legoc committed
87

legoc's avatar
legoc committed
88
89
90
			// Terminate the stream if type of message is ENDSTREAM.
			if (messageType == message::Event::ENDSTREAM) {
				m_ended = true;
91
				return {};
legoc's avatar
legoc committed
92
			}
93

legoc's avatar
legoc committed
94
			// Here the type of message is STREAM.
95
			std::string line = event[message::ApplicationStream::MESSAGE].GetString();
legoc's avatar
legoc committed
96
97
			bool endOfLine = event[message::ApplicationStream::EOL].GetBool();

98
			Output output;
legoc's avatar
legoc committed
99
100
101
102
			output.m_id = id;
			output.m_message = line;
			output.m_endOfLine = endOfLine;

103
			return std::optional<Output>(output);
legoc's avatar
legoc committed
104
105
106
107
		}

		// Here, the application id is different from id, then re-iterate.
	}
108
109

	return {};
legoc's avatar
legoc committed
110
111
112
113
114
115
}

void OutputStreamSocket::cancel() {
	m_impl->cancel();
}

116
117
118
119
120
121
122
123
bool OutputStreamSocket::isEnded() const {
	return m_ended;
}

bool OutputStreamSocket::isCanceled() const {
	return m_canceled;
}

legoc's avatar
legoc committed
124
}