Commit a6012205 authored by eric pellegrini's avatar eric pellegrini
Browse files

test

parent df3f01fd
import abc
class _Meta(type):
'''
Metaclass that allows to use the __getitem__ method at a class level for the class that has been built.
The class that uses this metaclass must define a class attribute named _registry that will be used
by the __getitem__ method.
'''
def __getitem__(self, item):
"""
Returns a given item stored in the class registry
"""
return self._registry[item]
class ClassRegistry(abc.ABCMeta):
'''
Metaclass that registers the subclasses of bases classes.
The internal registry is defined as a nested dictionary whose keys
are the |type| class attribute of the base classes and values another dictionary
whose keys are the |type| class attribute of the subclasses and values are the corresponding
class instances.
Hence any base or child class that does not define |type| class attribute will not be resgistered.
'''
__metaclass__ = _Meta
__interfaces = []
_registry = {}
def __init__(self, name, bases, namespace):
'''
Constructor of a class metaclassed by ClassFactory
:param name: the name of the class to be built by this metaclass
:param bases: the base classes of the class to be built by this metaclass
:param namespace: the attributes and methods of the class to be built by this metaclass
'''
super(ClassRegistry, self).__init__(name, bases, namespace)
# Get the typ of the class
typ = getattr(self, 'type', None)
if typ is None:
return
metaClass = namespace.get("__metaclass__", None)
if metaClass is ClassRegistry:
ClassRegistry.__interfaces.append(self)
ClassRegistry._registry[typ] = {}
else:
for interface in ClassRegistry.__interfaces:
if issubclass(self, interface):
ClassRegistry._registry[interface.type][typ] = self
break
@classmethod
def info(cls, interface):
'''
Returns informations about the subclasses of a given base class stored in the registry.
:param cls: the ClassRegsitry instance
:param interface: the name of base class of whom information about its subclasses is requested
'''
if not cls._registry.has_key(interface):
return "The interface " + interface + " is not registered"
import inspect
import os
# Dictionnay whose keys are the package names and values and list of (job name, job path) stored in the corresponding package.
packages = {}
# Loop over the registry items.
for k, v in cls._registry[interface].items():
# Get the module corresponding to the job class.
mod = inspect.getmodule(v)
# The package hosting the module.
modPackage = mod.__package__
# The module file.
modFilename = mod.__file__
# If no package could be found, guess a name using the directory name of the module file.
if modPackage is None:
modPackage = os.path.split(os.path.dirname(modFilename))[1]
# Update the packages dictionary.
if packages.has_key(modPackage):
packages[modPackage].append([k, v.__name__])
else:
packages[modPackage] = [[k, v.__name__]]
contents = []
# Print the contents of the packages dictionary.
contents.append("="*130)
contents.append("%-50s %-40s %-s" % ("Package", "Name", "Class"))
contents.append("="*130)
for k, v in sorted(packages.items()):
for vv in sorted(v):
contents.append("%-50s %-40s %-s" % (k, vv[0], vv[1]))
contents.append('-' * 130)
contents = "\n".join(contents)
return contents
'''
MDANSE : Molecular Dynamics Analysis for Neutron Scattering Experiments
------------------------------------------------------------------------------------------
Copyright (C)
2015- Eric C. Pellegrini Institut Laue-Langevin
BP 156
6, rue Jules Horowitz
38042 Grenoble Cedex 9
France
pellegrini[at]ill.fr
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
Created on Mar 23, 2015
@author: pellegrini
'''
import abc
import numpy
from MDANSE import REGISTRY
from MDANSE.Framework.Configurables.Configurable import Configurable
from MDANSE.Framework.Configurables.Configurators import ConfiguratorsDict
from MDANSE.Core.Error import Error
class InstrumentResolutionError(Error):
pass
class InstrumentResolution(Configurable):
__metaclass__ = REGISTRY
type = "instrument resolution"
def __init__(self):
Configurable.__init__(self)
self._frequencyWindow = None
self._timeWindow = None
@abc.abstractmethod
def set_kernel(self, frequencies, dt, parameters=None):
pass
@property
def frequencyWindow(self):
if self._frequencyWindow is None:
raise InstrumentResolutionError("Undefined frequency window")
return self._frequencyWindow
@property
def timeWindow(self):
if self._timeWindow is None:
raise InstrumentResolutionError("Undefined time window")
return self._timeWindow
class IdealInstrumentResolution(InstrumentResolution):
"""Defines an ideal instrument resolution with a Dirac response
"""
type = 'ideal'
configurators = ConfiguratorsDict()
__doc__ += configurators.build_doc()
def set_kernel(self, frequencies, dt):
self._frequencyWindow = numpy.zeros(len(frequencies), dtype=numpy.float64)
self._frequencyWindow[len(frequencies)/2] = 1.0
self._timeWindow = numpy.ones(len(frequencies), dtype=numpy.float64)
class GaussianInstrumentResolution(InstrumentResolution):
"""Defines an instrument resolution with a gaussian response
"""
type = 'gaussian'
configurators = ConfiguratorsDict()
configurators.add_item('mu', 'float', default=0.0)
configurators.add_item('sigma', 'float', default=1.0)
__doc__ += configurators.build_doc()
def set_kernel(self, frequencies, dt):
mu = self._configuration["mu"]["value"]
sigma = self._configuration["sigma"]["value"]
self._frequencyWindow = (1.0/(sigma*numpy.sqrt(2.0*numpy.pi)))*numpy.exp(-0.5*((frequencies-mu)/sigma)**2)
self._timeWindow = numpy.fft.fftshift(numpy.abs(numpy.fft.ifft(self._frequencyWindow))/dt)
class LorentzianInstrumentResolution(InstrumentResolution):
"""
Defines an instrument resolution with a lorentzian response
"""
type = 'lorentzian'
configurators = ConfiguratorsDict()
configurators.add_item('mu', 'float', default=0.0)
configurators.add_item('sigma', 'float', default=1.0)
__doc__ += configurators.build_doc()
def set_kernel(self, frequencies, dt):
mu = self._configuration["mu"]["value"]
sigma = self._configuration["sigma"]["value"]
fact = 0.5*sigma
self._frequencyWindow = (1.0/numpy.pi)*(fact/((frequencies-mu)**2 + fact**2))
self._timeWindow = numpy.fft.fftshift(numpy.abs(numpy.fft.ifft(self._frequencyWindow))/dt)
class PseudoVoigtInstrumentResolution(InstrumentResolution):
"""Defines an instrument resolution with a pseudo-voigt response
"""
type = 'pseudo-voigt'
configurators = ConfiguratorsDict()
configurators.add_item('eta','float', mini=0.0, maxi=1.0, default=0.5)
configurators.add_item('mu_lorentzian','float', default=0.0)
configurators.add_item('sigma_lorentzian','float', default=1.0)
configurators.add_item('mu_gaussian','float', default=0.0)
configurators.add_item('sigma_gaussian','float', default=1.0)
__doc__ += configurators.build_doc()
def set_kernel(self, frequencies, dt):
eta = self._configuration["eta"]["value"]
muL = self._configuration["mu_lorentzian"]["value"]
sigmaL = self._configuration["sigma_lorentzian"]["value"]
muG = self._configuration["mu_gaussian"]["value"]
sigmaG = self._configuration["sigma_gaussian"]["value"]
gContribution = (1.0/(sigmaG*numpy.sqrt(2.0*numpy.pi)))*numpy.exp(-0.5*((frequencies-muG)/sigmaG)**2)
fact = 0.5*sigmaL
lContribution = (1.0/numpy.pi)*(fact/((frequencies-muL)**2 + fact**2))
self._frequencyWindow = eta*lContribution + (1.0-eta)*gContribution
self._timeWindow = numpy.fft.fftshift(numpy.abs(numpy.fft.ifft(self._frequencyWindow))/dt)
class TriangularInstrumentResolution(InstrumentResolution):
"""Defines an instrument resolution with a triangular response
"""
type = 'triangular'
configurators = ConfiguratorsDict()
configurators.add_item('mu', 'float', default=0.0)
configurators.add_item('sigma', 'float', default=1.0)
__doc__ += configurators.build_doc()
def set_kernel(self, frequencies, dt):
mu = self._configuration["mu"]["value"]
sigma = self._configuration["sigma"]["value"]
val = numpy.abs(frequencies-mu) - sigma
self._frequencyWindow = numpy.where( val >= 0, 0.0, -val/sigma**2)
self._timeWindow = numpy.fft.fftshift(numpy.abs(numpy.fft.ifft(self._frequencyWindow))/dt)
class SquareInstrumentResolution(InstrumentResolution):
"""Defines an instrument resolution with a square response
"""
type = 'square'
configurators = ConfiguratorsDict()
configurators.add_item('mu', 'float', default=0.0)
configurators.add_item('sigma', 'float', default=1.0)
__doc__ += configurators.build_doc()
def set_kernel(self, frequencies, dt):
mu = self._configuration["mu"]["value"]
sigma = self._configuration["sigma"]["value"]
self._frequencyWindow = numpy.where((numpy.abs(frequencies-mu)-sigma) > 0,0.0,1.0/(2.0*sigma))
self._timeWindow = numpy.fft.fftshift(numpy.abs(numpy.fft.ifft(self._frequencyWindow))/dt)
if __name__ == "__main__":
res = REGISTRY["instrument resolution"]["square"]()
res.setup({"mu":2.0,"sigma":1.0})
res.set_kernel(numpy.array([1,2,3,4,5,6,7,8]),0.1)
import abc
import glob
import os
import random
import stat
import string
import subprocess
import sys
from MDANSE import LOGGER, PLATFORM, REGISTRY
from MDANSE.Core.Error import Error
from MDANSE.Framework.Configurables.Configurable import Configurable
from MDANSE.Framework.Configurables.Jobs.JobStatus import JobStatus
from MDANSE.Framework.IO.OutputVariables import OutputData
class JobError(Error):
pass
def key_generator(size=4, chars=None, prefix=""):
if chars is None:
chars = string.ascii_lowercase + string.digits
key = ''.join(random.choice(chars) for _ in range(size))
if prefix:
key = "%s_%s" % (prefix,key)
return key
class Job(Configurable):
"""
This class handles a nMOLDYN job. In nMOLDYN any task modeled by a loop can be
considered a nMOLDYN job.
"""
__metaclass__ = REGISTRY
type = "job"
section = "job"
@staticmethod
def set_name():
"""
Sets a name for the job that is not already in use by another running job.
"""
prefix = '%d' % PLATFORM.pid()
# The list of the registered jobs.
registeredJobs = [os.path.basename(f) for f in glob.glob(os.path.join(PLATFORM.temporary_files_directory(),'*'))]
while True:
# Followed by 4 random letters.
name = key_generator(4, prefix=prefix)
if not name in registeredJobs:
break
return name
def __init__(self, status=None):
"""
The base class constructor.
"""
Configurable.__init__(self)
self._outputData = OutputData()
self._name = Job.set_name()
self._info = ""
if status is not None:
self._status = JobStatus(self)
else:
self._status = None
def build_documentation(self):
doc = [self.__doc__]
for conf in self.configurators.values():
if conf.__doc__ is None:
doc.append("%s : %s\n" % (conf.type,'undocumented'))
else:
doc.append("%s : %s\n" % (conf.type,conf.__doc__))
doc = '\n'.join(doc)
return doc
@classmethod
def build_parallelization_test(cls, testFile, parameters=None):
"""
Produce a file like object for a given job.\n
:Parameters:
#. parameters (dict): optional. If not None, the parameters with which the job file will be built.
"""
# Try to open the output test file to see whether it is allowed.
try:
f = open(testFile, 'w')
# Case where for whatever reason, the file could not be opened for writing. Return.
except IOError as e:
raise JobError(["Error when saving python batch file.",e])
# The first line contains the call to the python executable. This is necessary for the file to
# be autostartable.
f.write('#!%s\n\n' % sys.executable)
f.write('import os\n')
f.write('import unittest\n')
f.write('import numpy\n')
f.write('from Scientific.IO.NetCDF import NetCDFFile\n')
f.write('from Tests.UnitTest import UnitTest\n')
f.write('from nMOLDYN import REGISTRY\n\n')
f.write('class Test%sParallel(UnitTest):\n\n' % cls.type.upper())
f.write(' def setUp(self):\n')
f.write(' from nMOLDYN import LOGGER\n')
f.write(' LOGGER.stop()\n\n')
f.write(' def test(self):\n')
# Writes the line that will initialize the |parameters| dictionary.
f.write(' parameters = {}\n')
for k, v in sorted(parameters.items()):
f.write(' parameters[%r] = %r\n' % (k, v))
f.write('\n job = REGISTRY[%r](%r)\n\n' % ('job',cls.type))
f.write(' parameters["running_mode"] = ("monoprocessor",1)\n')
f.write(' self.assertNotRaises(job.run,parameters,False)\n\n')
f.write(' f = NetCDFFile(job.configuration["output_files"]["files"][0],"r")\n')
f.write(' resMono = {}\n')
f.write(' for k,v in f.variables.items():\n')
f.write(' resMono[k] = v.getValue()\n')
f.write(' f.close()\n\n')
f.write(' parameters["running_mode"] = ("multiprocessor",2)\n')
f.write(' self.assertNotRaises(job.run,parameters,False)\n\n')
f.write(' f = NetCDFFile(job.configuration["output_files"]["files"][0],"r")\n')
f.write(' resMulti = {}\n')
f.write(' for k,v in f.variables.items():\n')
f.write(' resMulti[k] = v.getValue()\n')
f.write(' f.close()\n\n')
f.write(' for k in resMono.keys():\n')
f.write(' self.assertTrue(numpy.allclose(resMono[k],resMulti[k]))\n\n')
f.write('def suite():\n')
f.write(' loader = unittest.TestLoader()\n')
f.write(' s = unittest.TestSuite()\n')
f.write(' s.addTest(loader.loadTestsFromTestCase(Test%sParallel))\n' % cls.type.upper())
f.write(' return s\n\n')
f.write('if __name__ == "__main__":\n')
f.write(' unittest.main(verbosity=2)\n')
f.close()
os.chmod(testFile,stat.S_IRWXU)
@staticmethod
def set_pyro_server():
import Pyro.errors
import Pyro.naming
# Gets a Pyro proxy for the name server.
locator = Pyro.naming.NameServerLocator()
# Try to get an existing name server.
try:
ns = locator.getNS()
# Otherwise, start a new one.
except Pyro.errors.NamingError:
subprocess.Popen([sys.executable, '-O', '-c', "import Pyro.naming; Pyro.naming.main([])"], stdout = subprocess.PIPE)
ns = None
while ns is None:
try:
ns = locator.getNS()
except Pyro.errors.NamingError:
pass
@property
def name(self):
return self._name
@property
def configuration(self):
return self._configuration
@abc.abstractmethod
def finalize(self):
pass
@abc.abstractmethod
def initialize(self):
pass
@abc.abstractmethod
def run_step(self):
pass
@classmethod
def save(cls, jobFile, parameters=None):
"""
Save a job file for a given job.\n
:Parameters:
#. jobFile (str): The name of the output job file.\n
#. parameters (dict): optional. If not None, the parameters with which the job file will be built.
"""
# Try to open the output job file to see whether it is allowed.
try:
f = open(jobFile, 'w')
# Case where for whatever reason, the file could not be opened for writing. Return.
except IOError as e:
raise JobError(["Error when saving python batch file.",e])
# The first line contains the call to the python executable. This is necessary for the file to
# be autostartable.
f.write('#!%s\n\n' % sys.executable)
# Writes the input file header.
f.write('########################################################\n')
f.write('# This is an automatically generated MDANSE run script #\n')
f.write('#######################################################\n\n')
# Write the import.
f.write("from MDANSE import REGISTRY\n\n")
f.write('################################################################\n')
f.write('# Job parameters #\n')
f.write('################################################################\n\n')
# Writes the line that will initialize the |parameters| dictionary.
f.write('parameters = {}\n')
if parameters is None:
parameters = cls.configurators.get_default_parameters()
for k, v in sorted(parameters.items()):
f.write('parameters[%r] = %r\n' % (k, v))
f.write('\n')
f.write('################################################################\n')
f.write('# Setup and run the analysis #\n')
f.write('################################################################\n')
f.write('\n')
# Sets |analysis| variable to an instance analysis to save.
f.write('job = REGISTRY[%r][%r](status=False)\n' % ('job',cls.type))
f.write('job.setup(parameters)\n')
f.write('job.run()')