Verified Commit 1bb0ab02 authored by Tobias WEBER's avatar Tobias WEBER
Browse files

included nightly updates from https://github.com/t-weber/takin2

parent 3d9d3491
......@@ -134,6 +134,7 @@ SettingsDlg::SettingsDlg(QWidget* pParent, QSettings* pSett)
t_tupSpin("main/max_neighbours", g_iMaxNN, spinMaxNN),
t_tupSpin("main/max_peaks", 10, spinBragg),
t_tupSpin("main/max_threads", g_iMaxThreads, spinThreads),
t_tupSpin("main/max_processes", g_iMaxProcesses, spinProcesses),
t_tupSpin("gl/font_size", 24, spinGLFont),
t_tupSpin("net/poll", 750, spinNetPoll),
};
......@@ -148,6 +149,7 @@ SettingsDlg::SettingsDlg(QWidget* pParent, QSettings* pSett)
spinPrecGen->setMaximum(std::numeric_limits<t_real>::max_digits10);
spinPrecGfx->setMaximum(std::numeric_limits<t_real>::max_digits10);
spinThreads->setMaximum(std::thread::hardware_concurrency());
spinProcesses->setMaximum(std::thread::hardware_concurrency());
SetDefaults(0);
......@@ -309,6 +311,7 @@ void SettingsDlg::SetGlobals() const
g_iPrecGfx = spinPrecGfx->value();
g_iMaxThreads = spinThreads->value();
g_iMaxProcesses = spinProcesses->value();
g_dEps = std::pow(10., -t_real(g_iPrec));
g_dEpsGfx = std::pow(10., -t_real(g_iPrecGfx));
......
......@@ -44,6 +44,7 @@ t_real_glob g_dFontSize = 10.;
// -----------------------------------------------------------------------------
unsigned int g_iMaxThreads = std::thread::hardware_concurrency();
unsigned int g_iMaxProcesses = std::thread::hardware_concurrency();
unsigned int get_max_threads()
{
......@@ -51,6 +52,12 @@ unsigned int get_max_threads()
return std::min(iMaxThreads, g_iMaxThreads);
}
unsigned int get_max_processes()
{
unsigned int iMaxProcesses = std::thread::hardware_concurrency();
return std::min(iMaxProcesses, g_iMaxProcesses);
}
// -----------------------------------------------------------------------------
......
......@@ -23,6 +23,7 @@ extern t_real_glob g_dEps;
extern t_real_glob g_dEpsGfx;
extern unsigned int g_iMaxThreads;
extern unsigned int g_iMaxProcesses;
extern std::size_t GFX_NUM_POINTS;
extern std::size_t g_iMaxNN;
......@@ -54,6 +55,7 @@ extern std::string find_file_in_global_paths(const std::string& strFile, bool bA
extern std::string find_program_binary(const std::string& strExe, bool log_messages=true);
extern unsigned int get_max_threads();
extern unsigned int get_max_processes();
extern std::string get_gpltool_version();
......
......@@ -12,15 +12,20 @@
#include <mutex>
#include <memory>
#include <string>
#include <vector>
#include <unistd.h>
#include <boost/interprocess/ipc/message_queue.hpp>
enum class SqwProcStartMode
{
// run the parent process and fork child processes
START_PARENT_CREATE_CHILD,
// run the parent process, forking a child process
START_PARENT_FORK_CHILD,
// run the child process
START_CHILD
};
......@@ -33,20 +38,22 @@ private:
protected:
mutable std::shared_ptr<std::mutex> m_pmtx;
mutable std::vector<std::shared_ptr<std::mutex>> m_pmtx;
std::string m_strProcName;
pid_t m_pidChild = 0;
std::size_t m_iNumChildProcesses = 1;
std::string m_strProcBaseName;
std::vector<pid_t> m_pidChild;
std::shared_ptr<boost::interprocess::managed_shared_memory> m_pMem;
std::shared_ptr<boost::interprocess::message_queue> m_pmsgIn, m_pmsgOut;
void *m_pSharedPars = nullptr;
std::vector<std::shared_ptr<boost::interprocess::managed_shared_memory>> m_pMem;
std::vector<std::shared_ptr<boost::interprocess::message_queue>> m_pmsgIn, m_pmsgOut;
std::vector<void*> m_pSharedPars;
public:
SqwProc();
SqwProc(const char* pcCfg, SqwProcStartMode mode=SqwProcStartMode::START_PARENT_FORK_CHILD,
const char* pcProcMemName = nullptr, const char* pcProcExecName = nullptr);
const char* pcProcMemName = nullptr, const char* pcProcExecName = nullptr,
unsigned int iNumChildProcesses = 1);
explicit SqwProc(const std::string& strCfg);
virtual ~SqwProc();
......
......@@ -246,7 +246,7 @@ static ProcMsg msg_recv(ipr::message_queue& msgqueue)
// ----------------------------------------------------------------------------
// child process
// child (worker) process
// ----------------------------------------------------------------------------
template<class t_sqw>
......@@ -269,6 +269,7 @@ static void child_proc(ipr::message_queue& msgToParent, ipr::message_queue& msgF
{
ProcMsg msg = msg_recv(msgFromParent);
ProcMsg msgRet;
bool bFailure = 0;
switch(msg.ty)
{
......@@ -306,6 +307,9 @@ static void child_proc(ipr::message_queue& msgToParent, ipr::message_queue& msgF
msgRet.ty = msg.ty;
msgRet.bRet = pSqw->IsOk();
msg_send(msgToParent, msgRet);
if(!msgRet.bRet)
bFailure = 1;
break;
}
case ProcMsgTypes::QUIT:
......@@ -318,6 +322,9 @@ static void child_proc(ipr::message_queue& msgToParent, ipr::message_queue& msgF
break;
}
}
if(bFailure)
break;
}
tl::log_debug("Child process ", getpid(), " message loop has ended.");
......@@ -326,7 +333,7 @@ static void child_proc(ipr::message_queue& msgToParent, ipr::message_queue& msgF
// ----------------------------------------------------------------------------
// parent process
// parent (control) process
// ----------------------------------------------------------------------------
template<class t_sqw>
......@@ -341,95 +348,126 @@ SqwProc<t_sqw>::SqwProc()
*/
template<class t_sqw>
SqwProc<t_sqw>::SqwProc(const char* pcCfg, SqwProcStartMode mode,
const char* pcProcMemName, const char* pcProcExecName)
: m_pmtx(std::make_shared<std::mutex>()),
m_strProcName(tl::rand_name<std::string>(8))
const char* pcProcMemName, const char* pcProcExecName,
unsigned int iNumChildProcesses)
: m_iNumChildProcesses(iNumChildProcesses), m_strProcBaseName(tl::rand_name<std::string>(8))
{
++m_iRefCnt;
// only support one child process when forking
if(mode == SqwProcStartMode::START_PARENT_FORK_CHILD)
m_iNumChildProcesses = 1;
// if a process name is given (e.g. for the child process), use it
if(pcProcMemName)
m_strProcName = pcProcMemName;
m_strProcBaseName = pcProcMemName;
try
{
if(mode == SqwProcStartMode::START_PARENT_CREATE_CHILD || mode == SqwProcStartMode::START_PARENT_FORK_CHILD)
{
tl::log_debug("Creating process memory \"", "takin_sqw_proc_*_", m_strProcName, "\".");
tl::log_debug("Starting ", m_iNumChildProcesses, " child process(es).");
// start all child processes
for(unsigned int iChild=0; iChild<m_iNumChildProcesses; ++iChild)
{
std::string strProcName = m_strProcBaseName + "_" + tl::var_to_str(iChild);
tl::log_debug("Creating process memory \"", "takin_sqw_proc_*_", strProcName, "\".");
m_pMem = std::make_shared<ipr::managed_shared_memory>(ipr::create_only,
("takin_sqw_proc_mem_" + m_strProcName).c_str(), PARAM_MEM);
m_pSharedPars = static_cast<void*>(m_pMem->construct<t_sh_str>
(("takin_sqw_proc_params_" + m_strProcName).c_str())
(t_sh_str_alloc(m_pMem->get_segment_manager())));
m_pmtx.push_back(std::make_shared<std::mutex>());
m_pmsgIn = std::make_shared<ipr::message_queue>(ipr::create_only,
("takin_sqw_proc_in_" + m_strProcName).c_str(), MSG_QUEUE_SIZE, sizeof(ProcMsg));
m_pmsgOut = std::make_shared<ipr::message_queue>(ipr::create_only,
("takin_sqw_proc_out_" + m_strProcName).c_str(), MSG_QUEUE_SIZE, sizeof(ProcMsg));
m_pMem.push_back(std::make_shared<ipr::managed_shared_memory>(ipr::create_only,
("takin_sqw_proc_mem_" + strProcName).c_str(), PARAM_MEM));
m_pSharedPars.push_back(static_cast<void*>(m_pMem[iChild]->construct<t_sh_str>
(("takin_sqw_proc_params_" + strProcName).c_str())
(t_sh_str_alloc(m_pMem[iChild]->get_segment_manager()))));
if(mode == SqwProcStartMode::START_PARENT_CREATE_CHILD)
{
if(!tl::file_exists(pcProcExecName))
m_pmsgIn.push_back(std::make_shared<ipr::message_queue>(ipr::create_only,
("takin_sqw_proc_in_" + strProcName).c_str(), MSG_QUEUE_SIZE, sizeof(ProcMsg)));
m_pmsgOut.push_back(std::make_shared<ipr::message_queue>(ipr::create_only,
("takin_sqw_proc_out_" + strProcName).c_str(), MSG_QUEUE_SIZE, sizeof(ProcMsg)));
// create a child process
if(mode == SqwProcStartMode::START_PARENT_CREATE_CHILD)
{
tl::log_err("Child process file \"", pcProcExecName, "\" does not exist.");
return;
if(!tl::file_exists(pcProcExecName))
{
tl::log_err("Child process file \"", pcProcExecName, "\" does not exist.");
return;
}
// start child process
if(std::system((std::string(pcProcExecName)
+ std::string(" \"") + pcCfg + std::string("\" ")
+ strProcName + " &").c_str()) < 0)
{
const int errnum = errno;
tl::log_err("Could not create child process \"", pcProcExecName, "\".",
" Error code: ", errnum, ".");
}
}
// start child process
if(std::system((std::string(pcProcExecName)
+ std::string(" \"") + pcCfg + std::string("\" ")
+ m_strProcName + " &").c_str()) < 0)
// fork a child process from the parent process
#ifndef __MINGW32__
else if(mode == SqwProcStartMode::START_PARENT_FORK_CHILD)
{
const int errnum = errno;
tl::log_err("Could not create child process \"", pcProcExecName, "\".",
" Error code: ", errnum, ".");
m_pidChild.push_back(fork());
if(m_pidChild[iChild] < 0)
{
tl::log_err("Cannot fork process.");
return;
}
else if(m_pidChild[iChild] == 0)
{
m_iNumChildProcesses = 1;
m_pidChild[0] = 0;
child_proc<t_sqw>(*m_pmsgIn[iChild], *m_pmsgOut[iChild], pcCfg, m_pSharedPars[iChild]);
exit(0);
return;
}
}
}
#ifndef __MINGW32__
else if(mode == SqwProcStartMode::START_PARENT_FORK_CHILD)
{
m_pidChild = fork();
if(m_pidChild < 0)
#endif
tl::log_debug("Waiting for child process ", iChild, " to become ready...");
ProcMsg msgReady = msg_recv(*m_pmsgIn[iChild]);
if(mode == SqwProcStartMode::START_PARENT_CREATE_CHILD)
m_pidChild.push_back(*((pid_t*)&msgReady.dRet));
m_bOk = msgReady.bRet;
if(!m_bOk)
{
tl::log_err("Cannot fork process.");
return;
tl::log_err("Child process ", m_pidChild[iChild], " reports failure.");
m_iNumChildProcesses = iChild;
break;
}
else if(m_pidChild == 0)
else
{
child_proc<t_sqw>(*m_pmsgIn, *m_pmsgOut, pcCfg, m_pSharedPars);
exit(0);
return;
tl::log_debug("Child process ", m_pidChild[iChild], " is ready.");
}
}
#endif
tl::log_debug("Waiting for client to become ready...");
ProcMsg msgReady = msg_recv(*m_pmsgIn);
if(mode == SqwProcStartMode::START_PARENT_CREATE_CHILD)
m_pidChild = *((pid_t*)&msgReady.dRet);
if(!msgReady.bRet)
tl::log_err("Client ", m_pidChild, " reports failure.");
else
tl::log_debug("Client ", m_pidChild, " is ready.");
m_bOk = msgReady.bRet;
}
// this process has already been started as a child process
else if(mode == SqwProcStartMode::START_CHILD)
{
m_pMem = std::make_shared<ipr::managed_shared_memory>(ipr::open_only,
("takin_sqw_proc_mem_" + m_strProcName).c_str());
m_pSharedPars = static_cast<void*>(m_pMem->find<t_sh_str>
(("takin_sqw_proc_params_" + m_strProcName).c_str()).first);
m_pmsgIn = std::make_shared<ipr::message_queue>(ipr::open_only,
("takin_sqw_proc_in_" + m_strProcName).c_str());
m_pmsgOut = std::make_shared<ipr::message_queue>(ipr::open_only,
("takin_sqw_proc_out_" + m_strProcName).c_str());
m_pidChild = 0;
child_proc<t_sqw>(*m_pmsgIn, *m_pmsgOut, pcCfg, m_pSharedPars);
// for the child process, the vectors have only one element
m_iNumChildProcesses = 1;
const std::string& strProcName = m_strProcBaseName;
m_pMem.push_back(std::make_shared<ipr::managed_shared_memory>(ipr::open_only,
("takin_sqw_proc_mem_" + strProcName).c_str()));
m_pSharedPars.push_back(static_cast<void*>(m_pMem[0]->find<t_sh_str>
(("takin_sqw_proc_params_" + strProcName).c_str()).first));
m_pmsgIn.push_back(std::make_shared<ipr::message_queue>(ipr::open_only,
("takin_sqw_proc_in_" + strProcName).c_str()));
m_pmsgOut.push_back(std::make_shared<ipr::message_queue>(ipr::open_only,
("takin_sqw_proc_out_" + strProcName).c_str()));
m_pidChild.push_back(0);
child_proc<t_sqw>(*m_pmsgIn[0], *m_pmsgOut[0], pcCfg, m_pSharedPars[0]);
}
}
catch(const std::exception& ex)
......@@ -442,7 +480,9 @@ SqwProc<t_sqw>::SqwProc(const char* pcCfg, SqwProcStartMode mode,
template<class t_sqw>
SqwProc<t_sqw>::SqwProc(const std::string& strCfg) : SqwProc<t_sqw>::SqwProc(strCfg.c_str())
{}
{
++m_iRefCnt;
}
/**
......@@ -451,27 +491,39 @@ SqwProc<t_sqw>::SqwProc(const std::string& strCfg) : SqwProc<t_sqw>::SqwProc(str
template<class t_sqw>
SqwProc<t_sqw>::~SqwProc()
{
if(m_pidChild.size() == 0)
{
tl::log_err("No process id registered.");
return;
}
// we're in a child process
if(m_pidChild == 0)
if(m_pidChild[0] == 0)
{
tl::log_debug("Child process ", getpid(), " ending.");
return;
}
// make sure that this instance is the last
if(m_pMem.use_count() > 1)
return;
for(unsigned int iChild=0; iChild<m_iNumChildProcesses; ++iChild)
{
// make sure that this instance is the last
if(m_pMem[iChild].use_count() > 1)
return;
}
// shut down the parent process
try
{
if(m_pmsgOut)
for(unsigned int iChild=0; iChild<m_iNumChildProcesses; ++iChild)
{
ProcMsg msg;
msg.ty = ProcMsgTypes::QUIT;
msg_send(*m_pmsgOut, msg);
if(m_pmsgOut[iChild])
{
ProcMsg msg;
msg.ty = ProcMsgTypes::QUIT;
msg_send(*m_pmsgOut[iChild], msg);
//kill(m_pidChild, SIGABRT);
//kill(m_pidChild[iChild], SIGABRT);
}
}
if(--m_iRefCnt == 0)
......@@ -479,18 +531,24 @@ SqwProc<t_sqw>::~SqwProc()
// give clients time to end before removing the shared memory
std::this_thread::sleep_for(std::chrono::milliseconds{200});
ipr::message_queue::remove(("takin_sqw_proc_in_" + m_strProcName).c_str());
ipr::message_queue::remove(("takin_sqw_proc_out_" + m_strProcName).c_str());
for(unsigned int iChild=0; iChild<m_iNumChildProcesses; ++iChild)
{
std::string strProcName = m_strProcBaseName + "_" + tl::var_to_str(iChild);
ipr::message_queue::remove(("takin_sqw_proc_in_" + strProcName).c_str());
ipr::message_queue::remove(("takin_sqw_proc_out_" + strProcName).c_str());
m_pMem->destroy<t_sh_str>(("takin_sqw_proc_params_" + m_strProcName).c_str());
ipr::shared_memory_object::remove(("takin_sqw_proc_mem_" + m_strProcName).c_str());
m_pMem[iChild]->destroy<t_sh_str>(("takin_sqw_proc_params_" + strProcName).c_str());
ipr::shared_memory_object::remove(("takin_sqw_proc_mem_" + strProcName).c_str());
tl::log_debug("Removed process memory \"", "takin_sqw_proc_*_", m_strProcName, "\" for client ", m_pidChild, ".");
tl::log_debug("Removed process memory \"", "takin_sqw_proc_*_",
strProcName, "\" for child process ", m_pidChild[iChild], ".");
}
}
}
catch(const std::exception& ex)
{
tl::log_debug("Client process ", m_pidChild, " unloading exception: ", ex.what());
tl::log_debug("Child process unloading exception: ", ex.what());
}
}
......@@ -502,19 +560,37 @@ template<class t_sqw>
std::tuple<std::vector<t_real>, std::vector<t_real>>
SqwProc<t_sqw>::disp(t_real dh, t_real dk, t_real dl) const
{
std::lock_guard<std::mutex> lock(*m_pmtx);
auto query_disp = [this](std::size_t iChild, t_real dh, t_real dk, t_real dl)
-> std::tuple<std::vector<t_real>, std::vector<t_real>>
{
ProcMsg msg;
msg.ty = ProcMsgTypes::DISP;
msg.dParam1 = dh;
msg.dParam2 = dk;
msg.dParam3 = dl;
msg_send(*m_pmsgOut[iChild], msg);
ProcMsg msg;
msg.ty = ProcMsgTypes::DISP;
msg.dParam1 = dh;
msg.dParam2 = dk;
msg.dParam3 = dl;
msg_send(*m_pmsgOut, msg);
t_sh_str *pPars = static_cast<t_sh_str*>(m_pSharedPars[iChild]);
ProcMsg msgDisp = msg_recv(*m_pmsgIn[iChild]);
return str_to_disp(*pPars);
};
t_sh_str *pPars = static_cast<t_sh_str*>(m_pSharedPars);
ProcMsg msgDisp = msg_recv(*m_pmsgIn);
return str_to_disp(*pPars);
if(!m_bOk)
return std::make_tuple(std::vector<t_real>{}, std::vector<t_real>{});
// find a free child process
for(unsigned int iChild=0; iChild<m_iNumChildProcesses; ++iChild)
{
std::unique_lock<std::mutex> lock(*m_pmtx[iChild], std::defer_lock);
if(lock.try_lock())
return query_disp(iChild, dh, dk, dl);
}
// if all processes are occupied, queue at the first one
std::lock_guard<std::mutex> lock(*m_pmtx[0]);
return query_disp(0, dh, dk, dl);
}
......@@ -524,33 +600,60 @@ SqwProc<t_sqw>::disp(t_real dh, t_real dk, t_real dl) const
template<class t_sqw>
t_real SqwProc<t_sqw>::operator()(t_real dh, t_real dk, t_real dl, t_real dE) const
{
std::lock_guard<std::mutex> lock(*m_pmtx);
auto query_sqw = [this](std::size_t iChild, t_real dh, t_real dk, t_real dl, t_real dE)
-> t_real
{
ProcMsg msg;
msg.ty = ProcMsgTypes::SQW;
msg.dParam1 = dh;
msg.dParam2 = dk;
msg.dParam3 = dl;
msg.dParam4 = dE;
msg_send(*m_pmsgOut[iChild], msg);
ProcMsg msg;
msg.ty = ProcMsgTypes::SQW;
msg.dParam1 = dh;
msg.dParam2 = dk;
msg.dParam3 = dl;
msg.dParam4 = dE;
msg_send(*m_pmsgOut, msg);
ProcMsg msgS = msg_recv(*m_pmsgIn);
return msgS.dRet;
ProcMsg msgS = msg_recv(*m_pmsgIn[iChild]);
return msgS.dRet;
};
if(!m_bOk)
return 0.;
// find a free child process
for(unsigned int iChild=0; iChild<m_iNumChildProcesses; ++iChild)
{
std::unique_lock<std::mutex> lock(*m_pmtx[iChild], std::defer_lock);
if(lock.try_lock())
return query_sqw(iChild, dh, dk, dl, dE);
}
// if all processes are occupied, queue at the first one
std::lock_guard<std::mutex> lock(*m_pmtx[0]);
return query_sqw(0, dh, dk, dl, dE);
}
template<class t_sqw>
bool SqwProc<t_sqw>::IsOk() const
{
if(!m_bOk) return false;
std::lock_guard<std::mutex> lock(*m_pmtx);
if(!m_bOk)
return false;
ProcMsg msg;
msg.ty = ProcMsgTypes::IS_OK;
msg_send(*m_pmsgOut, msg);
// check all sub-processes
for(unsigned int iChild=0; iChild<m_iNumChildProcesses; ++iChild)
{
std::lock_guard<std::mutex> lock(*m_pmtx[iChild]);
ProcMsg msg;
msg.ty = ProcMsgTypes::IS_OK;
msg_send(*m_pmsgOut[iChild], msg);
ProcMsg msgRet = msg_recv(*m_pmsgIn[iChild]);
if(!msgRet.bRet)
return false;
}
ProcMsg msgRet = msg_recv(*m_pmsgIn);
return msgRet.bRet;
return true;
}
......@@ -560,15 +663,19 @@ bool SqwProc<t_sqw>::IsOk() const
template<class t_sqw>
std::vector<SqwBase::t_var> SqwProc<t_sqw>::GetVars() const
{
std::lock_guard<std::mutex> lock(*m_pmtx);
if(!m_bOk)
return std::vector<SqwBase::t_var>{};
// the variables should be the same for all sub-processes => only query the first
std::lock_guard<std::mutex> lock(*m_pmtx[0]);
ProcMsg msg;
msg.ty = ProcMsgTypes::GET_VARS;
msg_send(*m_pmsgOut, msg);
msg_send(*m_pmsgOut[0], msg);
t_sh_str *pPars = static_cast<t_sh_str*>(m_pSharedPars);
t_sh_str *pPars = static_cast<t_sh_str*>(m_pSharedPars[0]);
ProcMsg msgRet = msg_recv(*m_pmsgIn);
ProcMsg msgRet = msg_recv(*m_pmsgIn[0]);
return str_to_pars(*pPars);
}
......@@ -579,19 +686,26 @@ std::vector<SqwBase::t_var> SqwProc<t_sqw>::GetVars() const
template<class t_sqw>
void SqwProc<t_sqw>::SetVars(const std::vector<SqwBase::t_var>& vecVars)
{
std::lock_guard<std::mutex> lock(*m_pmtx);
if(!m_bOk)
return;
t_sh_str *pPars = static_cast<t_sh_str*>(m_pSharedPars);
// set the same variables for all child-processes
for(unsigned int iChild=0; iChild<m_iNumChildProcesses; ++iChild)
{
std::lock_guard<std::mutex> lock(*m_pmtx[iChild]);
ProcMsg msg;
msg.ty = ProcMsgTypes::SET_VARS;