Services.cpp 4.89 KB
Newer Older
legoc's avatar
legoc committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/*
 * Copyright 2015 Institut Laue-Langevin
 *
 * Licensed under the EUPL, Version 1.1 only (the "License");
 * You may not use this work except in compliance with the Licence.
 * You may obtain a copy of the Licence at:
 *
 * http://joinup.ec.europa.eu/software/page/eupl
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the Licence is distributed on an "AS IS" basis,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the Licence for the specific language governing permissions and
 * limitations under the Licence.
 */

17
#include "Services.h"
legoc's avatar
legoc committed
18
19
20
21

#include <iostream>
#include <sstream>
#include <stdexcept>
22
23
#include "impl/CancelIdGenerator.h"
#include "impl/ServicesImpl.h"
24
#include "impl/StreamSocketImpl.h"
25
#include "impl/RequestSocketImpl.h"
26
#include "ProtoType.h"
legoc's avatar
legoc committed
27
28
29
30
31
32
33
34

using namespace std;

namespace cameo {

Services::Services() :
	m_port(0),
	m_statusPort(0),
legoc's avatar
legoc committed
35
	m_impl(nullptr) {
legoc's avatar
legoc committed
36
37
38
39
}

Services::~Services() {
	// Delete impl here to avoid order troubles.
40
41
42
43
	terminate();
}

void Services::terminate() {
44
45
46
47
48

	// Reset the request socket before the impl, otherwise reset impl will block.
	m_requestSocket.reset();

	// Reset the impl.
legoc's avatar
legoc committed
49
	m_impl.reset();
legoc's avatar
legoc committed
50
51
}

52
53
54
void Services::init() {
	// Set the impl.
	m_impl.reset(new ServicesImpl());
legoc's avatar
legoc committed
55
56
}

57
58
59
60
61
void Services::initRequestSocket() {
	// Create the request socket. The server endpoint must have been initialized.
	m_requestSocket = std::move(createRequestSocket(m_serverEndpoint));
}

legoc's avatar
legoc committed
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
std::vector<std::string> Services::split(const std::string& info) {

	vector<string> result;

	int lastIndex = 0;
	int index = info.find(':');
	while (index != string::npos) {
		result.push_back(info.substr(lastIndex, index - lastIndex));
		lastIndex = index + 1;
		index = info.find(':', lastIndex);
	}
	result.push_back(info.substr(lastIndex));

	return result;
}

void Services::setTimeout(int timeout) {
	m_impl->setTimeout(timeout);
}

int Services::getTimeout() const {
	return m_impl->getTimeout();
}

const std::string& Services::getEndpoint() const {
	return m_serverEndpoint;
}

const std::string& Services::getUrl() const {
	return m_url;
}

int Services::getPort() const {
	return m_port;
}

const std::string& Services::getStatusEndpoint() const {
	return m_serverStatusEndpoint;
}

bool Services::isAvailable(int timeout) const {
103
	return m_impl->isAvailable(m_requestSocket.get(), timeout);
legoc's avatar
legoc committed
104
105
106
107
}

void Services::initStatus() {

108
	// Get the status port.
109
	unique_ptr<zmq::message_t> reply = m_requestSocket->request(m_impl->createRequestType(PROTO_STATUS), m_impl->createShowStatusRequest());
legoc's avatar
legoc committed
110
111
112
113

	proto::RequestResponse requestResponse;
	requestResponse.ParseFromArray((*reply).data(), (*reply).size());

114
	// Check response.
legoc's avatar
legoc committed
115
116
117
118
	if (requestResponse.value() == -1) {
		return;
	}

119
	// Get the status port.
legoc's avatar
legoc committed
120
121
122
123
124
125
126
	m_statusPort = requestResponse.value();

	stringstream ss;
	ss << m_url << ":" << m_statusPort;
	m_serverStatusEndpoint = ss.str();
}

legoc's avatar
legoc committed
127
std::unique_ptr<EventStreamSocket> Services::openEventStream() {
legoc's avatar
legoc committed
128

legoc's avatar
legoc committed
129
	// Init the status port if necessary.
legoc's avatar
legoc committed
130
131
132
133
134
135
136
137
138
	if (m_statusPort == 0) {
		initStatus();
	}

	stringstream cancelEndpoint;

	// We define a unique name that depends on the event stream socket object because there can be many (instances).
	cancelEndpoint << "inproc://cancel." << CancelIdGenerator::newId();

legoc's avatar
legoc committed
139
	// Create the sockets.
legoc's avatar
legoc committed
140
141
142
	zmq::socket_t * cancelPublisher = m_impl->createCancelPublisher(cancelEndpoint.str());
	zmq::socket_t * subscriber = m_impl->createEventSubscriber(m_serverStatusEndpoint, cancelEndpoint.str());

legoc's avatar
legoc committed
143
144
	// Wait for the connection to be ready.
	m_impl->waitForSubscriber(subscriber, m_requestSocket.get());
legoc's avatar
legoc committed
145

legoc's avatar
legoc committed
146
	// Create the event stream socket.
147
	return unique_ptr<EventStreamSocket>(new EventStreamSocket(new StreamSocketImpl(subscriber, cancelPublisher)));
legoc's avatar
legoc committed
148
149
}

legoc's avatar
legoc committed
150
151
152
153
154
155
std::unique_ptr<OutputStreamSocket> Services::createOutputStreamSocket(int port) {

	if (port == -1) {
		return nullptr;
	}

legoc's avatar
legoc committed
156
	// Prepare our context and subscriber.
legoc's avatar
legoc committed
157
158
	string streamEndpoint = m_url + ":" + to_string(port);

159
160
161
	// We define a unique name that depends on the event stream socket object because there can be many (instances).
	string cancelEndpoint = "inproc://cancel." + to_string(CancelIdGenerator::newId());

legoc's avatar
legoc committed
162
	// Create the sockets.
163
164
165
	zmq::socket_t * cancelPublisher = m_impl->createCancelPublisher(cancelEndpoint);
	zmq::socket_t * subscriber = m_impl->createOutputStreamSubscriber(streamEndpoint, cancelEndpoint);

legoc's avatar
legoc committed
166
	// Create the output stream socket.
167
	return unique_ptr<OutputStreamSocket>(new OutputStreamSocket(new StreamSocketImpl(subscriber, cancelPublisher)));
legoc's avatar
legoc committed
168
169
}

170
171
172
173
std::unique_ptr<RequestSocketImpl> Services::createRequestSocket(const std::string& endpoint) {
	return unique_ptr<RequestSocketImpl>(new RequestSocketImpl(m_impl->createRequestSocket(endpoint), m_impl->m_timeout));
}

174
175
176
177
std::unique_ptr<RequestSocketImpl> Services::createRequestSocket(const std::string& endpoint, int timeout) {
	return unique_ptr<RequestSocketImpl>(new RequestSocketImpl(m_impl->createRequestSocket(endpoint), timeout));
}

legoc's avatar
legoc committed
178
}