RequestSocketImpl.cpp 2.79 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/*
 * Copyright 2015 Institut Laue-Langevin
 *
 * Licensed under the EUPL, Version 1.1 only (the "License");
 * You may not use this work except in compliance with the Licence.
 * You may obtain a copy of the Licence at:
 *
 * http://joinup.ec.europa.eu/software/page/eupl
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the Licence is distributed on an "AS IS" basis,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the Licence for the specific language governing permissions and
 * limitations under the Licence.
 */

#include "RequestSocketImpl.h"
#include "../ConnectionTimeout.h"
#include <iostream>

21
22
23
#include <chrono>
#include <thread>

24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
using namespace std;

namespace cameo {

RequestSocketImpl::RequestSocketImpl(zmq::socket_t * socket, int timeout) :
	m_socket(socket), m_timeout(timeout) {
}

RequestSocketImpl::~RequestSocketImpl() {
}

std::unique_ptr<zmq::message_t> RequestSocketImpl::request(const std::string& requestTypePart, const std::string& requestDataPart, int overrideTimeout) {

	// Prepare the request parts.
	int requestTypeSize = requestTypePart.length();
	int requestDataSize = requestDataPart.length();
	zmq::message_t requestType(requestTypeSize);
	zmq::message_t requestData(requestDataSize);
	memcpy(static_cast<void *>(requestType.data()), requestTypePart.c_str(), requestTypeSize);
	memcpy(static_cast<void *>(requestData.data()), requestDataPart.c_str(), requestDataSize);

	// Send the request in two parts.
	m_socket->send(requestType, ZMQ_SNDMORE);
	m_socket->send(requestData);

	int timeout = m_timeout;
	if (overrideTimeout > -1) {
		timeout = overrideTimeout;
	}

	if (timeout > 0) {
		// Polling.
		zmq_pollitem_t items[1];
57
		items[0].socket = static_cast<void *>(*m_socket.get());
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
		items[0].fd = 0;
		items[0].events = ZMQ_POLLIN;
		items[0].revents = 0;

		int rc = zmq::poll(items, 1, timeout);
		if (rc == 0) {
			// Timeout occurred.
			throw ConnectionTimeout();
		}
	}

	// Receive the response.
	unique_ptr<zmq::message_t> reply(new zmq::message_t());
	m_socket->recv(reply.get(), 0);

	return reply;
}

76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
void RequestSocketImpl::requestAsync(const std::string& requestTypePart, const std::string& requestDataPart) {

	// Prepare the request parts.
	int requestTypeSize = requestTypePart.length();
	int requestDataSize = requestDataPart.length();
	zmq::message_t requestType(requestTypeSize);
	zmq::message_t requestData(requestDataSize);
	memcpy(static_cast<void *>(requestType.data()), requestTypePart.c_str(), requestTypeSize);
	memcpy(static_cast<void *>(requestData.data()), requestDataPart.c_str(), requestDataSize);

	// Send the request in two parts.
	m_socket->send(requestType, ZMQ_SNDMORE);
	m_socket->send(requestData);

	// ...

	// Close the socket as we do not need to wait for the reply.
	m_socket->close();
}

96
}