RequesterImpl.cpp 5.55 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/*
 * Copyright 2015 Institut Laue-Langevin
 *
 * Licensed under the EUPL, Version 1.1 only (the "License");
 * You may not use this work except in compliance with the Licence.
 * You may obtain a copy of the Licence at:
 *
 * http://joinup.ec.europa.eu/software/page/eupl
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the Licence is distributed on an "AS IS" basis,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the Licence for the specific language governing permissions and
 * limitations under the Licence.
 */

#include "RequesterImpl.h"
18
19
#include "Application.h"
#include "Serializer.h"
20
21
#include "ServicesImpl.h"
#include "RequestSocketImpl.h"
22
23
#include "message/JSON.h"
#include "message/Message.h"
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
#include <sstream>

using namespace std;

namespace cameo {

// Prefix that getRequesterPortName() puts in front of every requester port name.
const std::string RequesterImpl::REQUESTER_PREFIX{"req."};

// Mutex locked by newRequesterId() while it updates m_requesterCounter.
std::mutex RequesterImpl::m_mutex;

// Counter incremented by newRequesterId(); starts at 0, so the first id returned is 1.
int RequesterImpl::m_requesterCounter{0};

/**
 * Creates a requester attached to the given application.
 *
 * Two sockets are set up:
 *  - a request socket, obtained from the application, targeting
 *    "<url>:<responderPort>";
 *  - a ZMQ_REP socket bound to "tcp://*:<requesterPort>", which
 *    receiveBinary() later reads from.
 *
 * \param application   Owning application; used to create sockets and, in terminate(), to remove the port.
 * \param url           Url of the responder, without the port.
 * \param requesterPort Local port the REP socket is bound to.
 * \param responderPort Port appended to url to form the responder endpoint.
 * \param name          Name used to build the requester port name.
 * \param responderId   Responder id used to build the requester port name.
 * \param requesterId   Requester id used to build the requester port name.
 */
RequesterImpl::RequesterImpl(application::This * application, const std::string& url, int requesterPort, int responderPort, const std::string& name, int responderId, int requesterId) :
	m_application(application),
	m_requesterPort(requesterPort),
	m_name(name),
	m_responderId(responderId),
	m_requesterId(requesterId),
	m_canceled(false) {

	// Responder endpoint: "<url>:<responderPort>".
	{
		std::ostringstream endpoint;
		endpoint << url << ":" << responderPort;
		m_responderEndpoint = endpoint.str();
	}

	// Request socket towards the responder.
	m_requestSocket = m_application->createRequestSocket(m_responderEndpoint);

	// REP socket on which messages addressed to this requester are received.
	m_repSocket.reset(new zmq::socket_t(m_application->m_impl->m_context, ZMQ_REP));

	// Listen on every interface at the requester port.
	std::ostringstream localEndpoint;
	localEndpoint << "tcp://*:" << m_requesterPort;
	const std::string bindAddress = localEndpoint.str();

	m_repSocket->bind(bindAddress.c_str());
}

/**
 * Releases the sockets by delegating to terminate().
 */
RequesterImpl::~RequesterImpl() {

	// All cleanup lives in terminate(); it skips the port removal when the REP socket is already gone.
	terminate();
}

/**
 * Returns a new requester id.
 *
 * The shared counter is incremented while holding m_mutex and the
 * incremented value is returned.
 */
int RequesterImpl::newRequesterId() {

	std::lock_guard<std::mutex> guard(m_mutex);

	// Pre-increment: the value returned is the counter after the increment.
	return ++m_requesterCounter;
}

/**
 * Builds the port name of a requester.
 *
 * The result has the form "<REQUESTER_PREFIX><name>.<responderId>.<requesterId>".
 *
 * \param name        Name part of the port name.
 * \param responderId Responder id part of the port name.
 * \param requesterId Requester id part of the port name.
 * \return The assembled port name.
 */
std::string RequesterImpl::getRequesterPortName(const std::string& name, int responderId, int requesterId) {

	std::ostringstream portName;

	portName << REQUESTER_PREFIX << name;
	portName << "." << responderId;
	portName << "." << requesterId;

	return portName.str();
}

/**
 * Creates a waiting object whose callback invokes cancel() on this requester.
 *
 * \return A newly allocated GenericWaitingImpl. It captures this requester, so
 *         the callback must not be invoked after the requester is destroyed.
 *         NOTE(review): the caller presumably takes ownership of the returned pointer - confirm.
 */
WaitingImpl * RequesterImpl::waiting() {

	// The callback simply forwards to cancel() on this instance.
	return new GenericWaitingImpl([this] { cancel(); });
}

81
void RequesterImpl::sendBinary(const std::string& requestData) {
82

83
84
85
	json::StringObject request;
	request.pushKey(message::TYPE);
	request.pushInt(message::REQUEST);
86

87
88
	request.pushKey(message::Request::APPLICATION_NAME);
	request.pushString(m_application->getName());
89

90
91
	request.pushKey(message::Request::APPLICATION_ID);
	request.pushInt(m_application->getId());
92

93
94
95
96
97
98
99
100
101
102
	request.pushKey(message::Request::SERVER_URL);
	request.pushString(m_application->getUrl());

	request.pushKey(message::Request::SERVER_PORT);
	request.pushInt(m_application->getPort());

	request.pushKey(message::Request::REQUESTER_PORT);
	request.pushInt(m_requesterPort);

	m_requestSocket->request(request.toString(), requestData);
103
104
}

105
void RequesterImpl::send(const std::string& requestData) {
106
107
108

	// encode the data
	string result;
109
	serialize(requestData, result);
110
111
112
	sendBinary(result);
}

113
114
115
116
117
void RequesterImpl::sendTwoBinaryParts(const std::string& requestData1, const std::string& requestData2) {

	json::StringObject request;
	request.pushKey(message::TYPE);
	request.pushInt(message::REQUEST);
118

119
120
	request.pushKey(message::Request::APPLICATION_NAME);
	request.pushString(m_application->getName());
121

122
123
	request.pushKey(message::Request::APPLICATION_ID);
	request.pushInt(m_application->getId());
124

125
126
	request.pushKey(message::Request::SERVER_URL);
	request.pushString(m_application->getUrl());
127

128
129
130
131
132
133
134
	request.pushKey(message::Request::SERVER_PORT);
	request.pushInt(m_application->getPort());

	request.pushKey(message::Request::REQUESTER_PORT);
	request.pushInt(m_requesterPort);

	m_requestSocket->request(request.toString(), requestData1, requestData2);
135
136
137
138
139
140
141
}

bool RequesterImpl::receiveBinary(std::string& response) {

	unique_ptr<zmq::message_t> message(new zmq::message_t);
	m_repSocket->recv(message.get(), 0);

142
143
144
145
146
	// Get the JSON request.
	json::Object request;
	json::parse(request, message.get());

	int type = request[message::TYPE].GetInt();
147

148
149
	if (type == message::RESPONSE) {
		// Get the second part for the message.
150
151
		message.reset(new zmq::message_t);
		m_repSocket->recv(message.get(), 0);
152
		response = string(message->data<char>(), message->size());
153
	}
154
	else if (type == message::CANCEL) {
155
156
157
		m_canceled = true;
	}

158
	// Create the reply.
159
160
161
	string data = "OK";
	size_t size = data.length();
	unique_ptr<zmq::message_t> reply(new zmq::message_t(size));
162
	memcpy(reply->data(), data.c_str(), size);
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183

	m_repSocket->send(*reply);

	return !m_canceled;
}

/**
 * Receives a response and decodes it with parse().
 *
 * \param data Receives the decoded response when the call succeeds; cleared
 *             when it does not, so stale content is never mistaken for a
 *             response.
 * \return The result of receiveBinary(): false once the requester has been
 *         canceled, true otherwise.
 */
bool RequesterImpl::receive(std::string& data) {

	std::string bytes;
	const bool received = receiveBinary(bytes);

	if (received) {
		parse(bytes, data);
	}
	else {
		// On cancellation receiveBinary() leaves the buffer empty; there is no
		// payload to decode, so parse() is not run on it.
		data.clear();
	}

	return received;
}

void RequesterImpl::cancel() {

	stringstream requesterEndpoint;
	requesterEndpoint << m_application->getUrl() << ":" << m_requesterPort;

184
185
186
187
	json::StringObject request;
	request.pushKey(message::TYPE);
	request.pushInt(message::CANCEL);

188
189
	// Create a request socket only for the request.
	unique_ptr<RequestSocketImpl> requestSocket = m_application->createRequestSocket(requesterEndpoint.str());
190
	requestSocket->request(request.toString());
191
192
193
194
195
196
197
198
199
}

/**
 * Releases the sockets and removes the requester port from the application.
 *
 * The REP socket and the port are only released while the REP socket still
 * exists, so calling terminate() again (for instance from the destructor)
 * does not remove the port a second time. The request socket is reset on
 * every call.
 */
void RequesterImpl::terminate() {

	if (m_repSocket) {
		// Destroy the REP socket before asking the application to remove the port.
		m_repSocket.reset();

		const std::string portName = getRequesterPortName(m_name, m_responderId, m_requesterId);

		// A failed removal is only reported; nothing is thrown.
		if (!m_application->removePort(portName)) {
			std::cerr << "Server cannot destroy requester " << m_name << std::endl;
		}
	}

	m_requestSocket.reset();
}

}