Commit ee972f8d authored by Locatelli's avatar Locatelli
Browse files

Multi-threading improvement for offscreen

parent c4bfd4e3
/Makefile.in
......@@ -52,6 +52,7 @@ DATACONT = \
MPLCPP = \
view/mpl/Mpl.cpp \
view/mpl/MplLock.cpp \
view/mpl/MplFigure.cpp \
view/mpl/MplPlot1D.cpp \
view/mpl/MplPlot2D.cpp \
......@@ -104,7 +105,19 @@ mplplot_LDADD = $(LIBS) $(RM_LIBS) \
$(BOOST_THREAD_LIB)
test_SOURCES = \
maintest.cpp
maintest.cpp \
manager/RequestDealerManager.cpp \
manager/ServerSubscriberManager.cpp \
manager/ServerRequesterManager.cpp \
manager/PlotManager.cpp \
manager/OffScreenPlotManager.cpp \
manager/mpl/MplEventManager.cpp \
$(PLOT) \
$(PROTOBUF) \
$(DATACONT) \
$(MPLCPP) \
$(QT) \
$(JSON)
test_CPPFLAGS = $(RM_CXXFLAGS) -DOFFSCREEN -DTRACEDEBUG
test_LDFLAGS = $(RM_LDFLAGS)
......
This diff is collapsed.
......@@ -157,7 +157,7 @@ plot::PlotType PropertyPlotDataContainer::getPlotType() {
if (id != 0) {
try {
size = ServerRequesterManager::getInstance()->getPropertyArraySize(id);
DBGMSG("new Plot Type x size : " << size);
// DBGMSG("new Plot Type x size : " << size);
if (size > 1) {
hasx = true;
break;
......@@ -175,7 +175,7 @@ plot::PlotType PropertyPlotDataContainer::getPlotType() {
if (id != 0) {
try {
size = ServerRequesterManager::getInstance()->getPropertyArraySize(id);
DBGMSG("new Plot Type y size : " << size);
// DBGMSG("new Plot Type y size : " << size);
if (size > 1) {
hasy = true;
break;
......@@ -193,7 +193,7 @@ plot::PlotType PropertyPlotDataContainer::getPlotType() {
if (id != 0) {
try {
size = ServerRequesterManager::getInstance()->getPropertyArraySize(id);
DBGMSG("new Plot Type z size : " << size);
// DBGMSG("new Plot Type z size : " << size);
if (size > 1) {
hasz = true;
break;
......
......@@ -22,8 +22,6 @@
#include <fstream>
#include <cstdlib>
#include <signal.h>
#include <mutex>
#include <condition_variable>
#include <thread>
#include "Common.h"
......@@ -40,9 +38,6 @@ using namespace std;
using namespace cameo;
using namespace manager;
std::mutex conditionMutex; //! Mutex for waiting end of program
std::condition_variable condition; //! Condition for waiting end of program
unique_ptr<application::Instance> nomadserver; //! Instance of nomad server
unique_ptr<application::Subscriber> logsubscriber; //! Subcriber on log publisher of the server
unique_ptr<application::Responder> requesterplot; //! Responder for nomad GUI request
......@@ -56,9 +51,6 @@ void crash_handler(int32 sig) {
DBGMSG("crash_handler: " << sig);
logsubscriber->cancel();
requesterplot->cancel();
// Send finished to main
unique_lock<mutex> lock(conditionMutex);
condition.notify_one();
}
/*!
......@@ -67,6 +59,9 @@ void crash_handler(int32 sig) {
*/
int main(int32 argc, char* argv[]) {
// Need to create Mpl in main thread
view::mpl::Mpl* mpl = new view::mpl::Mpl();
// Init cameo application
int32 err = EXIT_SUCCESS;
application::This::init(1, &argv[2]);
......@@ -117,6 +112,7 @@ int main(int32 argc, char* argv[]) {
// Init the nomad server requester manager
manager::ServerRequesterManager::getInstance()->init(requesterdb.get());
OffScreenPlotManager::getInstance()->init(mpl);
// Start thread waiting and managing log event
thread serverLogSubscriberThread(bind(&OffScreenPlotManager::loop, OffScreenPlotManager::getInstance(), logsubscriber.get()));
......@@ -127,25 +123,21 @@ int main(int32 argc, char* argv[]) {
// Set cameo application running
application::This::setRunning();
// Wait for interruption of program
unique_lock<mutex> lock(conditionMutex);
DBGMSG("wait...");
condition.wait(lock);
// Wait threads
plotRequestThread.join();
serverLogSubscriberThread.join();
// Reset manager
OffScreenPlotManager::resetInstance();
PlotManager::resetInstance();
manager::ServerRequesterManager::resetInstance();
// Wait threads
serverLogSubscriberThread.join();
plotRequestThread.join();
}
exit:
delete mpl;
// Terminate cameo application
DBGMSG("terminate...");
application::This::terminate();
return err;
}
......@@ -125,7 +125,7 @@ int32 main(int32 argc, char* argv[]) {
DBGMSG("datazArray = " << data.zdata_size());
inputMessage.close();
view::mpl::MplFigure* figure = new view::mpl::MplFigure();
view::mpl::MplFigure* figure = new view::mpl::MplFigure(mpl);
ostringstream plotkey;
plotkey << argv[2];
......@@ -133,9 +133,9 @@ int32 main(int32 argc, char* argv[]) {
// 2D
for (int32 i = 0; i < data.xdata_size(); ++i) {
try {
plot::offscreen::OffScreenPlot2D plot(figure, plotkey.str(), mpl, i, &data);
plot::offscreen::OffScreenPlot2D plot(figure, plotkey.str(), mpl, &data);
plot.display();
plot.save(pbfile.parent_path().parent_path().string());
plot.save(pbfile.parent_path().parent_path().string(), 300, 1);
} catch (Error& e) {
Error("mainoffscreenplot", "Failed create 2d plot", plotkey.str());
}
......@@ -144,18 +144,18 @@ int32 main(int32 argc, char* argv[]) {
// 1D
for (int32 i = 0; i < data.xdata_size(); ++i) {
try {
plot::offscreen::OffScreenPlot1D plot(figure, plotkey.str(), mpl, i, &data);
plot::offscreen::OffScreenPlot1D plot(figure, plotkey.str(), mpl, &data);
plot.display();
plot.save(pbfile.parent_path().parent_path().string());
plot.save(pbfile.parent_path().parent_path().string(), 300, 1);
} catch (Error& e) {
Error("mainoffscreenplot", "Failed create 1d plot", plotkey.str());
}
}
} else {
try {
plot::offscreen::OffScreenEmptyPlot plot(figure, plotkey.str(), mpl, 0, &data);
plot::offscreen::OffScreenEmptyPlot plot(figure, plotkey.str(), mpl, &data);
plot.display();
plot.save(pbfile.parent_path().parent_path().string());
plot.save(pbfile.parent_path().parent_path().string(), 300 , 1);
} catch (Error& e) {
Error("mainoffscreenplot", "Failed create empty plot", plotkey.str());
}
......
This diff is collapsed.
......@@ -74,7 +74,7 @@ int main(int argc, char *argv[]) {
// buf << counter++;
// window->set(buf.str());
// {
// view::mpl::MplLock lock();
// view::mpl::MplLock lock(__PRETTY_FUNCTION__);
// x.push_back(x.size() + 1);
// y.push_back(1);
// try {
......
......@@ -31,10 +31,10 @@
#include "protobuf/generated/AcquisitionSerializer.pb.h"
#include "json/LogSender.h"
#include "Trace.h"
#include "view/mpl/MplLock.h"
namespace manager {
OffScreenPlotManager* OffScreenPlotManager::m_Instance = nullptr;
using namespace std;
......@@ -44,15 +44,14 @@ using namespace cameo;
* constructor
*/
OffScreenPlotManager::OffScreenPlotManager() {
m_Mpl = new view::mpl::Mpl();
m_Figure = new view::mpl::MplFigure();
m_Mpl = nullptr;
m_CounterId = 0;
}
/*
* constructor
*/
OffScreenPlotManager::~OffScreenPlotManager() {
delete m_Figure;
delete m_Mpl;
}
......@@ -73,6 +72,10 @@ void OffScreenPlotManager::resetInstance() {
void OffScreenPlotManager::reset() {
}
void OffScreenPlotManager::init(view::mpl::Mpl* mpl) {
m_Mpl = mpl;
}
/*
* getInstance
*/
......@@ -87,6 +90,9 @@ OffScreenPlotManager* OffScreenPlotManager::getInstance() {
* loop
*/
void OffScreenPlotManager::loop(application::Subscriber* subscriber) {
// Enable thread using GIL
view::mpl::MplEnableThreads mplenablethreads;
string data1, data2;
while (subscriber->receiveTwoBinaryParts(data1, data2)) {
if (subscriber->hasEnded() == false) {
......@@ -96,28 +102,60 @@ void OffScreenPlotManager::loop(application::Subscriber* subscriber) {
if (messageType.type() == notification::Message::ImageDataReady) {
notification::ImageDataReady messageImageReady;
messageImageReady.ParseFromString(data2);
DBGMSG("Receive image name -> " << messageImageReady.imagename() << " ,path " << messageImageReady.imagepath());
savePlot(messageImageReady.imagename(), messageImageReady.imagepath());
SavePlot* sp = new SavePlot(m_CounterId, m_Mpl, messageImageReady.imagename(), messageImageReady.imagepath());
thread* td = new thread(bind(&SavePlot::run, sp));
td->detach();
addThreadMapElement(m_CounterId++, td, sp);
}
}
else {
} else {
break;
}
}
}
/*
* addThreadMapElement
*/
void OffScreenPlotManager::addThreadMapElement(uint32 id, std::thread* td, SavePlot* sp) {
std::lock_guard<std::mutex> lock(m_ThreadMapMutex);
m_ThreadMap[id] = make_pair(td, sp);
}
/*
* delThreadMapElement
*/
void OffScreenPlotManager::delThreadMapElement(uint32 id) {
std::lock_guard<std::mutex> lock(m_ThreadMapMutex);
ThreadMap::iterator it = m_ThreadMap.find(id);
if (it != m_ThreadMap.end()) {
delete it->second.first;
delete it->second.second;
m_ThreadMap.erase(id);
}
}
/*
* savePlot
*/
void OffScreenPlotManager::savePlot(string pbfilename, string path) {
SavePlot::SavePlot(uint32 id, view::mpl::Mpl* mpl, const std::string& pbfilename, const std::string& path) {
m_Id = id;
m_Mpl = mpl;
m_Pbfilename = pbfilename;
m_Path = path;
}
boost::filesystem::path pbfile = path;
pbfile /= pbfilename;
/*
* savePlot
*/
void SavePlot::run() {
view::mpl::MplFigure* figure = new view::mpl::MplFigure(m_Mpl, false);
boost::filesystem::path pbfile = m_Path;
pbfile /= m_Pbfilename;
pbfile.replace_extension("pb");
DBGMSG("file : " << pbfile.string().c_str());
ifstream inputMessage(pbfile.string().c_str(), fstream::binary);
if (inputMessage.is_open() == false) {
Error("OffScreenPlotManager", "savePlot", "Failed to open temporary file which contains proto::PlotPropertyDataMessage", pbfilename);
Error("OffScreenPlotManager", "savePlot", "Failed to open temporary file which contains proto::PlotPropertyDataMessage",
m_Pbfilename);
} else {
uint32 size = (uint32) boost::filesystem::file_size(pbfile);
char buffer[size];
......@@ -125,29 +163,33 @@ void OffScreenPlotManager::savePlot(string pbfilename, string path) {
buffer::Data data;
data.ParseFromArray(buffer, size);
DBGMSG("type = " << data.type());
DBGMSG("buffer = " << size);
DBGMSG("numor = " << data.numor());
DBGMSG("dataxArray = " << data.xdata_size());
DBGMSG("datayArray = " << data.ydata_size());
DBGMSG("datazArray = " << data.zdata_size());
// DBGMSG("type = " << data.type());
// DBGMSG("buffer = " << size);
// DBGMSG("numor = " << data.numor());
// DBGMSG("dataxArray = " << data.xdata_size());
// DBGMSG("datayArray = " << data.ydata_size());
// DBGMSG("datazArray = " << data.zdata_size());
inputMessage.close();
ostringstream plotkey;
plotkey << pbfilename;
plotkey << m_Pbfilename;
uint32 dpi = 100;
float64 pad = 1;
if (data.type() == buffer::Data::Spy) {
dpi = 30;
pad = 0;
}
else if (data.type() == buffer::Data::Multiplot) {
dpi = 30;
}
if ((data.xdata_size() > 0) && (data.ydata_size() > 0) && (data.zdata_size() > 0)) {
// 2D
for (int32 i = 0; i < data.xdata_size(); ++i) {
try {
plot::offscreen::OffScreenPlot2D plot(m_Figure, plotkey.str(), m_Mpl, i, &data);
plot::offscreen::OffScreenPlot2D plot(figure, plotkey.str(), m_Mpl, &data);
plot.display();
plot.save(pbfile.parent_path().parent_path().string(), dpi);
m_Figure->clear();
plot.save(pbfile.parent_path().parent_path().string(), dpi, pad);
} catch (Error& e) {
Error("OffScreenPlotManager", "savePlot", "Failed create 2d plot", plotkey.str());
}
......@@ -156,25 +198,24 @@ void OffScreenPlotManager::savePlot(string pbfilename, string path) {
// 1D
for (int32 i = 0; i < data.xdata_size(); ++i) {
try {
plot::offscreen::OffScreenPlot1D plot(m_Figure, plotkey.str(), m_Mpl, i, &data);
plot::offscreen::OffScreenPlot1D plot(figure, plotkey.str(), m_Mpl, &data);
plot.display();
plot.save(pbfile.parent_path().parent_path().string(), dpi);
m_Figure->clear();
plot.save(pbfile.parent_path().parent_path().string(), dpi, pad);
} catch (Error& e) {
Error("OffScreenPlotManager", "savePlot", "Failed create 1d plot", plotkey.str());
}
}
} else {
try {
plot::offscreen::OffScreenEmptyPlot plot(m_Figure, plotkey.str(), m_Mpl, 0, &data);
plot::offscreen::OffScreenEmptyPlot plot(figure, plotkey.str(), m_Mpl, &data);
plot.display();
plot.save(pbfile.parent_path().parent_path().string(), dpi);
m_Figure->clear();
plot.save(pbfile.parent_path().parent_path().string(), dpi, pad);
} catch (Error& e) {
Error("OffScreenPlotManager", "savePlot", "Failed create empty plot", plotkey.str());
}
}
if (data.type() == buffer::Data::Log) {
// delete file
boost::filesystem::remove(pbfile);
......@@ -182,16 +223,15 @@ void OffScreenPlotManager::savePlot(string pbfilename, string path) {
// Send image to log database system
json::LogSender sender(&data);
sender.postMessage();
boost::filesystem::path pngfile = path;
boost::filesystem::path pngfile = m_Path;
pngfile = pngfile.parent_path().parent_path();
pngfile /= pbfilename;
DBGMSG("remove -> " << pngfile.string());
pngfile /= m_Pbfilename;
// delete image file
boost::filesystem::remove(pngfile);
}
}
delete figure;
OffScreenPlotManager::getInstance()->delThreadMapElement(m_Id);
}
}
......@@ -20,12 +20,46 @@
#define OFFSCREENPLOTMANAGER_H
#include <cameo/cameo.h>
#include <unordered_map>
#include <thread>
#include <mutex>
#include "view/mpl/Mpl.h"
#include "view/mpl/MplFigure.h"
namespace manager {
/*!
* \brief Save plot object for multithread offscreen plot
*/
class SavePlot {
public:
/*!
* \brief constructor
* \param[in] mpl The mpl object
* \param[in] pbfilename the plot binary file name from server
* \param[in] path the path directory of the pb file
*/
SavePlot(uint32 id, view::mpl::Mpl* mpl, const std::string& pbfilename, const std::string& path);
/*!
* \brief do save action
*/
void run();
private:
uint32 m_Id;
view::mpl::Mpl* m_Mpl; //! Mpl object (main class which managed the matplotib api)
std::string m_Pbfilename; //! the plot binary file name from server
std::string m_Path; //! The path directory of the pb file
};
/*!
* \brief OffScreenPlotManager Manager for offscreen plot like web, spy and multiplot
*/
class OffScreenPlotManager {
public:
......@@ -54,6 +88,14 @@ public:
*/
void savePlot(std::string pbfilename, std::string path);
/*!
* \brief init the OffScreenPlotManager
* \param[in] mpl The mpl object
*/
void init(view::mpl::Mpl* mpl);
void delThreadMapElement(uint32 id);
private:
/*!
......@@ -71,10 +113,16 @@ private:
*/
void reset();
void addThreadMapElement(uint32 id, std::thread* td, SavePlot* sp);
static OffScreenPlotManager* m_Instance; //! Pointer of singleton instance
view::mpl::Mpl* m_Mpl; //! Mpl object (main class which managed the matplotib api)
view::mpl::MplFigure* m_Figure; //! Matplotlib Figure object
typedef std::unordered_map<uint32, std::pair<std::thread*, SavePlot*>> ThreadMap;
ThreadMap m_ThreadMap;
std::mutex m_ThreadMapMutex;
uint32 m_CounterId;
};
}
......
......@@ -88,7 +88,6 @@ void PlotManager::reset() {
}
// Stop all plot processes
for (auto iter : m_PlotsProcesses) {
DBGMSG("plotmanager delete plot : " << iter.first);
iter.second->stop();
}
m_PlotsProcesses.clear();
......@@ -161,7 +160,6 @@ void PlotManager::displayPropertyPlot(const proto::Message& message, const proto
bool newplot = true;
auto iter = m_PlotsProcesses.find(plotMessage.plotkey());
if (iter != m_PlotsProcesses.end()) {
DBGMSG("state : " << iter->second->now());
// Check if exists
switch (iter->second->now()) {
case application::PROCESSING_ERROR:
......@@ -194,11 +192,6 @@ void PlotManager::displayPropertyPlot(const proto::Message& message, const proto
///////////////////////////////////////////////////////////////////////////////////////
//// Start new mplplot process for displaying, pass info to process using temporary files
////
DBGMSG("plotMessage.plotkey() = " << plotMessage.plotkey());
DBGMSG("plotMessage.datax_ids_size() = " << plotMessage.datax_ids_size());
DBGMSG("plotMessage.datay_ids_size() = " << plotMessage.datay_ids_size());
DBGMSG("plotMessage.dataz_ids_size() = " << plotMessage.dataz_ids_size());
string part1;
message.SerializeToString(&part1);
string part2;
......@@ -247,7 +240,6 @@ void PlotManager::displayPropertyPlot(const proto::Message& message, const proto
////
//TODO show in front
// Send signal USR1 to process pid , waiting for the new version of cameo cpp api
DBGMSG("plot already opened : " << plotMessage.plotkey());
for(auto info : application::This::getServer().getApplicationInfos()) {
if (info.getId() == m_PlotsProcesses[plotMessage.plotkey()]->getId()) {
kill(info.getPid(), SIGUSR1);
......
......@@ -53,7 +53,6 @@ void RequestDealerManager::loop(application::Responder* responder) {
string data;
// Loop until cancel is done.
while (true) {
DBGMSG("RequestDealerManager::loop: wait ...");
// Wait for a new request.
unique_ptr<application::Request> request = responder->receive();
......@@ -62,7 +61,6 @@ void RequestDealerManager::loop(application::Responder* responder) {
string firstPart = request->getBinary();
proto::Message message;
message.ParseFromString(firstPart);
DBGMSG("receive : " << message.type());
// Property plot
if (message.type() == proto::Message::PlotPropertyData) {
string secondPart = request->getSecondBinaryPart();
......
......@@ -44,12 +44,20 @@ MplEventManager::MplEventManager() {
m_BypassYLimChanged = false;
}
/*
* constructor
*/
MplEventManager::~MplEventManager() {
MplLock lock(__PRETTY_FUNCTION__);
m_ModuleEvent = bp::object();
}
/*
* init
*/
void MplEventManager::init(view::mpl::Mpl* mpl) {
m_mpl = mpl;
MplLock lock;
MplLock lock(__PRETTY_FUNCTION__);
try {
m_ModuleEvent = bp::import(bp::str(MODULE_EVENT_NAME));
} catch (...) {
......@@ -88,6 +96,7 @@ MplEventManager* MplEventManager::getInstance() {
* subscriberXLimChanged
*/
void MplEventManager::subscribeXLimChanged(bp::object* obj, MplEventSubscriber* subscriber) throw (Error) {
MplLock mpllock(__PRETTY_FUNCTION__);
unique_lock<recursive_mutex> lock(m_XLimChangedMutex);
if (m_XLimChangedSubcribers.find(obj) == m_XLimChangedSubcribers.end()) {
// Connect pick event
......@@ -103,6 +112,7 @@ void MplEventManager::subscribeXLimChanged(bp::object* obj, MplEventSubscriber*
* unsubscriberXLimChanged
*/
void MplEventManager::unsubscribeXLimChanged(bp::object* obj) throw (Error) {
MplLock mpllock(__PRETTY_FUNCTION__);
unique_lock<recursive_mutex> lock(m_XLimChangedMutex);
// remove subscriber from map
if (m_XLimChangedSubcribers.find(obj) != m_XLimChangedSubcribers.end()) {
......@@ -124,7 +134,6 @@ void MplEventManager::xlimChanged(PyObject *self, PyObject* event) {
// Event received, pass it to all subscriber
for (auto pe : m_XLimChangedSubcribers) {
try {
MplLock lock;
pe.second->treatEvent(MplEventSubscriber::XLIM_CHANGED);
} catch (Error& e) {
// Error , do nothing, a event is missed
......@@ -136,6 +145,7 @@ void MplEventManager::xlimChanged(PyObject *self, PyObject* event) {
* subscriberYLimChanged
*/
void MplEventManager::subscribeYLimChanged(bp::object* obj, MplEventSubscriber* subscriber) throw (Error) {
MplLock mpllock(__PRETTY_FUNCTION__);
unique_lock<recursive_mutex> lock(m_YLimChangedMutex);
if (m_YLimChangedSubcribers.find(obj) == m_YLimChangedSubcribers.end()) {
// Connect pick event
......@@ -151,6 +161,7 @@ void MplEventManager::subscribeYLimChanged(bp::object* obj, MplEventSubscriber*
* unsubscriberYLimChanged
*/
void MplEventManager::unsubscribeYLimChanged(bp::object* obj) throw (Error) {
MplLock mpllock(__PRETTY_FUNCTION__);
unique_lock<recursive_mutex> lock(m_YLimChangedMutex);
// remove subscriber from map
if (m_YLimChangedSubcribers.find(obj) != m_YLimChangedSubcribers.end()) {
......@@ -172,7 +183,6 @@ void MplEventManager::ylimChanged(PyObject *self, PyObject* event) {
// Event received, pass it to all subscriber
for (auto pe : m_YLimChangedSubcribers) {
try {
MplLock lock;
pe.second->treatEvent(MplEventSubscriber::YLIM_CHANGED);
} catch (Error& e) {
// Error , do nothing, a event is missed
......
......@@ -134,6 +134,11 @@ private:
*/
MplEventManager();
/*!
* \brief constructor
*/
~MplEventManager();