RequesterImpl.cpp 5.65 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/*
 * Copyright 2015 Institut Laue-Langevin
 *
 * Licensed under the EUPL, Version 1.1 only (the "License");
 * You may not use this work except in compliance with the Licence.
 * You may obtain a copy of the Licence at:
 *
 * http://joinup.ec.europa.eu/software/page/eupl
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the Licence is distributed on an "AS IS" basis,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the Licence for the specific language governing permissions and
 * limitations under the Licence.
 */

#include "RequesterImpl.h"
18
19
#include "Application.h"
#include "Serializer.h"
20
21
#include "ServicesImpl.h"
#include "RequestSocketImpl.h"
22
#include "message/Message.h"
23
#include <sstream>
24
#include "JSON.h"
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80

using namespace std;

namespace cameo {

const std::string RequesterImpl::REQUESTER_PREFIX = "req.";
std::mutex RequesterImpl::m_mutex;
int RequesterImpl::m_requesterCounter = 0;

RequesterImpl::RequesterImpl(application::This * application, const std::string& url, int requesterPort, int responderPort, const std::string& name, int responderId, int requesterId) :
	m_application(application),
	m_requesterPort(requesterPort),
	m_name(name),
	m_responderId(responderId),
	m_requesterId(requesterId),
	m_canceled(false) {

	stringstream repEndpoint;
	repEndpoint << url << ":" << responderPort;
	m_responderEndpoint = repEndpoint.str();

	// Create the request socket.
	m_requestSocket = m_application->createRequestSocket(m_responderEndpoint);

	// Create a socket REP.
	m_repSocket.reset(new zmq::socket_t(m_application->m_impl->m_context, ZMQ_REP));
	stringstream reqEndpoint;
	reqEndpoint << "tcp://*:" << m_requesterPort;

	m_repSocket->bind(reqEndpoint.str().c_str());
}

RequesterImpl::~RequesterImpl() {
	terminate();
}

int RequesterImpl::newRequesterId() {

	lock_guard<mutex> lock(m_mutex);
	m_requesterCounter++;

	return m_requesterCounter;
}

std::string RequesterImpl::getRequesterPortName(const std::string& name, int responderId, int requesterId) {

	stringstream requesterPortName;
	requesterPortName << REQUESTER_PREFIX << name << "." << responderId << "." << requesterId;

	return requesterPortName.str();
}

WaitingImpl * RequesterImpl::waiting() {
	return new GenericWaitingImpl(bind(&RequesterImpl::cancel, this));
}

81
void RequesterImpl::sendBinary(const std::string& requestData) {
82

83
84
85
	json::StringObject request;
	request.pushKey(message::TYPE);
	request.pushInt(message::REQUEST);
86

87
88
	request.pushKey(message::Request::APPLICATION_NAME);
	request.pushString(m_application->getName());
89

90
91
	request.pushKey(message::Request::APPLICATION_ID);
	request.pushInt(m_application->getId());
92

93
	request.pushKey(message::Request::SERVER_URL);
94
	request.pushString(m_application->getEndpoint().getProtocol() + "://" + m_application->getEndpoint().getAddress());
95
96

	request.pushKey(message::Request::SERVER_PORT);
97
	request.pushInt(m_application->getEndpoint().getPort());
98
99
100
101
102

	request.pushKey(message::Request::REQUESTER_PORT);
	request.pushInt(m_requesterPort);

	m_requestSocket->request(request.toString(), requestData);
103
104
}

105
void RequesterImpl::send(const std::string& requestData) {
106
107
108

	// encode the data
	string result;
109
	serialize(requestData, result);
110
111
112
	sendBinary(result);
}

113
114
115
116
117
void RequesterImpl::sendTwoBinaryParts(const std::string& requestData1, const std::string& requestData2) {

	json::StringObject request;
	request.pushKey(message::TYPE);
	request.pushInt(message::REQUEST);
118

119
120
	request.pushKey(message::Request::APPLICATION_NAME);
	request.pushString(m_application->getName());
121

122
123
	request.pushKey(message::Request::APPLICATION_ID);
	request.pushInt(m_application->getId());
124

125
	request.pushKey(message::Request::SERVER_URL);
126
	request.pushString(m_application->getEndpoint().getProtocol() + "://" + m_application->getEndpoint().getAddress());
127

128
	request.pushKey(message::Request::SERVER_PORT);
129
	request.pushInt(m_application->getEndpoint().getPort());
130
131
132
133
134

	request.pushKey(message::Request::REQUESTER_PORT);
	request.pushInt(m_requesterPort);

	m_requestSocket->request(request.toString(), requestData1, requestData2);
135
136
}

137
138
139
140
141
std::optional<std::string> RequesterImpl::receiveBinary() {

	if (m_canceled) {
		return {};
	}
142
143
144
145

	unique_ptr<zmq::message_t> message(new zmq::message_t);
	m_repSocket->recv(message.get(), 0);

146
147
148
149
150
	// Get the JSON request.
	json::Object request;
	json::parse(request, message.get());

	int type = request[message::TYPE].GetInt();
151

152
153
154
155
156
157
158
	if (type == message::CANCEL) {
		m_canceled = true;
		return {};
	}

	optional<string> result;

159
160
	if (type == message::RESPONSE) {
		// Get the second part for the message.
161
162
		message.reset(new zmq::message_t);
		m_repSocket->recv(message.get(), 0);
163
		result = string(message->data<char>(), message->size());
164
165
	}

166
	// Create the reply.
167
168
169
	string data = "OK";
	size_t size = data.length();
	unique_ptr<zmq::message_t> reply(new zmq::message_t(size));
170
	memcpy(reply->data(), data.c_str(), size);
171
172
173

	m_repSocket->send(*reply);

174
	return result;
175
176
}

177
178
std::optional<std::string> RequesterImpl::receive() {
	return receiveBinary();
179
180
181
182
}

void RequesterImpl::cancel() {

183
184
185
186
	json::StringObject request;
	request.pushKey(message::TYPE);
	request.pushInt(message::CANCEL);

187
	// Create a request socket only for the request.
188
	unique_ptr<RequestSocketImpl> requestSocket = m_application->createRequestSocket(m_application->getEndpoint().withPort(m_requesterPort).toString());
189
	requestSocket->request(request.toString());
190
191
192
193
194
195
196
197
198
}

void RequesterImpl::terminate() {

	if (m_repSocket.get() != nullptr) {
		m_repSocket.reset(nullptr);

		bool success = m_application->removePort(getRequesterPortName(m_name, m_responderId, m_requesterId));
		if (!success) {
199
			cerr << "Server cannot destroy requester " << m_name << endl;
200
201
202
203
204
205
206
		}
	}

	m_requestSocket.reset();
}

}