#MDANSE : Molecular Dynamics Analysis for Neutron Scattering Experiments
#------------------------------------------------------------------------------------------
#Copyright (C)
#2015- Eric C. Pellegrini Institut Laue-Langevin
#BP 156
#71 avenue des Martyrs
#38000 Grenoble Cedex 9
#France
#pellegrini[at]ill.fr
#goret[at]ill.fr
#aoun[at]ill.fr
#
#This library is free software; you can redistribute it and/or
#modify it under the terms of the GNU Lesser General Public
#License as published by the Free Software Foundation; either
#version 2.1 of the License, or (at your option) any later version.
#
#This library is distributed in the hope that it will be useful,
#but WITHOUT ANY WARRANTY; without even the implied warranty of
#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
#Lesser General Public License for more details.
#
#You should have received a copy of the GNU Lesser General Public
#License along with this library; if not, write to the Free Software
#Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA

'''
Created on Mar 30, 2015

:author: Eric C. Pellegrini
'''

eric pellegrini's avatar
test  
eric pellegrini committed
33 34 35 36 37 38 39 40
import abc
import glob
import os
import random
import stat
import string
import subprocess
import sys
41
import traceback
eric pellegrini's avatar
test  
eric pellegrini committed
42

43
from MDANSE import PLATFORM, REGISTRY
eric pellegrini's avatar
test  
eric pellegrini committed
44
from MDANSE.Core.Error import Error
45
from MDANSE.Framework.Configurable import Configurable
46 47
from MDANSE.Framework.Jobs.JobStatus import JobStatus
from MDANSE.Framework.OutputVariables.IOutputVariable import OutputData
eric pellegrini's avatar
test  
eric pellegrini committed
48 49

class JobError(Error):
    '''
    This class handles any exception related to IJob-derived objects.

    Building the exception has a side effect on the job: when the job has a
    status attached, that status is flagged as aborted and updated with the
    current call stack and the error message.
    '''

    def __init__(self, job, message=None):
        '''
        Initializes the object.

        :param job: the job in which the exception was raised
        :type job: IJob derived object
        :param message: the error message. If None, the exception currently \
        being handled (as given by sys.exc_info) is used instead.
        :type message: str or None
        '''

        # One entry per frame of the current call stack, the fields of a frame being joined by ' -- '.
        trace = [' -- '.join([str(t) for t in tb]) for tb in traceback.extract_stack()]

        if message is None:
            message = sys.exc_info()[1]

        self._message = str(message)

        # The error message closes the stored traceback.
        trace.append("\n%s" % self._message)

        trace = '\n'.join(trace)

        if job._status is not None:
            job._status._state["state"] = "aborted"
            job._status._state['traceback'] = trace
            job._status._state['info'] = str(job)
            # The update is forced so that the aborted state is taken into account right away.
            job._status.update(force=True)

    def __str__(self):
        '''
        Returns the error message.
        '''

        return self._message
eric pellegrini's avatar
test  
eric pellegrini committed
87

88
def key_generator(keySize, chars=None, prefix=""):
    """
    Generates a random key.

    :param keySize: the number of random characters of the key
    :type keySize: int
    :param chars: the characters the key is drawn from. If None, the lowercase \
    ASCII letters and the digits are used.
    :type chars: str or None
    :param prefix: if not empty, the key is returned as '<prefix>_<random characters>'
    :type prefix: str

    :return: the generated key
    :rtype: str
    """

    if chars is None:
        chars = string.ascii_lowercase + string.digits

    key = ''.join(random.choice(chars) for _ in range(keySize))

    if prefix:
        key = "%s_%s" % (prefix, key)

    return key
        
99
class IJob(Configurable):
    """
    This class handles an MDANSE job. In MDANSE, any task modeled by a loop can be considered as an MDANSE job.
    """
103 104
                
    _registry = "job"
eric pellegrini's avatar
test  
eric pellegrini committed
105 106
    
    section = "job"
eric pellegrini's avatar
eric pellegrini committed
107
    
108
    ancestor = []
eric pellegrini's avatar
test  
eric pellegrini committed
109 110
        
    @staticmethod
    def define_unique_name():
        """
        Returns a name that does not match any entry of the platform temporary files directory.

        The name is built as '<4 first characters of the user name>_<pid>_<6 random characters>'.

        :return: the generated name
        :rtype: str
        """

        prefix = '%s_%d' % (PLATFORM.username()[:4], PLATFORM.pid())

        # The base names of the entries of the temporary files directory.
        # NOTE(review): presumably one entry per registered job - confirm against JobStatus.
        registeredJobs = set(os.path.basename(f) for f in glob.glob(os.path.join(PLATFORM.temporary_files_directory(), '*')))

        while True:

            # The prefix is followed by 6 random characters (lowercase letters or digits).
            name = key_generator(6, prefix=prefix)

            if name not in registeredJobs:
                break

        return name
        
131
    def __init__(self):
        """
        The base class constructor.

        Initializes the Configurable part of the job, creates an empty output
        data container and leaves the job without any status attached: the
        status is only created by run() when it is called with status=True.
        """

        Configurable.__init__(self)

        self._outputData = OutputData()

        self._status = None
                                            
142
  
eric pellegrini's avatar
test  
eric pellegrini committed
143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194
    @staticmethod
    def set_pyro_server():
        """
        Makes sure that a Pyro name server can be located.

        If none is found, a name server is started in a separate python
        process and the call only returns once that server can be located.
        """

        import Pyro.errors
        import Pyro.naming

        # The object used to look for the name server.
        nsLocator = Pyro.naming.NameServerLocator()

        try:
            # Nothing to do if a name server is already running.
            nsLocator.getNS()

        except Pyro.errors.NamingError:

            # No name server found: launch one in a child python process.
            subprocess.Popen([sys.executable, '-O', '-c', "import Pyro.naming; Pyro.naming.main([])"], stdout = subprocess.PIPE)

            # Poll until the freshly started name server answers.
            nameServer = None
            while nameServer is None:
                try:
                    nameServer = nsLocator.getNS()
                except Pyro.errors.NamingError:
                    continue
                            
    @property
    def name(self):
        """
        The name of the job.

        The underlying attribute is assigned by run(); it is not set by the
        constructor of this class.
        """
        return self._name

    @property
    def configuration(self):
        """
        The configuration of the job.

        NOTE(review): the underlying attribute is not assigned in this class;
        presumably it is maintained by Configurable - confirm.
        """
        return self._configuration

    @abc.abstractmethod
    def finalize(self):        
        """
        Finalizes the job. To be implemented by the concrete jobs.

        Called by run() once all the steps have been run and combined.
        """
        pass
        
    @abc.abstractmethod
    def initialize(self):        
        """
        Initializes the job. To be implemented by the concrete jobs.

        Called by run() right after the job has been set up with its
        parameters. run() expects a strictly positive numberOfSteps attribute
        to be defined once this method has returned.
        """
        pass

    @abc.abstractmethod
    def run_step(self, index):
        """
        Runs a single step of the job. To be implemented by the concrete jobs.

        The signature takes the step index because every runner of this class
        calls the method with it.

        :param index: the index of the step to run
        :type index: int

        :return: a 2-tuple whose items are passed, in that order, to combine()
        :rtype: tuple
        """
        pass
            
    @classmethod
    def save(cls, jobFile, parameters=None):
        """
        Save a job file for a given job.

        The job file is a python script that defines the job parameters, creates
        the job out of the registry and runs it with a status attached. The
        file is given read, write and execute permissions for its owner only.

        :param jobFile: the name of the output job file.
        :type jobFile: str
        :param parameters: optional. If not None, the parameters with which the \
        job file will be built. Otherwise, the default parameters of the job are used.
        :type parameters: dict
        """

        # The context manager closes the file even if one of the writes fails.
        with open(jobFile, 'w') as f:

            # The first line contains the call to the python executable. This is necessary for the file to
            # be autostartable.
            f.write('#!%s\n\n' % sys.executable)

            # Writes the input file header.
            f.write('########################################################\n')
            f.write('# This is an automatically generated MDANSE run script #\n')
            f.write('########################################################\n\n')

            # Write the import.
            f.write("from MDANSE import REGISTRY\n\n")

            f.write('################################################################\n')
            f.write('# Job parameters                                               #\n')
            f.write('################################################################\n\n')

            # Writes the line that will initialize the |parameters| dictionary.
            f.write('parameters = {}\n')

            if parameters is None:
                parameters = cls.get_default_parameters()

            # The parameters are written sorted by name, using their repr.
            for k, v in sorted(parameters.items()):
                f.write('parameters[%r] = %r\n' % (k, v))

            f.write('\n')
            f.write('################################################################\n')
            f.write('# Setup and run the analysis                                   #\n')
            f.write('################################################################\n')
            f.write('\n')

            # The job is created out of the registry and stored in a variable named after the job type.
            f.write('%s = REGISTRY[%r][%r]()\n' % (cls._type,'job',cls._type))
            f.write('%s.run(parameters,status=True)' % (cls._type))

        os.chmod(jobFile,stat.S_IRWXU)
        
    def _run_monoprocessor(self):
        """
        Runs the steps of the job one after the other in the current process.

        Each step result is combined as soon as it is available. When a status
        is attached to the job, it is updated after each step and the run is
        interrupted as soon as the status reports that the job was stopped.
        """

        for step in range(self.numberOfSteps):

            stepIndex, stepResult = self.run_step(step)
            self.combine(stepIndex, stepResult)

            # Nothing to report when the job runs without status.
            if self._status is None:
                continue

            # The job was stopped: clean up its status and give up the remaining steps.
            if self._status.is_stopped():
                self._status.cleanup()
                return

            self._status.update()
        
    def _run_multiprocessor(self):
        """
        Runs the steps of the job through a master process that hands them over to slave jobs.

        The number of slave jobs launched is given by the 'slots' entry of the
        'running_mode' configuration. When a status is attached to the job, it
        is updated after each retrieved result and the retrieval is interrupted
        as soon as the status reports that the job was stopped. The master is
        shut down in both cases.
        """

        import MDANSE.DistributedComputing.MasterSlave as MasterSlave

        script = os.path.abspath(os.path.join(PLATFORM.package_directory(),'DistributedComputing','Slave.py'))

        master = MasterSlave.initializeMasterProcess(self._name, slave_script=script, use_name_server=False)

        # The job is shared with the slaves under the 'job' global state name.
        master.setGlobalState(job=self)
        master.launchSlaveJobs(n=self.configuration['running_mode']['slots'],port=master.pyro_daemon.port)

        # All the steps are requested first ...
        for index in range(self.numberOfSteps):
            master.requestTask('run_step',MasterSlave.GlobalStateValue(1,'job'),index)

        # ... then one result is retrieved and combined per requested step.
        for index in range(self.numberOfSteps):
            _, _, (idx, x) = master.retrieveResult('run_step')
            self.combine(idx, x)

            if self._status is not None:
                if self._status.is_stopped():
                    self._status.cleanup()
                    # Break (instead of returning) to ensure the master will be shut down
                    break
                else:
                    self._status.update()

        master.shutdown()
        
    def _run_remote(self):
        """
        Runs the steps of the job through a master process registered to a Pyro name server.

        A Pyro name server is started beforehand if none can be located. All
        the steps are requested first, then one result is retrieved and
        combined per requested step.
        """

        IJob.set_pyro_server()

        import MDANSE.DistributedComputing.MasterSlave as MasterSlave

        tasks = MasterSlave.initializeMasterProcess(self._name, slave_module='MDANSE.DistributedComputing.Slave')

        # The job is shared with the slaves under the 'job' global state name.
        tasks.setGlobalState(job=self)

        # NOTE(review): run() has already started the status before calling this runner - confirm
        # that starting it a second time is intended.
        if self._status is not None:
            self._status.start(self.numberOfSteps,rate=0.1)

        for index in range(self.numberOfSteps):
            tasks.requestTask('run_step',MasterSlave.GlobalStateValue(1,'job'),index)

        for index in range(self.numberOfSteps):
            _, _, (idx, x) = tasks.retrieveResult("run_step")
            self.combine(idx, x)

        # NOTE(review): unlike the monoprocessor and multiprocessor runners, the status is checked
        # and updated only once, after all the results have been combined - confirm this is intended.
        if self._status is not None:
            if self._status.is_stopped():
                self._status.cleanup()
                return
            else:
                self._status.update()
            
    _runner = {"monoprocessor" : _run_monoprocessor, "multiprocessor" : _run_multiprocessor, "remote" : _run_remote}

307
    def run(self,parameters,status=False):
        """
        Run the job.

        The job is given a unique name, set up with the input parameters,
        initialized, run with the runner matching its running mode and finally
        finalized.

        :param parameters: the parameters the job is set up with
        :type parameters: dict
        :param status: if True, a status is attached to the job and kept up to date while the job runs
        :type status: bool

        :raises JobError: if anything fails while setting up or running the job. \
        The error message is the formatted traceback of the original failure.
        """

        try:

            self._name = "%s_%s" % (self._type,IJob.define_unique_name())

            if status:
                self._status = JobStatus(self)

            self.setup(parameters)

            self.initialize()

            # The number of steps is validated before it is used to start the status, so that a
            # missing or invalid value is reported as such.
            if getattr(self,'numberOfSteps', 0) <= 0:
                raise JobError(self,"Invalid number of steps for job %s" % self._name)

            if self._status is not None:
                self._status.start(self.numberOfSteps,rate=0.1)
                self._status.state['info'] = str(self)

            # Jobs whose configuration does not define any running mode are run in the current process.
            if 'running_mode' in self.configuration:
                mode = self.configuration['running_mode']['mode']
            else:
                mode = 'monoprocessor'

            # The runners are stored as plain functions, hence the explicit self.
            IJob._runner[mode](self)

            self.finalize()

            if self._status is not None:
                self._status.finish()

        # Deliberately catches everything: building the JobError is what flags the job status
        # as aborted, whatever the reason of the failure.
        except:
            tb = traceback.format_exc()
            raise JobError(self,tb)
eric pellegrini's avatar
test  
eric pellegrini committed
344 345 346 347

    @property
    def info(self):
        """
        The information stored about the job.

        NOTE(review): the underlying attribute is not assigned in this class;
        presumably it is set by Configurable when the job is set up - confirm.
        """

        return self._info
349 350

    @classmethod
351 352
    def save_template(cls, shortname,classname):
                    
353
        if REGISTRY['job'].has_key(shortname):
354
            raise KeyError('A job with %r name is already stored in the registry' % shortname)
355 356
                                
        templateFile = os.path.join(PLATFORM.macros_directory(),"%s.py" % classname)
357
                
eric pellegrini's avatar
eric pellegrini committed
358 359
        try:            
            f = open(templateFile,'w')
360
        
eric pellegrini's avatar
eric pellegrini committed
361
            f.write(
362 363
'''import collections

364 365
from MDANSE import REGISTRY

366 367
from MDANSE.Framework.Jobs.IJob import IJob

368
class %(classname)s(IJob):
369 370 371
    """
    You should enter the description of your job here ...
    """
372 373 374
        
    # You should enter the label under which your job will be viewed from the gui.
    label = %(label)r
375 376 377 378

    # You should enter the category under which your job will be references.
    category = ('My jobs',)
    
379
    ancestor = ["mmtk_trajectory"]
380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397

    # You should enter the configuration of your job here
    # Here a basic example of a job that will use a MMTK trajectory, a frame selection and an output file in NetCDF and ASCII file formats
    settings = collections.OrderedDict()
    settings['trajectory']=('mmtk_trajectory',{})
    settings['frames']=('frames', {"dependencies":{'trajectory':'trajectory'}})
    settings['output_files']=('output_files', {"formats":["netcdf","ascii"]})
            
    def initialize(self):
        """
        Initialize the input parameters and analysis self variables
        """

        # Compulsory. You must enter the number of steps of your job.
        # Here for example the number of selected frames
        self.numberOfSteps = self.configuration['frames']['number']
                        
        # Create an output data for the selected frames.
398
        self._outputData.add("time", "line", self.configuration['frames']['time'], units='ps')
399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423


    def run_step(self, index):
        """
        Runs a single step of the job.
        """
                                
        return index, None
    
    
    def combine(self, index, x):
        """
        Synchronize the output of each individual run_step output.
        """     
                    
    def finalize(self):
        """
        Finalizes the job (e.g. averaging the total term, output files creations ...).
        """ 

        # The output data are written
        self._outputData.write(self.configuration['output_files']['root'], self.configuration['output_files']['formats'], self._info)
        
        # The trajectory is closed
        self.configuration['trajectory']['instance'].close()        
424 425 426

REGISTRY[%(shortname)r] = %(classname)s
''' % {'classname':classname,'label':'label of the class','shortname':shortname})
427
        
eric pellegrini's avatar
eric pellegrini committed
428
        except IOError:
429
            return None
eric pellegrini's avatar
eric pellegrini committed
430
        else:
431 432
            f.close()        
            return templateFile