RequesterImpl.cpp 5.64 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/*
 * Copyright 2015 Institut Laue-Langevin
 *
 * Licensed under the EUPL, Version 1.1 only (the "License");
 * You may not use this work except in compliance with the Licence.
 * You may obtain a copy of the Licence at:
 *
 * http://joinup.ec.europa.eu/software/page/eupl
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the Licence is distributed on an "AS IS" basis,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the Licence for the specific language governing permissions and
 * limitations under the Licence.
 */

#include "RequesterImpl.h"
18
19
#include "Application.h"
#include "Serializer.h"
20
21
#include "ServicesImpl.h"
#include "RequestSocketImpl.h"
22
#include "message/Message.h"
23
#include <sstream>
24
#include "JSON.h"
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80

using namespace std;

namespace cameo {

// Prefix prepended by getRequesterPortName() when it builds a requester port name.
const std::string RequesterImpl::REQUESTER_PREFIX = "req.";
// Protects m_requesterCounter; it is locked in newRequesterId().
std::mutex RequesterImpl::m_mutex;
// Last id handed out by newRequesterId(); being static, it is shared by all instances.
int RequesterImpl::m_requesterCounter = 0;

/**
 * Builds a requester attached to a responder.
 *
 * The constructor stores the identification values, opens the request socket
 * towards "<url>:<responderPort>" and binds a ZMQ_REP socket on
 * "tcp://*:<requesterPort>" on which receiveBinary() later reads.
 *
 * @param application   The owning application; used to create sockets and to remove the port in terminate().
 * @param url           Base url of the responder, without the port.
 * @param requesterPort Local port on which the REP socket is bound.
 * @param responderPort Port of the responder, appended to url.
 * @param name          Name of the requester, used to build the port name.
 * @param responderId   Id of the responder, used to build the port name.
 * @param requesterId   Id of this requester, used to build the port name.
 */
RequesterImpl::RequesterImpl(application::This * application, const std::string& url, int requesterPort, int responderPort, const std::string& name, int responderId, int requesterId) :
	m_application(application),
	m_requesterPort(requesterPort),
	m_name(name),
	m_responderId(responderId),
	m_requesterId(requesterId),
	m_canceled(false) {

	// The responder endpoint has the form "<url>:<responderPort>".
	ostringstream responderAddress;
	responderAddress << url << ':' << responderPort;
	m_responderEndpoint = responderAddress.str();

	// Open the socket used to send the requests to the responder.
	m_requestSocket = m_application->createRequestSocket(m_responderEndpoint);

	// Open the REP socket and listen on every interface at the requester port.
	m_repSocket.reset(new zmq::socket_t(m_application->m_impl->m_context, ZMQ_REP));

	ostringstream bindAddress;
	bindAddress << "tcp://*:" << m_requesterPort;
	const string localEndpoint = bindAddress.str();

	m_repSocket->bind(localEndpoint.c_str());
}

/**
 * Destructor. Delegates the cleanup to terminate(), which resets both sockets
 * and removes the requester port if the REP socket is still open.
 */
RequesterImpl::~RequesterImpl() {
	terminate();
}

int RequesterImpl::newRequesterId() {

	lock_guard<mutex> lock(m_mutex);
	m_requesterCounter++;

	return m_requesterCounter;
}

std::string RequesterImpl::getRequesterPortName(const std::string& name, int responderId, int requesterId) {

	stringstream requesterPortName;
	requesterPortName << REQUESTER_PREFIX << name << "." << responderId << "." << requesterId;

	return requesterPortName.str();
}

/**
 * Creates a waiting object whose callback invokes cancel() on this requester.
 *
 * @return A newly allocated GenericWaitingImpl; it is returned as a raw
 *         pointer, so the caller is presumably responsible for deleting it
 *         (NOTE(review): confirm ownership against the callers).
 */
WaitingImpl * RequesterImpl::waiting() {

	// The callback captures this: the requester must outlive the returned object.
	auto cancelCallback = [this] { cancel(); };

	return new GenericWaitingImpl(cancelCallback);
}

81
void RequesterImpl::sendBinary(const std::string& requestData) {
82

83
84
85
	json::StringObject request;
	request.pushKey(message::TYPE);
	request.pushInt(message::REQUEST);
86

87
88
	request.pushKey(message::Request::APPLICATION_NAME);
	request.pushString(m_application->getName());
89

90
91
	request.pushKey(message::Request::APPLICATION_ID);
	request.pushInt(m_application->getId());
92

93
	request.pushKey(message::Request::SERVER_URL);
94
	request.pushString(m_application->getEndpoint().getProtocol() + "://" + m_application->getEndpoint().getAddress());
95
96

	request.pushKey(message::Request::SERVER_PORT);
97
	request.pushInt(m_application->getEndpoint().getPort());
98
99
100
101
102

	request.pushKey(message::Request::REQUESTER_PORT);
	request.pushInt(m_requesterPort);

	m_requestSocket->request(request.toString(), requestData);
103
104
}

105
void RequesterImpl::send(const std::string& requestData) {
106
107
108

	// encode the data
	string result;
109
	serialize(requestData, result);
110
111
112
	sendBinary(result);
}

113
114
115
116
117
void RequesterImpl::sendTwoBinaryParts(const std::string& requestData1, const std::string& requestData2) {

	json::StringObject request;
	request.pushKey(message::TYPE);
	request.pushInt(message::REQUEST);
118

119
120
	request.pushKey(message::Request::APPLICATION_NAME);
	request.pushString(m_application->getName());
121

122
123
	request.pushKey(message::Request::APPLICATION_ID);
	request.pushInt(m_application->getId());
124

125
	request.pushKey(message::Request::SERVER_URL);
126
	request.pushString(m_application->getEndpoint().getProtocol() + "://" + m_application->getEndpoint().getAddress());
127

128
	request.pushKey(message::Request::SERVER_PORT);
129
	request.pushInt(m_application->getEndpoint().getPort());
130
131
132
133
134

	request.pushKey(message::Request::REQUESTER_PORT);
	request.pushInt(m_requesterPort);

	m_requestSocket->request(request.toString(), requestData1, requestData2);
135
136
137
138
139
140
141
}

bool RequesterImpl::receiveBinary(std::string& response) {

	unique_ptr<zmq::message_t> message(new zmq::message_t);
	m_repSocket->recv(message.get(), 0);

142
143
144
145
146
	// Get the JSON request.
	json::Object request;
	json::parse(request, message.get());

	int type = request[message::TYPE].GetInt();
147

148
149
	if (type == message::RESPONSE) {
		// Get the second part for the message.
150
151
		message.reset(new zmq::message_t);
		m_repSocket->recv(message.get(), 0);
152
		response = string(message->data<char>(), message->size());
153
	}
154
	else if (type == message::CANCEL) {
155
156
157
		m_canceled = true;
	}

158
	// Create the reply.
159
160
161
	string data = "OK";
	size_t size = data.length();
	unique_ptr<zmq::message_t> reply(new zmq::message_t(size));
162
	memcpy(reply->data(), data.c_str(), size);
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180

	m_repSocket->send(*reply);

	return !m_canceled;
}

bool RequesterImpl::receive(std::string& data) {

	string bytes;
	bool result = receiveBinary(bytes);

	parse(bytes, data);

	return result;
}

void RequesterImpl::cancel() {

181
182
183
184
	json::StringObject request;
	request.pushKey(message::TYPE);
	request.pushInt(message::CANCEL);

185
	// Create a request socket only for the request.
186
	unique_ptr<RequestSocketImpl> requestSocket = m_application->createRequestSocket(m_application->getEndpoint().withPort(m_requesterPort).toString());
187
	requestSocket->request(request.toString());
188
189
190
191
192
193
194
195
196
}

void RequesterImpl::terminate() {

	if (m_repSocket.get() != nullptr) {
		m_repSocket.reset(nullptr);

		bool success = m_application->removePort(getRequesterPortName(m_name, m_responderId, m_requesterId));
		if (!success) {
197
			cerr << "Server cannot destroy requester " << m_name << endl;
198
199
200
201
202
203
204
		}
	}

	m_requestSocket.reset();
}

}