OutputStreamSocket.cpp 3.19 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/*
 * Copyright 2015 Institut Laue-Langevin
 *
 * Licensed under the EUPL, Version 1.1 only (the "License");
 * You may not use this work except in compliance with the Licence.
 * You may obtain a copy of the Licence at:
 *
 * http://joinup.ec.europa.eu/software/page/eupl
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the Licence is distributed on an "AS IS" basis,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the Licence for the specific language governing permissions and
 * limitations under the Licence.
 */

#include "OutputStreamSocket.h"

#include "impl/SocketWaitingImpl.h"
#include "impl/ServicesImpl.h"
#include "impl/StreamSocketImpl.h"
22
23
#include "message/Message.h"
#include <iostream>
24
#include "JSON.h"
25
26
27
28
29
30

using namespace std;

namespace cameo {

Output::Output() :
31
	m_id(0), m_endOfLine(false) {
32
33
34
35
36
37
38
39
40
41
}

int Output::getId() const {
	return m_id;
}

const std::string& Output::getMessage() const {
	return m_message;
}

42
43
44
45
bool Output::isEndOfLine() const {
	return m_endOfLine;
}

46
OutputStreamSocket::OutputStreamSocket(StreamSocketImpl * impl) :
47
	m_applicationId(-1),
48
49
50
51
52
53
54
55
	m_ended(false),
	m_canceled(false),
	m_impl(impl) {
}

OutputStreamSocket::~OutputStreamSocket() {
}

56
57
58
59
void OutputStreamSocket::setApplicationId(int id) {
	m_applicationId = id;
}

60
61
bool OutputStreamSocket::receive(Output& output) {

62
63
64
65
	// Loop on receive() because in case of configuration multiple=yes, messages can come from different instances.
	while (true) {
		unique_ptr<zmq::message_t> message(m_impl->receive());
		string messageType(message->data<char>(), message->size());
66

67
68
69
70
71
		// Cancel can only come from this instance.
		if (messageType == message::Event::CANCEL) {
			m_canceled = true;
			return false;
		}
72

73
74
		// Get the second part of the message.
		message = m_impl->receive();
75

legoc's avatar
legoc committed
76
77
78
79
80
		// Continue if type of message is SYNCSTREAM. Theses messages are only used for the poller.
		if (messageType == message::Event::SYNCSTREAM) {
			continue;
		}

81
82
83
		// Get the JSON event.
		json::Object event;
		json::parse(event, message.get());
84

85
		int id = event[message::ApplicationStream::ID].GetInt();
86

87
88
89
		// Filter on the application id so that only the messages concerning the instance applicationId are processed.
		// Others are ignored.
		if (m_applicationId == -1 || m_applicationId == id) {
90

91
92
93
94
95
			// Terminate the stream if type of message is ENDSTREAM.
			if (messageType == message::Event::ENDSTREAM) {
				m_ended = true;
				return false;
			}
96

97
98
99
100
101
102
103
104
105
106
107
108
109
			// Here the type of message is STREAM.
			string line = event[message::ApplicationStream::MESSAGE].GetString();
			bool endOfLine = event[message::ApplicationStream::EOL].GetBool();

			output.m_id = id;
			output.m_message = line;
			output.m_endOfLine = endOfLine;

			return true;
		}

		// Here, the application id is different from id, then re-iterate.
	}
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
}

void OutputStreamSocket::cancel() {
	m_impl->cancel();
}

bool OutputStreamSocket::isEnded() const {
	return m_ended;
}

bool OutputStreamSocket::isCanceled() const {
	return m_canceled;
}

WaitingImpl * OutputStreamSocket::waiting() {
	// We transfer the ownership of cancel socket to WaitingImpl
126
	return new SocketWaitingImpl(m_impl->m_cancelSocket.get(), message::Event::CANCEL);
127
128
129
}

}