RemoteMatlabPub.cpp 8.83 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/*
 * Nomad Instrument Control Software
 *
 * Copyright 2011 Institut Laue-Langevin
 *
 * Licensed under the EUPL, Version 1.1 only (the "License");
 * You may not use this work except in compliance with the Licence.
 * You may obtain a copy of the Licence at:
 *
 * http://joinup.ec.europa.eu/software/page/eupl
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the Licence is distributed on an "AS IS" basis,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the Licence for the specific language governing permissions and
 * limitations under the Licence.
 */

#include "RemoteMatlabPub.h"
#include <InstrumentManager/InstrumentManager.h>
#include <controllers/common/acquisition/Count.h>
#include <controllers/common/scanlegacy/ScanInfo.h>
#include <ics/Utilities/Calculations.h>
#include <common/connection/RestJsonHttpConnection.h>
#include <boost/lexical_cast.hpp>
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <fstream>
#include <iostream>
30
#include <common/base/ServerProperties.h>
31
32
33
34
35
36
37
38
39
40

namespace tas {

using namespace std;
using namespace common;
using namespace boost;
using namespace cameo;
using boost::property_tree::ptree;

// Type name of this controller (presumably the key used to register it - TODO confirm).
const std::string RemoteMatlabPub::TYPE = "tas_remote_matlab_pub";

// Name of the Matlab application that is connected to and started on the server.
const std::string RemoteMatlabPub::MATLAB_APPLICATION = "matlabpub";

// Name of the publisher created by this controller to send the numor requests.
const std::string RemoteMatlabPub::NUMOR_PUBLISHER = "tas_numor";

// Name of the publisher of the Matlab application this controller subscribes to.
const std::string RemoteMatlabPub::IMAGE_PUBLISHER = "tas_image";

// Request kinds. Not referenced by the code visible in this file; presumably they
// mirror the LOG/LIVE values of proto::NumorInfo - TODO confirm.
const int32 RemoteMatlabPub::LOG_TYPE = 1;
const int32 RemoteMatlabPub::LIVE_TYPE = 2;

/**
 * Builds the controller from its name.
 * The name is forwarded to the RemoteMatlab base class; nothing else is set up here,
 * the connection to the Matlab application is made in postConfiguration().
 *
 * @param name the name of the controller
 */
RemoteMatlabPub::RemoteMatlabPub(const std::string& name)
	: RemoteMatlab(name)
{
}

/**
 * Copy constructor.
 * Only the RemoteMatlab base part is copy-constructed; no member of this class is
 * listed in the initializer list, so the members are default-initialized.
 *
 * @param controller the controller to copy
 */
RemoteMatlabPub::RemoteMatlabPub(const RemoteMatlabPub& controller)
	: RemoteMatlab(controller)
{
}

/**
 * Shuts down the communication with the Matlab application.
 *
 * The sequence is: cancel the subscriber, join the subscriber thread, send the end
 * notification through the publisher and wait for the application to terminate.
 *
 * Every step is guarded because the members are only created by a successful
 * initApplication(): when the controller is simulated, or when the initialization
 * failed or threw part-way, some or all of them are still unset and must not be
 * dereferenced.
 */
RemoteMatlabPub::~RemoteMatlabPub() {

	// Stop the subscriber
	if (m_subscriber.get()) {
		m_subscriber->cancel();
	}

	// The subscriber thread terminates
	if (m_subscriberThread.get()) {
		m_subscriberThread->join();
	}

	// Notify the application
	bool endNotified = false;
	if (m_publisher.get()) {
		m_publisher->sendEnd();
		endNotified = true;
	}

	// Wait for the termination, only if the application has been told to end:
	// otherwise the wait could block on an application that keeps running.
	if (endNotified && m_matlabApplication.get()) {
		application::State state = m_matlabApplication->waitFor();

		cout << "Matlab application terminated with state " << application::toString(state) << endl;
	}
}

void RemoteMatlabPub::postConfiguration() {

74
75
76
77
78
	// Do not register the updaters in case of simulation.
	if (Simulated::activated) {
		return;
	}

79
80
	RemoteMatlab::postConfiguration();

81
82
83
84
85
86
	registerUpdater(scanController->serializerEvent, &RemoteMatlabPub::updateNumor, this);
	//registerUpdater(scanController->getCount()->statusMessage, &RemoteMatlabPub::updateCountStatusMessage, this);
	registerUpdater(countSpy->statusMessage, &RemoteMatlabPub::updateCountStatusMessage, this);

	if (serverEndpoint() == "") {
		m_server.reset(new Server(application::This::getServer().getEndpoint()));
87
88
	}
	else {
89
90
91
92
93
94
95
96
97
98
99
100
101
		m_server.reset(new Server(serverEndpoint()));
	}

	initialized = initApplication();
}

bool RemoteMatlabPub::initApplication() {

	// Do not initialize if it is already done
	if (initialized()) {
		return true;
	}

102
103
	try {
		// Start the Matlab server
104
		if (!m_server->isAvailable(1000)) {
105
106
107
			cout << "Matlab server is not available" << endl;
			return false;
		}
108

109
		cout << "Matlab server is connected to " << getName() << endl;
110

111
112
113
		m_matlabApplication = m_server->connect(MATLAB_APPLICATION);
		if (m_matlabApplication->exists()) {
			// The application exists from a previous server session
114
			m_matlabApplication->stop();
115
116
117
			application::State state = m_matlabApplication->waitFor();
			cout << "Terminated matlab application " << state << endl;
		}
118

119
		m_matlabApplication = m_server->start(MATLAB_APPLICATION);
120

121
122
123
124
125
126
127
128
		if (!m_matlabApplication->exists()) {
			cout << "No matlab application" << endl;
			return false;
		}

		// Create the publisher after the start of the application, because it is a blocking call
		m_publisher = application::Publisher::create(NUMOR_PUBLISHER, 1);
		cout << "Publisher " << *m_publisher << endl;
129

130
131
		// Wait for the subscriber
		bool sync = m_publisher->waitForSubscribers();
132

133
134
135
		// Create the subscriber after the publisher
		m_subscriber = application::Subscriber::create(*m_matlabApplication, IMAGE_PUBLISHER);
		cout << "subscriber " << *m_subscriber << endl;
136

137
138
		// Start the subscriber loop
		m_subscriberThread.reset(new thread(bind(&RemoteMatlabPub::subscriberLoop, this)));
139

140
141
		// Application initialized
		initialized = true;
142

143
		return true;
144
145
	}
	catch (...) {
146
147
148
		// Currently an exception can occur during isAvailable.
		cout << "Unexpected exception during matlab connection" << endl;
	}
149
150

	return false;
151
152
153
154
155
}

void RemoteMatlabPub::updateNumor() {

	// Send the numor to be computed if there is something to serialize
156
157
158
	if (scanController->serializerEvent()
			&& active()) {

159
160
161
		// Send the numor
		if (test()) {
			sendNumor(testNumor(), false);
162
163
		}
		else {
164
165
166
167
168
169
170
			sendNumor(scanController->numor(), false);
		}
	}
}

void RemoteMatlabPub::updateCountStatusMessage() {

171
172
173
	if ((countSpy->statusMessage() == "Close data")
			&& scanController->getScanInfo()->running()
			&& active()) {
174
175
176
177
178
179

		cout << "Send the numor for live refresh" << endl;

		// Send the numor
		if (test()) {
			sendNumor(testNumor(), true);
180
181
		}
		else {
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
			sendNumor(scanController->numor(), true);
		}
	}
}

/**
 * Reads the data file of a numor and publishes it to the Matlab application.
 *
 * The file name is the Nomad data path followed by the numor left-padded with
 * zeros to at least 6 characters. The request carries the file content together
 * with the proposal, instrument, sample, cycle and image identifiers.
 *
 * Nothing is published if the application cannot be initialized or if the numor
 * file cannot be opened.
 *
 * @param numor the numor to send
 * @param live  true for a live request (no log entry is written), false for a
 *              log request
 */
void RemoteMatlabPub::sendNumor(int32 numor, bool live) {

	if (!initApplication()) {
		return;
	}

	// Build the numor name, left-padded with zeros to at least 6 characters
	string numorName = lexical_cast<string>(numor);
	if (numorName.size() < 6) {
		numorName.insert(0, 6 - numorName.size(), '0');
	}

	// Not log when the calculation is for live process.
	if (!live) {
		LogStream logStream = log(Level::s_Info).property(scanController->logAcquisitionType)
													.property(scanController->logNumor)
													.property(scanController->logSubtitle);

		logStream << numorName << " in Q" << image(numor, "q") << endlog;
	}

	string numorFileName(common::ServerProperties::getInstance()->getNomadDataPath());
	numorFileName += numorName;

	// Get the content of the numor. A file that cannot be opened is reported and
	// the request is dropped, rather than publishing a request with empty content.
	ifstream numorFile(numorFileName.c_str());
	if (!numorFile.is_open()) {
		cout << getName() << " cannot open numor file " << numorFileName << endl;
		return;
	}

	stringstream stream;
	stream << numorFile.rdbuf();

	// Serialize the request
	proto::NumorRequest request;
	proto::NumorInfo* info = request.mutable_info();

	info->set_type(live ? proto::NumorInfo_Type_LIVE : proto::NumorInfo_Type_LOG);
	info->set_propid(getProposalId());
	info->set_proposal(getProposalName());
	info->set_instrid(getInstrumentId());
	info->set_instrument(getInstrumentName());
	info->set_sampid(getSampleId());
	info->set_sample(getSampleName());
	info->set_cycle(getCycleId());
	info->set_imageid(getImageName());
	// The request carries the numor without the zero padding
	info->set_numor(lexical_cast<string>(numor));

	request.set_content(stream.str());

	// Publish the message
	m_publisher->sendBinary(request.SerializeAsString());

	cout << getName() << " sent numor " << numor << " for live ? " << live << endl;
}

/**
 * Start command of the controller: sends the test numor as a live request.
 */
void RemoteMatlabPub::start() {

	const bool live = true;
	sendNumor(testNumor(), live);
}

void RemoteMatlabPub::subscriberLoop() {

	// Loop on events
	string data;
	while (m_subscriber->receiveBinary(data)) {

		proto::NumorResponse response;
		response.ParseFromString(data);

		if (response.mutable_info()->type() == proto::NumorInfo_Type_LOG) {
			sendImageToFluentd(response);
264
265
		}
		else {
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
			setContentFromBinaryData(response.image());
		}
	}
}

/**
 * Sends the image of a response as a JSON message to the local HTTP endpoint
 * (localhost:8888).
 *
 * The message contains the identifiers found in the info part of the response
 * and the image data encoded in base64. The request path depends on the
 * send-data server property: "0" selects the path suffixed with "_DEV", "1" the
 * path without suffix; for any other value no path is set before sending.
 *
 * An Exception raised while sending is caught and its stack is printed on the
 * error output.
 *
 * @param response the response received from the Matlab application
 */
void RemoteMatlabPub::sendImageToFluentd(const proto::NumorResponse& response) {

	const proto::NumorInfo& info = response.info();

	// Encode the JSON message (the insertion order of the fields is kept)
	ptree message;
	message.put("id", Date().toString());
	message.put("mt", "acq");
	message.put("name", "numor");
	message.put("type", "number");
	message.put("value", info.numor());
	message.put("output", "null");
	message.put("level", "info");
	message.put("propid", info.propid());
	message.put("instrid", info.instrid());
	message.put("sampleid", info.sampid());
	message.put("sample", info.sample());
	message.put("cycleid", info.cycle());
	message.put("imagefile", info.imageid());

	// The binary content of the image is a string and we encode it in base64
	message.put("imagedata", Calculations::encodeBase64(response.image()));

	stringstream json;
	write_json(json, message, false);

	// The instrument name is used in upper case in the path
	std::string instrumentName = info.instrument();
	boost::to_upper(instrumentName);

	RestJsonHttpConnection con;

	try {
		con.openUrl("localhost", "8888");

		// Environment variable: "0" for the dev database, "1" for the prod database
		const std::string sendData = ServerProperties::getInstance()->getNomadSenddata();

		if (sendData == "0") {
			con.setPath(string("/oracle.forward.") + instrumentName + "_DEV");
		}
		else if (sendData == "1") {
			con.setPath(string("/oracle.forward.") + instrumentName);
		}

		con.setData(json.str());

		// The reply of the server is not used
		con.send(false);
	}
	catch (const Exception& e) {
		// Print the stack of the exception on the error output
		cerr << e.printStack() << endl;
	}
}

}