Commit 7bbc022d authored by eric pellegrini's avatar eric pellegrini
Browse files

Fully docstringed the MDANSE.DistributedComputing.MasterSlave module

parent c1c1627f
#MDANSE : Molecular Dynamics Analysis for Neutron Scattering Experiments
#------------------------------------------------------------------------------------------
#Copyright (C)
#2015- Eric C. Pellegrini Institut Laue-Langevin
#BP 156
#6, rue Jules Horowitz
#38042 Grenoble Cedex 9
#France
#pellegrini[at]ill.fr
#goret[at]ill.fr
#aoun[at]ill.fr
#
#This library is free software; you can redistribute it and/or
#modify it under the terms of the GNU Lesser General Public
#License as published by the Free Software Foundation; either
#version 2.1 of the License, or (at your option) any later version.
#
#This library is distributed in the hope that it will be useful,
#but WITHOUT ANY WARRANTY; without even the implied warranty of
#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
#Lesser General Public License for more details.
#
#You should have received a copy of the GNU Lesser General Public
#License along with this library; if not, write to the Free Software
#Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
'''
Distributed computing using a master-slave model
......@@ -42,8 +68,7 @@ Examples/slave.py show a master-slave setup using two distinct scripts.
This is more flexible because task requests and result retrievals
can be made from anywhere in the master code.
@author: Konrad Hinsen
:author: Konrad Hinsen
'''
import copy
......@@ -54,7 +79,7 @@ import Pyro.core
import Pyro.errors
import Pyro.naming
from Scientific.DistributedComputing.TaskManager import TaskManager, TaskManagerTermination, TaskRaisedException
from Scientific.DistributedComputing.TaskManager import TaskManager, TaskManagerTermination
debug = False
......@@ -74,15 +99,15 @@ class MasterProcess(object):
def __init__(self, label, use_name_server=True, maxTrials=None):
"""
@param label: the label that identifies the task manager
@type label: C{str}
:param label: the label that identifies the task manager
:type label: C{str}
@param use_name_server: If C{True} (default), the task manager is
:param use_name_server: If C{True} (default), the task manager is
registered with the Pyro name server. If
C{False}, the name server is not used and
slave processes need to know the host
on which the master process is running.
@type use_name_server: C{bool}
:type use_name_server: C{bool}
"""
self.label = label
self.task_manager = TaskManager()
......@@ -98,7 +123,7 @@ class MasterProcess(object):
nTrials = 0
while 1:
try:
port = self.pyro_daemon.port
self.pyro_daemon.port
except AttributeError:
nTrials += 1
if debug:
......@@ -124,8 +149,7 @@ class MasterProcess(object):
pyro_ns.createGroup("TaskManager")
except Pyro.errors.NamingError:
pass
uri = self.pyro_daemon.connect(self.task_manager,
"TaskManager.%s" % self.label)
self.pyro_daemon.connect(self.task_manager,"TaskManager.%s" % self.label)
try:
self.pyro_daemon.requestLoop()
finally:
......@@ -143,33 +167,33 @@ class MasterProcess(object):
parameters given in the task request. Note that the order of
task executions is not defined.
@param tag: a tag identifying the computational task. It corresponds
:param tag: a tag identifying the computational task. It corresponds
to the name of a method in the slave process.
@type tag: C{str}
:type tag: C{str}
@param parameters: the parameters passed to the corresponding method
:param parameters: the parameters passed to the corresponding method
in the slave process. The only restriction on their
types is that all parameters must be picklable.
@return: a unique task id
@rtype: C{str}
:return: a unique task id
:rtype: C{str}
"""
return self.task_manager.addTaskRequest(tag, parameters)
def retrieveResult(self, tag=None):
"""
@param tag: a tag identifying the computational task from which a
:param tag: a tag identifying the computational task from which a
return value is requested. If C{None}, results from
any task will be accepted.
@type tag: C{str}
:type tag: C{str}
@return: a tuple containing three values: the task id to which the
:return: a tuple containing three values: the task id to which the
result corresponds, the tag of the computational task,
and the result returned by the slave method that handled
the task
@rtype: C{tuple}
:rtype: C{tuple}
@raises TaskRaisedException: if the slave method raised an exception
:raise exception: raises TaskRaisedException: if the slave method raised an exception
"""
try:
if tag is None:
......@@ -251,7 +275,7 @@ sys.modules["__main__"].SLAVE_NAMESPACE = namespace
exec slave_code % port in namespace
'''
directory = self.task_manager.retrieveData("cwd")
for i in range(n):
for _ in range(n):
process = subprocess.Popen([sys.executable],
stdin=subprocess.PIPE,
cwd=directory)
......@@ -272,24 +296,24 @@ class SlaveProcess(object):
def __init__(self, label, master_host=None, watchdog_period=120.):
"""
@param label: the label that identifies the task manager
@type label: C{str}
:param label: the label that identifies the task manager
:type label: C{str}
@param master_host: If C{None} (default), the task manager of the
:param master_host: If C{None} (default), the task manager of the
master process is located using the Pyro name
server. If no name server is used, this parameter
must be the hostname of the machine on which the
master process runs, plus the port number if it
is different from the default (7766).
@type master_host: C{str} or C{NoneType}
:type master_host: C{str} or C{NoneType}
@param watchdog_period: the interval (in seconds) at which the
:param watchdog_period: the interval (in seconds) at which the
slave process sends messages to the
manager to signal that it is still alive.
If None, no messages are sent at all. In that
case, the manager cannot recognize if the slave
job has crashed or been killed.
@type watchdog_period: C{int} or C{NoneType}
:type watchdog_period: C{int} or C{NoneType}
"""
Pyro.core.initClient(banner=False)
if master_host is None:
......@@ -397,10 +421,10 @@ class SlaveProcess(object):
self.task_manager.unregisterProcess(self.process_id)
raise
except Exception, e:
import traceback, StringIO
if debug:
print "Exception:"
traceback.print_exc()
import traceback, StringIO
tb_text = StringIO.StringIO()
traceback.print_exc(None, tb_text)
tb_text = tb_text.getvalue()
......@@ -421,7 +445,7 @@ class SlaveProcess(object):
def getMachineInfo():
import os
import platform
sysname, nodename, release, version, machine, processor = platform.uname()
_, nodename, _, _, machine, _ = platform.uname()
pid = os.getpid()
return "PID %d on %s (%s)" % (pid, nodename, machine)
......@@ -442,27 +466,27 @@ def runJob(label, master_class, slave_class, watchdog_period=120.,
TaskManager object to enable the task_manager script to launch
slave processes.
@param label: the label that identifies the task manager
@type label: C{str}
:param label: the label that identifies the task manager
:type label: C{str}
@param master_class: the class implementing the master process
:param master_class: the class implementing the master process
(a subclass of L{MasterProcess})
@param slave_class: the class implementing the slave process
:param slave_class: the class implementing the slave process
(a subclass of L{SlaveProcess})
@param watchdog_period: the interval (in seconds) at which the
:param watchdog_period: the interval (in seconds) at which the
slave process sends messages to the
manager to signal that it is still alive.
If None, no messages are sent at all. In that
case, the manager cannot recognize if the slave
job has crashed or been killed.
@type watchdog_period: C{int} or C{NoneType}
:type watchdog_period: C{int} or C{NoneType}
@param launch_slaves: the number of slaves jobs to launch
:param launch_slaves: the number of slaves jobs to launch
immediately on the same machine that runs
the master process
@type launch_slaves: C{int}
:type launch_slaves: C{int}
"""
import inspect
import os
......@@ -493,27 +517,27 @@ def initializeMasterProcess(label, slave_script=None, slave_module=None,
"""
Initializes a master process.
@param label: the label that identifies the task manager
@type label: C{str}
:param label: the label that identifies the task manager
:type label: C{str}
@param slave_script: the file name of the script that defines
:param slave_script: the file name of the script that defines
the corresponding slave process
@type slave_script: C{str}
:type slave_script: C{str}
@param slave_module: the name of the module that defines
:param slave_module: the name of the module that defines
the corresponding slave process
@type slave_module: C{str}
:type slave_module: C{str}
@param use_name_server: If C{True} (default), the task manager is
:param use_name_server: If C{True} (default), the task manager is
registered with the Pyro name server. If
C{False}, the name server is not used and
slave processes need to know the host
on which the master process is running.
@type use_name_server: C{bool}
:type use_name_server: C{bool}
@returns: a process object on which the methods requestTask()
:returns: a process object on which the methods requestTask()
and retrieveResult() can be called.
@rtype: L{MasterProcess}
:rtype: L{MasterProcess}
"""
import atexit
import os
......@@ -548,18 +572,18 @@ def startSlaveProcess(label=None, master_host=None):
Starts a slave process. Must be called at the end of a script
that defines or imports all task handlers.
@param label: the label that identifies the task manager. May be
:param label: the label that identifies the task manager. May be
omitted if the slave process is started through
the task_manager script.
@type label: C{str} or C{NoneType}
:type label: C{str} or C{NoneType}
@param master_host: If C{None} (default), the task manager of the
:param master_host: If C{None} (default), the task manager of the
master process is located using the Pyro name
server. If no name server is used, this parameter
must be the hostname of the machine on which the
master process runs, plus the port number if it
is different from the default (7766).
@type master_host: C{str} or C{NoneType}
:type master_host: C{str} or C{NoneType}
"""
import sys
main_module = sys.modules['__main__']
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment