ResponderImpl.cpp 3.79 KB
Newer Older
legoc's avatar
legoc committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/*
 * Copyright 2015 Institut Laue-Langevin
 *
 * Licensed under the EUPL, Version 1.1 only (the "License");
 * You may not use this work except in compliance with the Licence.
 * You may obtain a copy of the Licence at:
 *
 * http://joinup.ec.europa.eu/software/page/eupl
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the Licence is distributed on an "AS IS" basis,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the Licence for the specific language governing permissions and
 * limitations under the Licence.
 */

17
18
#include "ResponderImpl.h"
#include "../Application.h"
19
#include "../Serializer.h"
20
21
#include "ApplicationImpl.h"
#include "RequestImpl.h"
legoc's avatar
legoc committed
22
#include <sstream>
legoc's avatar
legoc committed
23
24
25
26
27
28
29
30
31
32
33

using namespace std;

namespace cameo {

const std::string ResponderImpl::RESPONDER_PREFIX = "rep.";

ResponderImpl::ResponderImpl(const application::This * application, int responderPort, const std::string& name) :
	m_application(application),
	m_responderPort(responderPort),
	m_name(name),
34
	m_canceled(false) {
legoc's avatar
legoc committed
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55

	// create a socket REP
	m_responder.reset(new zmq::socket_t(m_application->m_impl->m_context, ZMQ_REP));
	stringstream repEndpoint;
	repEndpoint << "tcp://*:" << m_responderPort;

	m_responder->bind(repEndpoint.str().c_str());
}

ResponderImpl::~ResponderImpl() {
	terminate();
}

void ResponderImpl::cancel() {

	stringstream endpoint;
	endpoint << m_application->getUrl() << ":" << m_responderPort;

	string strRequestType = m_application->m_impl->createRequest(PROTO_CANCEL);
	string strRequestData = "cancel";

legoc's avatar
legoc committed
56
	unique_ptr<zmq::message_t> reply = m_application->m_impl->tryRequestWithOnePartReply(strRequestType, strRequestData, endpoint.str());
legoc's avatar
legoc committed
57
58
59
60
61
62
63
64
65

	proto::RequestResponse requestResponse;
	requestResponse.ParseFromArray((*reply).data(), (*reply).size());
}

WaitingImpl * ResponderImpl::waiting() {
	return new GenericWaitingImpl(bind(&ResponderImpl::cancel, this));
}

legoc's avatar
legoc committed
66
std::unique_ptr<RequestImpl> ResponderImpl::receive() {
legoc's avatar
legoc committed
67

68
69
	unique_ptr<zmq::message_t> message(new zmq::message_t);
	m_responder->recv(message.get(), 0);
legoc's avatar
legoc committed
70
71
72
73
74
75

	// multi-part message, first part is the type
	proto::MessageType messageType;
	messageType.ParseFromArray((*message).data(), (*message).size());

	if (message->more()) {
76
77
		message.reset(new zmq::message_t);
		m_responder->recv(message.get(), 0);
legoc's avatar
legoc committed
78
79
80

	} else {
		cerr << "unexpected number of frames, should be 2" << endl;
legoc's avatar
legoc committed
81
		return unique_ptr<RequestImpl>(nullptr);
legoc's avatar
legoc committed
82
83
84
85
86
	}

	// Create the reply
	string data = "OK";
	size_t size = data.length();
87
	unique_ptr<zmq::message_t> reply(new zmq::message_t(size));
legoc's avatar
legoc committed
88
89
	memcpy((void *) reply->data(), data.c_str(), size);

legoc's avatar
legoc committed
90
	unique_ptr<RequestImpl> result;
legoc's avatar
legoc committed
91
92
93
94
95
96
97
98

	if (messageType.type() == proto::MessageType_Type_REQUEST) {

		// Parse the message
		proto::Request messageRequest;
		messageRequest.ParseFromArray((*message).data(), (*message).size());

		// Create the request
legoc's avatar
legoc committed
99
		result = unique_ptr<RequestImpl>(new RequestImpl(m_application,
100
101
102
103
104
105
				messageRequest.applicationname(),
				messageRequest.applicationid(),
				messageRequest.message(),
				messageRequest.serverurl(),
				messageRequest.serverport(),
				messageRequest.requesterport()));
legoc's avatar
legoc committed
106

107
108
109
110
111
		// Set message 2 if it exists.
		if (messageRequest.has_message2()) {
			result->m_message2 = messageRequest.message2();
		}

legoc's avatar
legoc committed
112
	} else if (messageType.type() == proto::MessageType_Type_CANCEL) {
113
		m_canceled = true;
legoc's avatar
legoc committed
114
115
116
117
118
119
120

	} else {
		cerr << "unknown message type " << messageType.type() << endl;
		m_responder->send(*message);
	}

	// send to the client
legoc's avatar
legoc committed
121
	if (reply != nullptr) {
legoc's avatar
legoc committed
122
123
124
125
126
127
128
129
		m_responder->send(*reply);
	}

	return result;
}

void ResponderImpl::terminate() {

legoc's avatar
legoc committed
130
131
	if (m_responder.get() != nullptr) {
		m_responder.reset(nullptr);
legoc's avatar
legoc committed
132
133
134
135
136
137
138
139
140

		bool success = m_application->removePort(RESPONDER_PREFIX + m_name);
		if (!success) {
			cerr << "server cannot destroy responder " << m_name << endl;
		}
	}
}

}