Commit d0f27f23 authored by Locatelli's avatar Locatelli

Add survey fluentd part

optional with useHistoDB nomad property
parent bd91188f
......@@ -317,6 +317,7 @@ libnomad_la_SOURCES = \
core/scheduler/SenderCommandStateChangeSubscriber.cpp \
core/scheduler/RequestReply.cpp \
\
core/survey/FluentdSurveyWriter.cpp \
core/survey/Survey.cpp \
core/survey/SurveyElement.cpp \
core/survey/PropertySurveyElement.cpp \
......
/*
* Nomad Instrument Control Software
*
* Copyright 2011 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
#include "FluentdSurveyWriter.h"
#include "PropertySurveyElement.h"
#include <iostream>
#include <common/base/Date.h>
#include <cstdlib>
#include <dataprovider/database/Database.h>
#include <dataprovider/iconandfamily/ServantIconAndFamilyMaps.h>
#include <common/connection/RestJsonHttpConnection.h>
#include <common/base/Exception.h>
#include <common/base/ServerProperties.h>
#include <common/base/Convert.h>
#include <boost/algorithm/string.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <fstream>
#include <unistd.h>
using namespace std;
using namespace common;
using namespace boost;
using namespace rapidjson;
namespace core {
const std::string FluentdSurveyWriter::CYCLE_ID = "cycleid";
const std::string FluentdSurveyWriter::PROPOSAL_ID = "propid";
const std::string FluentdSurveyWriter::CTYPE = "ctype";
const std::string FluentdSurveyWriter::CNAME = "cname";
const std::string FluentdSurveyWriter::PNAME = "pname";
const std::string FluentdSurveyWriter::ALIAS = "alias";
const std::string FluentdSurveyWriter::VALUE = "value";
using namespace common;
FluentdSurveyWriter::SurveyException::SurveyException(const std::string& message) :
common::Exception("dataprovider::FluentdSurveyWriter::SurveyException", message) {
}
FluentdSurveyWriter::FluentdSurveyWriter() :
m_propertyIdOfProposalId(0), m_propertyIdOfInstrumentCode(0), m_propertyIdOfCycleId(0), m_initDone(false), m_showMessages(false) {
m_nomadSendData = ServerProperties::getInstance()->getNomadSenddata();
to_lower(m_nomadSendData);
}
FluentdSurveyWriter::~FluentdSurveyWriter() {
}
void FluentdSurveyWriter::write(SurveyElement* element) {
if (m_nomadSendData == "-1") {
return;
}
if (m_initDone == false) {
try {
init();
} catch (Exception&) {
// cerr << "In Fluentd: Servant Name not correct" << endl;
return;
}
}
m_properties.clear();
PrettyWriter<StringBuffer> writer(s);
// Clean writer and stringbuffer
s.Clear();
// Start new message
writer.StartObject();
try {
if (dynamic_cast<PropertySurveyElement*>(element) != 0) {
createValuesMessageUsingRapidJson(dynamic_cast<PropertySurveyElement*>(element), writer);
}
writer.EndObject();
// only show messages in case it was activated
if (m_showMessages) {
cout << "Fluentd log message: " << s.GetString() << endl << endl;
}
// Trying to detect bad JSON messages (e.g. driver errors with bad characters) using rapidjson::Document.Parse(string).HasParseError()
// does not provide errors.
std::string instrumentCode = dataprovider::Database::getInstance()->getValue<std::string>(m_propertyIdOfInstrumentCode);
boost::to_upper(instrumentCode);
RestJsonHttpConnection con;
con.openUrl("localhost", "8888");
//New environment variable
if (m_nomadSendData == "0") {
//cout << "sending logs to dev database" << endl;
//if 0 send data to dev database
con.setPath(string("/mongo.forward.") + "DEV");
} else if (m_nomadSendData == "1") {
//if 1 send data to prod database
con.setPath(string("/mongo.forward.") + instrumentCode) ;// + instrumentCode);
}
con.setData(s.GetString());
boost::property_tree::ptree response = con.send(false);
} catch (const Exception& e) {
// log to server
cerr << e.printStack() << endl;
slog << e.printStack() << eol;
}
}
void FluentdSurveyWriter::showJSONMessages(bool value) {
m_showMessages = value;
}
void FluentdSurveyWriter::init() {
dataprovider::Database* database = dataprovider::Database::getInstance();
unsigned long servantID = database->getServantID("Title");
std::string controllerType = dataprovider::Database::getInstance()->getControllerType(servantID);
dataprovider::Controller* experimentDataController = dataprovider::Database::getInstance()->getController(controllerType);
//Save ID of all necessary properties:
//It is missing: instrument ID, sample ID, Sample Name.
m_propertyIdOfProposalId = experimentDataController->getPropertyID("proposalId");
m_propertyIdOfInstrumentCode = experimentDataController->getPropertyID("instrumentCode");
m_propertyIdOfCycleId = experimentDataController->getPropertyID("cycleId");
unsigned long readyId = experimentDataController->getPropertyID("ready");
m_initDone = dataprovider::Database::getInstance()->getValue<bool>(readyId);
}
void FluentdSurveyWriter::createValuesMessageUsingRapidJson(PropertySurveyElement* element, rapidjson::PrettyWriter<rapidjson::StringBuffer> &writer) {
writer.Key(CYCLE_ID.c_str());
writer.Uint(m_propertyIdOfCycleId);
writer.Key(PROPOSAL_ID.c_str());
writer.Uint(m_propertyIdOfProposalId);
writer.Key(CTYPE.c_str());
writer.String(element->getServantType().c_str());
writer.Key(CNAME.c_str());
writer.String(element->getServantName().c_str());
writer.Key(PNAME.c_str());
writer.String(element->getPropertyName().c_str());
writer.Key(ALIAS.c_str());
writer.String(element->getAlias().c_str());
writer.Key(VALUE.c_str());
string ptype = element->getPropertyType();
try {
if (ptype == "boolean") {
if ((element->getLastValue() == "1") || (element->getLastValue() == "true") || (element->getLastValue() == "TRUE")) {
writer.Double(1.0);
}
else {
writer.Double(0.0);
}
} else if (ptype == "string") {
writer.Double(0.0);
} else {
writer.Double(to<float64>(element->getLastValue()));
}
} catch (Exception& e) {
writer.Double(0.0);
}
}
}
/*
* Nomad Instrument Control Software
*
* Copyright 2011 Institut Laue-Langevin
*
* Licensed under the EUPL, Version 1.1 only (the "License");
* You may not use this work except in compliance with the Licence.
* You may obtain a copy of the Licence at:
*
* http://joinup.ec.europa.eu/software/page/eupl
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Licence is distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Licence for the specific language governing permissions and
* limitations under the Licence.
*/
#ifndef FLUENTDSURVEYWRITER_H_
#define FLUENTDSURVEYWRITER_H_
#include <common/base/Exception.h>
#include <string>
#include <boost/regex.hpp>
#include <boost/property_tree/ptree.hpp>
#include <rapidjson/writer.h>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/prettywriter.h>
namespace common {
class Date;
}
namespace core {
class SurveyElement;
class PropertySurveyElement;
class FluentdSurveyWriter {
public:
/**
* Survey exception.
*/
class SurveyException : public common::Exception {
public:
/**
* Constructor.
*/
SurveyException(const std::string& message);
};
FluentdSurveyWriter();
~FluentdSurveyWriter();
void write(SurveyElement* element);
void showJSONMessages(bool value);
private:
void init();
void createValuesMessageUsingRapidJson(PropertySurveyElement* element, rapidjson::PrettyWriter<rapidjson::StringBuffer> &writer);
std::string m_nomadSendData;
//Properties IDs
unsigned long m_propertyIdOfProposalId;
unsigned long m_propertyIdOfInstrumentCode;
unsigned long m_propertyIdOfCycleId;
bool m_initDone;
boost::property_tree::ptree m_properties;
rapidjson:: StringBuffer s;
bool m_showMessages;
const static std::string CYCLE_ID;
const static std::string PROPOSAL_ID;
const static std::string CTYPE;
const static std::string CNAME;
const static std::string PNAME;
const static std::string ALIAS;
const static std::string VALUE;
};
}
#endif /* FLUENTDSURVEYWRITER_H_ */
......@@ -38,11 +38,15 @@ const std::string Survey::SURVEY_CYCLE_DURATIONMS = "surveyCycleDurationms";
const std::string Survey::ACTIVE_WRITE_TIMES = "surveyActiveWriteTimes";
const std::string Survey::INACTIVE_WRITE_TIMES = "surveyInactiveWriteTimes";
const std::string Survey::SURVEY_PATH = "surveyPath";
const std::string Survey::USE_HISTO_DB_PATH = "useHistoDB";
Survey::Survey() :
_surveyAccessor(0),
_thread(0),
_running(false) {
_running(false),
_histoDB(false) {
_fluentdwriter.showJSONMessages(true);
}
Survey::~Survey() {
......@@ -88,6 +92,13 @@ void Survey::init(const std::string& configPath, SurveyAccessor * surveyAccessor
_inactiveWriteTimeS = 60;
}
try {
_histoDB = ServerProperties::getInstance()->get<bool>(USE_HISTO_DB_PATH);
} catch (const Exception& e) {
out << "unable to find property 'useHistoDB'" << eol;
_histoDB = false;
}
_surveyAccessor = surveyAccessor;
}
......@@ -218,22 +229,31 @@ void Survey::executeSurvey() {
writeTimeS = _inactiveWriteTimeS;
}
if ((currentDate - element->getDate()).getSeconds() > writeTimeS || element->isFirst() || newFile) {
// Check for change of day for element and append to end of yesterday's file
if (currentDate.toString("%y%m%d") != element->getLastDate().toString("%y%m%d")) {
string isoDateMidnight = element->getDate().toString("%Y%m%d") + "T235959";
boost::posix_time::ptime midnight();
if ((((currentDate - element->getDate()).getSeconds() - (double) writeTimeS) > -0.1) || element->isFirst() || newFile) {
if (!DataAccess::getInstance()->isServantEnabledAndInitialised(0, element->getServantID())) {
element->setDate(currentDate);
}
else {
// Check for change of day for element and append to end of yesterday's file
if (currentDate.toString("%y%m%d") != element->getLastDate().toString("%y%m%d")) {
string isoDateMidnight = element->getDate().toString("%Y%m%d") + "T235959";
boost::posix_time::ptime midnight();
element->setDate(Date(boost::posix_time::from_iso_string(isoDateMidnight)));
xmlMessage += SurveyWriter::getInstance()->write(element, true);
}
element->setDate(Date(boost::posix_time::from_iso_string(isoDateMidnight)));
element->setDate(currentDate);
xmlMessage += SurveyWriter::getInstance()->write(element, true);
}
element->setDate(currentDate);
//out << "writing element " << element->getPropertyName() << eol;
xmlMessage += SurveyWriter::getInstance()->write(element, false);
//out << "writing element " << element->getPropertyName() << eol;
xmlMessage += SurveyWriter::getInstance()->write(element, false);
// Send JSON
if (_histoDB) {
_fluentdwriter.write(element);
}
}
}
}
......@@ -243,6 +263,7 @@ void Survey::executeSurvey() {
_surveyAccessor->notifyXML(xmlMessage);
}
// exiting if instance is stopped
if (!_running) {
return;
......
......@@ -24,6 +24,7 @@
#include <boost/thread/thread.hpp>
#include <string>
#include <set>
#include "FluentdSurveyWriter.h"
namespace core {
......@@ -92,10 +93,13 @@ private:
static Survey* _instance;
SurveyAccessor* _surveyAccessor;
FluentdSurveyWriter _fluentdwriter;
static const std::string SURVEY_CYCLE_DURATIONMS;
static const std::string INACTIVE_WRITE_TIMES;
static const std::string ACTIVE_WRITE_TIMES;
static const std::string SURVEY_PATH;
static const std::string USE_HISTO_DB_PATH;
std::set<PropertySurveyElement*> _allElements;
......@@ -109,6 +113,7 @@ private:
std::string _surveyConfigFileName;
int _activeWriteTimeS;
int _inactiveWriteTimeS;
bool _histoDB;
};
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment