OutputStreamSocket.cpp 2.38 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/*
 * Copyright 2015 Institut Laue-Langevin
 *
 * Licensed under the EUPL, Version 1.1 only (the "License");
 * You may not use this work except in compliance with the Licence.
 * You may obtain a copy of the Licence at:
 *
 * http://joinup.ec.europa.eu/software/page/eupl
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the Licence is distributed on an "AS IS" basis,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the Licence for the specific language governing permissions and
 * limitations under the Licence.
 */

#include "OutputStreamSocket.h"

#include "impl/SocketWaitingImpl.h"
#include "impl/ServicesImpl.h"
#include "impl/StreamSocketImpl.h"
22
23
24
#include "message/JSON.h"
#include "message/Message.h"
#include <iostream>
25
26
27
28
29
30

using namespace std;

namespace cameo {

Output::Output() :
31
	m_id(0), m_endOfLine(false) {
32
33
34
35
36
37
38
39
40
41
}

int Output::getId() const {
	return m_id;
}

const std::string& Output::getMessage() const {
	return m_message;
}

42
43
44
45
bool Output::isEndOfLine() const {
	return m_endOfLine;
}

46
47
48
49
50
51
52
53
54
55
56
57
58
OutputStreamSocket::OutputStreamSocket(StreamSocketImpl * impl) :
	m_ended(false),
	m_canceled(false),
	m_impl(impl) {
}

OutputStreamSocket::~OutputStreamSocket() {
}

bool OutputStreamSocket::receive(Output& output) {

	unique_ptr<zmq::message_t> message(m_impl->receive());

59
	string response(message->data<char>(), message->size());
60

61
	if (response == message::Event::STREAM) {
62
	}
63
	else if (response == message::Event::ENDSTREAM) {
64
65
66
		m_ended = true;
		return false;
	}
67
	else if (response == message::Event::CANCEL) {
68
69
70
71
72
73
		m_canceled = true;
		return false;
	}

	message = m_impl->receive();

74
75
76
77
78
79
	// Get the JSON event.
	json::Object event;
	json::parse(event, message.get());

	int id = event[message::ApplicationStream::ID].GetInt();
	string line = event[message::ApplicationStream::MESSAGE].GetString();
80
	bool endOfLine = event[message::ApplicationStream::EOL].GetBool();
81

82
83
	output.m_id = id;
	output.m_message = line;
84
	output.m_endOfLine = endOfLine;
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102

	return true;
}

void OutputStreamSocket::cancel() {
	m_impl->cancel();
}

bool OutputStreamSocket::isEnded() const {
	return m_ended;
}

bool OutputStreamSocket::isCanceled() const {
	return m_canceled;
}

WaitingImpl * OutputStreamSocket::waiting() {
	// We transfer the ownership of cancel socket to WaitingImpl
103
	return new SocketWaitingImpl(m_impl->m_cancelSocket.get(), message::Event::CANCEL);
104
105
106
}

}