/*
 * Nomad Instrument Control Software
 *
 * Copyright 2011 Institut Laue-Langevin
 *
 * Licensed under the EUPL, Version 1.1 only (the "License");
 * You may not use this work except in compliance with the Licence.
 * You may obtain a copy of the Licence at:
 *
 * http://joinup.ec.europa.eu/software/page/eupl
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the Licence is distributed on an "AS IS" basis,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the Licence for the specific language governing permissions and
 * limitations under the Licence.
 */

#include "RemoteMatlabPub.h"

// NOTE(review): the targets of the eleven angle-bracket includes that followed
// the own-header include were lost (only the bare "#include" keywords survived).
// The standard and Boost headers below are reconstructed from the names this
// file uses. The remaining project/third-party headers (cameo, the protobuf
// messages, ServerProperties, RestJsonHttpConnection, Calculations, Date, ...)
// must be restored from version control unless RemoteMatlabPub.h already
// provides them.
#include <exception>
#include <fstream>
#include <iostream>
#include <sstream>
#include <string>
#include <boost/algorithm/string.hpp>
#include <boost/bind.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
#include <boost/thread.hpp>

namespace tas {

using namespace std;
using namespace common;
using namespace boost;
using namespace cameo;
using boost::property_tree::ptree;

const std::string RemoteMatlabPub::TYPE = "tas_remote_matlab_pub";
const std::string RemoteMatlabPub::MATLAB_APPLICATION = "matlabpub";
const std::string RemoteMatlabPub::NUMOR_PUBLISHER = "tas_numor";
const std::string RemoteMatlabPub::IMAGE_PUBLISHER = "tas_image";
const int32 RemoteMatlabPub::LOG_TYPE = 1;
const int32 RemoteMatlabPub::LIVE_TYPE = 2;

/**
 * Creates the controller with the given name.
 */
RemoteMatlabPub::RemoteMatlabPub(const std::string& name) :
    RemoteMatlab(name) {
}

/**
 * Copy constructor, delegates to the RemoteMatlab copy constructor.
 */
RemoteMatlabPub::RemoteMatlabPub(const RemoteMatlabPub& controller) :
    RemoteMatlab(controller) {
}

/**
 * Stops the subscriber thread, notifies the Matlab application of the end of
 * the publication and waits for its termination.
 *
 * The communication objects only exist once initApplication() has succeeded.
 * postConfiguration() returns early in simulation and initApplication() may
 * fail, so each of them is checked before use.
 */
RemoteMatlabPub::~RemoteMatlabPub() {

    // Stop the subscriber
    if (m_subscriber.get() != 0) {
        m_subscriber->cancel();
    }

    // The subscriber thread terminates
    if (m_subscriberThread.get() != 0) {
        m_subscriberThread->join();
    }

    // Notify the application
    if (m_publisher.get() != 0) {
        m_publisher->sendEnd();
    }

    // Wait for the termination
    if (m_matlabApplication.get() != 0) {
        application::State state = m_matlabApplication->waitFor();
        cout << "Matlab application terminated with state " << application::toString(state) << endl;
    }
}

/**
 * Registers the updaters, creates the connection to the server hosting the
 * Matlab application and tries a first initialization of that application.
 */
void RemoteMatlabPub::postConfiguration() {

    // Do not register the updaters in case of simulation.
    if (Simulated::activated) {
        return;
    }

    RemoteMatlab::postConfiguration();

    registerUpdater(scanController->serializerEvent, &RemoteMatlabPub::updateNumor, this);
    //registerUpdater(scanController->getCount()->statusMessage, &RemoteMatlabPub::updateCountStatusMessage, this);
    registerUpdater(countSpy->statusMessage, &RemoteMatlabPub::updateCountStatusMessage, this);

    // An empty endpoint means that the server of this application is used.
    if (serverEndpoint() == "") {
        m_server.reset(new Server(application::This::getServer().getEndpoint()));
    }
    else {
        m_server.reset(new Server(serverEndpoint()));
    }

    initialized = initApplication();
}

/**
 * Starts the Matlab application and sets up the publisher, the subscriber and
 * the subscriber thread.
 *
 * \return true if the application is initialized (possibly by a previous
 *         call), false if the server or the application is not available or
 *         if an exception occurred.
 */
bool RemoteMatlabPub::initApplication() {

    // Do not initialize if it is already done
    if (initialized()) {
        return true;
    }

    try {
        // Start the Matlab server
        if (!m_server->isAvailable(10000)) {
            cout << "Matlab server is not available" << endl;
            return false;
        }

        cout << "Matlab server is connected to " << getName() << endl;

        m_matlabApplication = m_server->connect(MATLAB_APPLICATION);
        if (m_matlabApplication->exists()) {
            // The application exists from a previous server session
            m_matlabApplication->kill();
            application::State state = m_matlabApplication->waitFor();
            cout << "Terminated matlab application " << state << endl;
        }

        m_matlabApplication = m_server->start(MATLAB_APPLICATION);

        if (!m_matlabApplication->exists()) {
            cout << "No matlab application" << endl;
            return false;
        }

        // Create the publisher after the start of the application, because it is a blocking call
        m_publisher = application::Publisher::create(NUMOR_PUBLISHER, 1);
        cout << "Publisher " << *m_publisher << endl;

        // Wait for the subscriber. The result of the synchronization is not used.
        m_publisher->waitForSubscribers();

        // Create the subscriber after the publisher
        m_subscriber = application::Subscriber::create(*m_matlabApplication, IMAGE_PUBLISHER);
        cout << "subscriber " << *m_subscriber << endl;

        // Start the subscriber loop
        m_subscriberThread.reset(new thread(bind(&RemoteMatlabPub::subscriberLoop, this)));

        // Application initialized
        initialized = true;

        return true;
    }
    catch (const std::exception& e) {
        // Report the reason when the exception provides one.
        cout << "Unexpected exception during matlab connection: " << e.what() << endl;
    }
    catch (...) {
        // Currently an exception can occur during isAvailable.
        cout << "Unexpected exception during matlab connection" << endl;
    }

    return false;
}

/**
 * Updater of the serializer event: sends the numor for the log computation.
 */
void RemoteMatlabPub::updateNumor() {

    // Send the numor to be computed if there is something to serialize
    if (scanController->serializerEvent() && active()) {
        // Send the numor
        if (test()) {
            sendNumor(testNumor(), false);
        }
        else {
            sendNumor(scanController->numor(), false);
        }
    }
}

/**
 * Updater of the count status message: sends the numor for a live refresh
 * when the message is "Close data" while a scan is running.
 */
void RemoteMatlabPub::updateCountStatusMessage() {

    if ((countSpy->statusMessage() == "Close data")
            && scanController->getScanInfo()->running()
            && active()) {

        cout << "Send the numor for live refresh" << endl;

        // Send the numor
        if (test()) {
            sendNumor(testNumor(), true);
        }
        else {
            sendNumor(scanController->numor(), true);
        }
    }
}

/**
 * Reads the numor file and publishes its content to the Matlab application.
 *
 * \param numor the numor number; the file name is the number left-padded
 *              with zeros to six characters, appended to the Nomad data path.
 * \param live  true for a live computation (no log entry is written),
 *              false for a log computation.
 *
 * Nothing is published if the application cannot be initialized or if the
 * numor file cannot be opened.
 */
void RemoteMatlabPub::sendNumor(int32 numor, bool live) {

    if (!initApplication()) {
        return;
    }

    // Get the content of the numor
    string numorName = lexical_cast<string>(numor);
    size_t size = numorName.size();
    if (size < 6) {
        string prefix(6 - size, '0');
        numorName = prefix + numorName;
    }

    // Not log when the calculation is for live process.
    if (!live) {
        LogStream logStream = log(Level::s_Info).property(scanController->logAcquisitionType)
                                                .property(scanController->logNumor)
                                                .property(scanController->logSubtitle);
        logStream << numorName << " in Q" << image(numor, "q") << endlog;
    }

    string numorFileName(common::ServerProperties::getInstance()->getNomadDataPath());
    numorFileName += numorName;

    ifstream numorFile;
    numorFile.open(numorFileName.c_str());
    if (!numorFile.is_open()) {
        // Without the file there is no content to compute: do not publish an empty request.
        cerr << getName() << " cannot open numor file " << numorFileName << endl;
        return;
    }

    stringstream stream;
    stream << numorFile.rdbuf();

    // Serialize the request
    proto::NumorRequest request;

    if (live) {
        request.mutable_info()->set_type(proto::NumorInfo_Type_LIVE);
    }
    else {
        request.mutable_info()->set_type(proto::NumorInfo_Type_LOG);
    }

    request.mutable_info()->set_propid(getProposalId());
    request.mutable_info()->set_proposal(getProposalName());
    request.mutable_info()->set_instrid(getInstrumentId());
    request.mutable_info()->set_instrument(getInstrumentName());
    request.mutable_info()->set_sampid(getSampleId());
    request.mutable_info()->set_sample(getSampleName());
    request.mutable_info()->set_cycle(getCycleId());
    request.mutable_info()->set_imageid(getImageName());
    // NOTE(review): the template argument of this lexical_cast was lost; string is
    // assumed - confirm against the type of the numor field of proto::NumorInfo.
    request.mutable_info()->set_numor(lexical_cast<string>(numor));
    request.set_content(stream.str());

    // Publish the message
    m_publisher->sendBinary(request.SerializeAsString());

    cout << getName() << " sent numor " << numor << " for live ? " << live << endl;
}

/**
 * Sends the test numor as a live computation.
 */
void RemoteMatlabPub::start() {
    // Send the numor
    sendNumor(testNumor(), true);
}

/**
 * Body of the subscriber thread: receives the responses of the Matlab
 * application until receiveBinary() returns false. Responses of type LOG are
 * forwarded to fluentd, the image of any other response becomes the content
 * of this controller.
 */
void RemoteMatlabPub::subscriberLoop() {

    // Loop on events
    string data;
    while (m_subscriber->receiveBinary(data)) {

        proto::NumorResponse response;
        if (!response.ParseFromString(data)) {
            // A message that cannot be parsed carries no usable info or image.
            cerr << getName() << " received a numor response that cannot be parsed" << endl;
            continue;
        }

        if (response.info().type() == proto::NumorInfo_Type_LOG) {
            sendImageToFluentd(response);
        }
        else {
            setContentFromBinaryData(response.image());
        }
    }
}

/**
 * Encodes the response as a JSON message, the image being encoded in base64,
 * and posts it to the HTTP endpoint at localhost:8888.
 *
 * \param response the response received from the Matlab application.
 */
void RemoteMatlabPub::sendImageToFluentd(const proto::NumorResponse& response) {

    // Encode the JSON message
    ptree message;
    message.put("id", Date().toString());
    message.put("mt", "acq");
    message.put("name", "numor");
    message.put("type", "number");
    message.put("value", response.info().numor());
    message.put("output", "null");
    message.put("level", "info");
    message.put("propid", response.info().propid());
    message.put("instrid", response.info().instrid());
    message.put("sampleid", response.info().sampid());
    message.put("sample", response.info().sample());
    message.put("cycleid", response.info().cycle());
    message.put("imagefile", response.info().imageid());

    // The binary content of the image is a string and we encode it in base64
    message.put("imagedata", Calculations::encodeBase64(response.image()));

    stringstream os;
    write_json(os, message, false);

    std::string instrumentName = response.info().instrument();
    boost::to_upper(instrumentName);

    RestJsonHttpConnection con;
    try {
        con.openUrl("localhost", "8888");

        // Environment variable
        const string sendData = ServerProperties::getInstance()->getNomadSenddata();
        if (sendData == "0") {
            // If 0 send data to dev database.
            con.setPath(string("/oracle.forward.") + instrumentName + "_DEV");
        }
        else if (sendData == "1") {
            // If 1 send data to prod database.
            con.setPath(string("/oracle.forward.") + instrumentName);
        }
        // NOTE(review): for any other value no path is set and the data is still sent - confirm that this is intended.

        con.setData(os.str());

        // The reply of the server is not used.
        con.send(false);
    }
    catch (const Exception& e) {
        // log to server
        cerr << e.printStack() << endl;
    }
}

}