# BuildJobTests.py
# Generates the Test_<job type>.py unit test scripts for the registered jobs.
import os
import stat
import sys

# MDANSE registry, used below to look up the registered jobs
from MDANSE import REGISTRY
#from docutils.io import Output
#from docutils.nodes import reference

class JobFileGenerator(object):
    """
    Generates a standalone unittest script named "Test_<job type>.py" for one job.

    The generated script runs the job in monoprocessor mode (and in
    multiprocessor mode when the job exposes a 'running_mode' setting), then
    compares the produced NetCDF output with the reference data file when one
    exists, or the mono and multiprocessor outputs with each other otherwise.

    :Attributes:
    # job: the job the test script is generated for
    # reference_data_path (str): reference data path prefix for this job (no suffix, no extension)
    # reference_data_file (str or None): the "<prefix>_reference.nc" file, None if it does not exist
    # multiprocessor (bool): True if the job has a 'running_mode' setting
    # job_file_name (str): name of the generated test script
    """

    def __init__(self, job, parameters=None):
        """
        Builds test for a given job. The test script is written as soon as the
        object is created.

        :Parameters:
        # job: must provide |_type|, |settings| and |get_default_parameters()|
        # parameters (dict): optional. If not None, the parameters which the job file will be built with.
        """
        # Save job
        self.job = job
        # NOTE(review): the returned value is discarded here; the call is kept only in case
        # it has side effects on the job - confirm and remove if it has none.
        self.job.get_default_parameters()

        # Check if reference data is present
        self.reference_data_path = os.path.join(os.path.pardir, os.path.pardir, os.path.pardir,
                                                "Data", "Jobs_reference_data", job._type)
        self.reference_data_file = self.reference_data_path + "_reference" + ".nc"
        if not os.path.isfile(self.reference_data_file):
            # Single-argument print(...) gives the same output under Python 2 and Python 3
            print("/!\\ Reference data file is not present for job " + str(job))
            self.reference_data_file = None

        # Check if job can be launched on multiprocessor
        self.multiprocessor = 'running_mode' in job.settings
        if not self.multiprocessor:
            print("/!\\ Job " + str(job) + " cannot be launched on multiprocessor")

        # Create the job file
        self.job_file_name = "Test_%s.py" % job._type
        self.__build_job_file(parameters)

    def __build_job_file(self, parameters):
        """
        Builds job file for a given job

        :Parameters:
        # parameters (dict): optional. If not None, the parameters which the job file will be built with.
          If None, the default parameters of the job are used.
        """
        array_of_python_dependancies_string = ['unittest', 'numpy', 'os']
        array_of_mdanse_dependancies_string = ['from Tests.UnitTests.UnitTest import UnitTest',
                                               'from MDANSE import REGISTRY']
        if parameters is None:
            parameters = self.job.get_default_parameters()

        job_type = self.job._type
        test_class_name = 'Test%s' % job_type.upper()

        # The generated source is collected piece by piece and joined once at the end
        chunks = []
        add = chunks.append

        add('class %s(UnitTest):\n\n' % test_class_name)
        add('    def test(self):\n')
        # Writes the lines that will initialize the |parameters| dictionary and create the job
        add('        parameters = {}\n')
        for k, v in sorted(parameters.items()):
            add('        parameters[%r] = %r\n' % (k, v))
        add('        job = REGISTRY[%r][%r]()\n' % ('job', job_type))
        add('        if "output_file" in parameters:\n')
        add('            output_path = parameters["output_file"][0]\n')
        add('        else:\n')
        add('            output_path = parameters["output_files"][0]\n')
        # %r emits a correctly escaped literal: a path containing backslashes (Windows)
        # or quotes would otherwise be corrupted once pasted into the generated source
        add('        reference_data_path = %r\n' % self.reference_data_path)

        # Launch the job in monoprocessor mode and copy output file
        add('        print "Launching job in monoprocessor mode"\n')
        add('        parameters["running_mode"] = ("monoprocessor",1)\n')
        add('        self.assertNotRaises(job.run, parameters, status=False)\n')
        add('        shutil.copy(output_path + ".nc", reference_data_path + "_mono" + ".nc")\n')
        add('        print "Monoprocessor execution completed"\n\n')

        # Launch the job in multiprocessor mode if available
        if self.multiprocessor:
            add('        print "Launching job in multiprocessor mode"\n')
            add('        parameters["running_mode"] = ("multiprocessor",2)\n')
            add('        self.assertNotRaises(job.run,parameters,False)\n')
            add('        shutil.copy(output_path + ".nc", reference_data_path + "_multi" + ".nc")\n')
            add('        print "Multiprocessor execution completed"\n\n')

        # Compare reference data with monoprocessor if reference data exists
        if self.reference_data_file:
            add('        print "Comparing monoprocessor output with reference output"\n')
            add('        self.assertTrue(compare(%r, reference_data_path + "_mono" + ".nc"))\n\n'
                % self.reference_data_file)

        # Compare reference data with multiprocessor if reference data exists
        if self.reference_data_file and self.multiprocessor:
            add('        print "Comparing multiprocessor output with reference output"\n')
            add('        self.assertTrue(compare(%r, reference_data_path + "_multi" + ".nc"))\n\n'
                % self.reference_data_file)
        # If no reference data but multiprocessor, compare mono and multiprocessor
        elif self.multiprocessor:
            add('        print "Comparing monoprocessor output with multiprocessor output"\n')
            add('        self.assertTrue(compare(reference_data_path + "_mono" + ".nc", reference_data_path + "_multi" + ".nc"))\n\n')

        # Best-effort removal of the temporary copies made by the generated test
        add('        try:\n')
        add('            os.remove(reference_data_path + "_mono" + ".nc")\n')
        if self.multiprocessor:
            add('            os.remove(reference_data_path + "_multi" + ".nc")\n')
        add('        except:\n')
        add('            pass\n')

        # If test is "gmft", restore old_universe_name
        if job_type == "gmft":
            add('        job.configuration["trajectory"]["instance"].universe.__class__.__name__ = job.old_universe_name\n')

        # Finally write the suite method that will be called by test script
        add('\n\ndef suite():\n')
        add('    loader = unittest.TestLoader()\n')
        add('    s = unittest.TestSuite()\n')
        add('    s.addTest(loader.loadTestsFromTestCase(%s))\n' % test_class_name)
        add('    return s')

        self.__generate_test_file(''.join(chunks),
                                  array_of_python_dependancies_string,
                                  array_of_mdanse_dependancies_string)

    def __generate_test_file(self, test_string, array_of_python_dependancies_string, array_of_mdanse_dependancies_string):
        """
        Produce a file for the given informations. The file is written to
        |self.job_file_name| and then given read/write/execute permission for
        its owner only. The lists passed in are not modified.

        :Parameters:
        # test_string (string): the content
        # array_of_python_dependancies_string (array of string) Example:["numpy", "os", "sys"]
        # array_of_mdanse_dependancies_string (array of string) Example:["from foo import bar", "import spam"]
        """
        # Work on copies so that the caller's lists are neither extended nor reordered
        python_imports = list(array_of_python_dependancies_string)
        mdanse_imports = list(array_of_mdanse_dependancies_string)

        # Add unittest, os, shutil and time if needed
        for module_name in ("unittest", "os", "shutil", "time"):
            if module_name not in python_imports:
                python_imports.append(module_name)
        # Add NetCDF and the comparator used by the generated compare() function
        mdanse_imports.append('from Scientific.IO.NetCDF import NetCDFFile')
        mdanse_imports.append('import Comparator')

        # Sort arrays to write imports in the alphabetical order
        python_imports.sort()
        mdanse_imports.sort()

        # Source of the compare function written at the top of the generated file
        compare_source = ''.join([
            'def compare(file1, file2):\n',
            '    ret = True\n',
            '    f = NetCDFFile(file1,"r")\n',
            '    res1 = {}\n',
            '    for k,v in f.variables.items():\n',
            '        res1[k] = v.getValue()\n',
            '    f.close()\n',
            '    f = NetCDFFile(file2,"r")\n',
            '    res2 = {}\n',
            '    for k,v in f.variables.items():\n',
            '        res2[k] = v.getValue()\n',
            '    f.close()\n',
            '    return Comparator.Comparator().compare(res1, res2)\n\n',
        ])

        # The context manager closes the file even if one of the writes fails
        with open(self.job_file_name, 'w') as f:
            # The first line contains the call to the python executable. This is necessary for the file to
            # be autostartable.
            f.write('#!%s\n\n' % sys.executable)

            # Write dependancies
            for dependancy_string in python_imports:
                f.write("import " + dependancy_string + "\n")
            f.write("\n")
            for dependancy_string in mdanse_imports:
                f.write(dependancy_string + "\n")
            f.write("\n")

            # Write the compare function followed by the test string
            f.write(compare_source + test_string)
            f.write("\n\n")

            # Write file ending
            f.write("if __name__ == '__main__':\n")
            f.write('    unittest.main(verbosity=2)\n')

        # Owner-only read/write/execute permission
        os.chmod(self.job_file_name, stat.S_IRWXU)
        
if __name__ == '__main__':
    # Script entry point: create one test source file per job found in the registry
    for job_id, job in REGISTRY['job'].items():
        # The mcstas test is left out because the mcstas executable is not available on all platforms
        if job_id == 'mvi':
            continue
        # Creating the generator is enough: its constructor writes the test file
        job_file_generator = JobFileGenerator(job)