OutputStreamSocket.cpp 2.09 KB
Newer Older
legoc's avatar
legoc committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/*
 * Copyright 2015 Institut Laue-Langevin
 *
 * Licensed under the EUPL, Version 1.1 only (the "License");
 * You may not use this work except in compliance with the Licence.
 * You may obtain a copy of the Licence at:
 *
 * http://joinup.ec.europa.eu/software/page/eupl
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the Licence is distributed on an "AS IS" basis,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the Licence for the specific language governing permissions and
 * limitations under the Licence.
 */

#include "OutputStreamSocket.h"

#include "impl/SocketImpl.h"
#include "impl/SocketWaitingImpl.h"
21
#include "impl/ServicesImpl.h"
legoc's avatar
legoc committed
22
#include "../proto/Messages.pb.h"
23
#include <iostream>
legoc's avatar
legoc committed
24
25
26
27
28

using namespace std;

namespace cameo {

29
30
Output::Output() :
	m_id(0) {
legoc's avatar
legoc committed
31
32
33
34
35
36
37
38
39
40
}

int Output::getId() const {
	return m_id;
}

const std::string& Output::getMessage() const {
	return m_message;
}

41
42
43
OutputStreamSocket::OutputStreamSocket(SocketImpl * impl) :
	m_ended(false),
	m_canceled(false),
legoc's avatar
legoc committed
44
45
46
47
48
49
	m_impl(impl) {
}

OutputStreamSocket::~OutputStreamSocket() {
}

50
bool OutputStreamSocket::receive(Output& output) {
legoc's avatar
legoc committed
51
52
53
54
55

	unique_ptr<zmq::message_t> message(m_impl->receive());

	string response(static_cast<char*>(message->data()), message->size());

56
	if (response == ServicesImpl::STREAM) {
legoc's avatar
legoc committed
57
	}
58
59
60
61
62
63
64
	else if (response == ServicesImpl::ENDSTREAM) {
		m_ended = true;
		return false;
	}
	else if (response == ServicesImpl::CANCEL) {
		m_canceled = true;
		return false;
legoc's avatar
legoc committed
65
66
67
68
69
70
71
	}

	message = m_impl->receive();

	proto::ApplicationStream protoStream;
	protoStream.ParseFromArray(message->data(), message->size());

72
73
74
75
	output.m_id = protoStream.id();
	output.m_message = protoStream.message();

	return true;
legoc's avatar
legoc committed
76
77
78
79
80
81
}

void OutputStreamSocket::cancel() {
	m_impl->cancel();
}

82
83
84
85
86
87
88
89
bool OutputStreamSocket::isEnded() const {
	return m_ended;
}

bool OutputStreamSocket::isCanceled() const {
	return m_canceled;
}

legoc's avatar
legoc committed
90
91
92
93
94
95
WaitingImpl * OutputStreamSocket::waiting() {
	// We transfer the ownership of cancel socket to WaitingImpl
	return new SocketWaitingImpl(m_impl->m_cancelSocket.get(), "CANCEL");
}

}