RequesterImpl.cpp 5.84 KB
Newer Older
legoc's avatar
legoc committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/*
 * Copyright 2015 Institut Laue-Langevin
 *
 * Licensed under the EUPL, Version 1.1 only (the "License");
 * You may not use this work except in compliance with the Licence.
 * You may obtain a copy of the Licence at:
 *
 * http://joinup.ec.europa.eu/software/page/eupl
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the Licence is distributed on an "AS IS" basis,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the Licence for the specific language governing permissions and
 * limitations under the Licence.
 */

17
18
#include "RequesterImpl.h"
#include "../Application.h"
19
#include "../Serializer.h"
20
#include "ApplicationImpl.h"
legoc's avatar
legoc committed
21
#include <sstream>
legoc's avatar
legoc committed
22
23
24
25
26
27

using namespace std;

namespace cameo {

// Prefix used by getRequesterPortName() when building a requester port name.
const std::string RequesterImpl::REQUESTER_PREFIX("req.");

// Mutex locked by newRequesterId() while the counter is incremented.
std::mutex RequesterImpl::m_mutex;

// Counter incremented by newRequesterId(); starts at 0 so the first id returned is 1.
int RequesterImpl::m_requesterCounter(0);
legoc's avatar
legoc committed
30

31
/**
 * Builds a requester.
 *
 * Stores the application, name and identifiers, computes the responder endpoint
 * as "<url>:<responderPort>", then creates a ZMQ_REP socket on the application
 * context and binds it to "tcp://*:<requesterPort>".
 */
RequesterImpl::RequesterImpl(const application::This * application, const std::string& url, int requesterPort, int responderPort, const std::string& name, int responderId, int requesterId) :
	m_application(application),
	m_requesterPort(requesterPort),
	m_name(name),
	m_responderId(responderId),
	m_requesterId(requesterId),
	m_canceled(false) {

	// Endpoint of the responder: "<url>:<responderPort>".
	ostringstream responderAddress;
	responderAddress << url << ':' << responderPort;
	m_responderEndpoint = responderAddress.str();

	// Create the REP socket on the context of the application.
	m_requester.reset(new zmq::socket_t(m_application->m_impl->m_context, ZMQ_REP));

	// Bind the socket to the requester port on all interfaces.
	ostringstream bindAddress;
	bindAddress << "tcp://*:" << m_requesterPort;
	const string bindAddressText = bindAddress.str();

	m_requester->bind(bindAddressText.c_str());
}

/**
 * Destructor. Delegates all the cleanup to terminate().
 */
RequesterImpl::~RequesterImpl() {
	terminate();
}

55
56
int RequesterImpl::newRequesterId() {

legoc's avatar
legoc committed
57
	lock_guard<mutex> lock(m_mutex);
58
59
60
61
62
63
	m_requesterCounter++;

	return m_requesterCounter;
}

std::string RequesterImpl::getRequesterPortName(const std::string& name, int responderId, int requesterId) {
64
65

	stringstream requesterPortName;
66
	requesterPortName << REQUESTER_PREFIX << name << "." << responderId << "." << requesterId;
67
68
69
70

	return requesterPortName.str();
}

legoc's avatar
legoc committed
71
72
73
74
75
76
77
78
79
80
/**
 * Creates a GenericWaitingImpl bound to the cancel() method of this requester.
 *
 * The object is allocated with new and returned as a raw pointer.
 */
WaitingImpl * RequesterImpl::waiting() {

	// Callback that invokes cancel() on this instance.
	auto cancelCallback = std::bind(&RequesterImpl::cancel, this);

	return new GenericWaitingImpl(cancelCallback);
}

void RequesterImpl::sendBinary(const std::string& request) {

	string strRequestType = m_application->m_impl->createRequest(PROTO_REQUEST);
	string strRequestData;

	proto::Request requestCommand;
81
	requestCommand.set_applicationname(m_application->getName());
legoc's avatar
legoc committed
82
83
	requestCommand.set_applicationid(m_application->getId());
	requestCommand.set_message(request);
84
85
86
	requestCommand.set_serverurl(m_application->getUrl());
	requestCommand.set_serverport(m_application->getPort());
	requestCommand.set_requesterport(m_requesterPort);
legoc's avatar
legoc committed
87
88
	requestCommand.SerializeToString(&strRequestData);

legoc's avatar
legoc committed
89
	unique_ptr<zmq::message_t> reply = m_application->m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_responderEndpoint);
legoc's avatar
legoc committed
90
91
92
93
94
95
96
97
98
99
100
101
102

	proto::RequestResponse requestResponse;
	requestResponse.ParseFromArray((*reply).data(), (*reply).size());
}

void RequesterImpl::send(const std::string& request) {

	// encode the data
	string result;
	serialize(request, result);
	sendBinary(result);
}

103
104
105
106
107
108
void RequesterImpl::sendTwoBinaryParts(const std::string& request1, const std::string& request2) {

	string strRequestType = m_application->m_impl->createRequest(PROTO_REQUEST);
	string strRequestData;

	proto::Request requestCommand;
109
	requestCommand.set_applicationname(m_application->getName());
110
111
112
	requestCommand.set_applicationid(m_application->getId());
	requestCommand.set_message(request1);
	requestCommand.set_message2(request2);
113
114
115
	requestCommand.set_serverurl(m_application->getUrl());
	requestCommand.set_serverport(m_application->getPort());
	requestCommand.set_requesterport(m_requesterPort);
116
117
	requestCommand.SerializeToString(&strRequestData);

legoc's avatar
legoc committed
118
	unique_ptr<zmq::message_t> reply = m_application->m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, m_responderEndpoint);
119
120
121
122
123

	proto::RequestResponse requestResponse;
	requestResponse.ParseFromArray((*reply).data(), (*reply).size());
}

legoc's avatar
legoc committed
124
125
bool RequesterImpl::receiveBinary(std::string& response) {

126
127
	unique_ptr<zmq::message_t> message(new zmq::message_t);
	m_requester->recv(message.get(), 0);
legoc's avatar
legoc committed
128
129
130
131
132
133

	// multi-part message, first part is the type
	proto::MessageType messageType;
	messageType.ParseFromArray((*message).data(), (*message).size());

	if (message->more()) {
134
135
		message.reset(new zmq::message_t);
		m_requester->recv(message.get(), 0);
legoc's avatar
legoc committed
136
137
138

	} else {
		cerr << "unexpected number of frames, should be 2" << endl;
139
		m_canceled = true;
legoc's avatar
legoc committed
140
141
142
143
144
145
	}

	if (messageType.type() == proto::MessageType_Type_RESPONSE) {
		response = string(static_cast<char*>(message->data()), message->size());

	} else if (messageType.type() == proto::MessageType_Type_CANCEL) {
146
		m_canceled = true;
legoc's avatar
legoc committed
147
148
149
150
151
	}

	// Create the reply
	string data = "OK";
	size_t size = data.length();
152
	unique_ptr<zmq::message_t> reply(new zmq::message_t(size));
legoc's avatar
legoc committed
153
154
155
156
	memcpy((void *) reply->data(), data.c_str(), size);

	m_requester->send(*reply);

157
	return !m_canceled;
legoc's avatar
legoc committed
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
}

bool RequesterImpl::receive(std::string& data) {

	string bytes;
	bool result = receiveBinary(bytes);

	parse(bytes, data);

	return result;
}

void RequesterImpl::cancel() {

	stringstream requesterEndpoint;
	requesterEndpoint << m_application->getUrl() << ":" << m_requesterPort;

	string strRequestType = m_application->m_impl->createRequest(PROTO_CANCEL);
	string strRequestData = "cancel";

legoc's avatar
legoc committed
178
	unique_ptr<zmq::message_t> reply = m_application->m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, requesterEndpoint.str());
legoc's avatar
legoc committed
179
180
181
182
183
184
185

	proto::RequestResponse requestResponse;
	requestResponse.ParseFromArray((*reply).data(), (*reply).size());
}

void RequesterImpl::terminate() {

legoc's avatar
legoc committed
186
187
	if (m_requester.get() != nullptr) {
		m_requester.reset(nullptr);
legoc's avatar
legoc committed
188

189
		bool success = m_application->removePort(getRequesterPortName(m_name, m_responderId, m_requesterId));
legoc's avatar
legoc committed
190
191
192
193
194
195
196
		if (!success) {
			cerr << "server cannot destroy requester " << m_name << endl;
		}
	}
}

}