# JobControllerPanel.py
#MDANSE : Molecular Dynamics Analysis for Neutron Scattering Experiments
#------------------------------------------------------------------------------------------
#Copyright (C)
#2015- Eric C. Pellegrini Institut Laue-Langevin
#BP 156
#6, rue Jules Horowitz
#38042 Grenoble Cedex 9
#France
#pellegrini[at]ill.fr
#goret[at]ill.fr
#aoun[at]ill.fr
#
#This library is free software; you can redistribute it and/or
#modify it under the terms of the GNU Lesser General Public
#License as published by the Free Software Foundation; either
#version 2.1 of the License, or (at your option) any later version.
#
#This library is distributed in the hope that it will be useful,
#but WITHOUT ANY WARRANTY; without even the implied warranty of
#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
#Lesser General Public License for more details.
#
#You should have received a copy of the GNU Lesser General Public
#License along with this library; if not, write to the Free Software
#Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA

''' 
Created on Apr 10, 2015

:author: Gael Goret and Eric C. Pellegrini
'''

import collections
import cPickle
import datetime
import glob
import os
import threading
import time

import wx
import wx.lib.intctrl as intctrl

from MDANSE.Core.Platform import PLATFORM
from MDANSE.Core.Singleton import Singleton
from MDANSE.Framework.Status import convert_duration, total_seconds

from MDANSE.GUI import PUBLISHER
from MDANSE.GUI.Icons import ICONS
from MDANSE.GUI.Events.JobControllerEvent import EVT_JOB_CONTROLLER, JobControllerEvent
        
class JobController(threading.Thread):
    '''
    This class sets up the job controlling daemon that regularly checks the status of the running MDANSE jobs.

    The job controller runs on a separate thread in order not to block the main thread.

    While the thread is running, it checks every 10s the status of the running MDANSE jobs by looking at their
    corresponding temporary pickle files. It unpickles them and updates the :py:attr:`_runningJobs` dictionary that
    stores the information about each running job (state, progress, eta, name ...). A snapshot of that dictionary is
    sent to the job controller panel widget through the wx.PostEvent mechanism, so the panel only has to bind to
    EVT_JOB_CONTROLLER to receive it every time :py:meth:`update` is called.

    :py:meth:`update` and :py:meth:`kill_job` are called both from this thread and from the GUI thread, hence the
    registry is only modified while holding an internal lock.
    '''

    __metaclass__ = Singleton

    def __init__(self, window, start=False):
        '''
        :param window: the wx object that will be notified when the job controller is updated
        :type window: wx object
        :param start: if True the job controller thread is started at __init__ stage.
        :type start: bool
        '''

        threading.Thread.__init__(self)

        # Daemon thread: it must not keep the application alive on exit.
        self.daemon = True

        # The wx object related to this thread
        self._window = window

        # Deliberately not named ``_stop``: threading.Thread uses that name internally on Python 3
        # (Thread._stop() is called by join()), so shadowing it with an Event would break join().
        self._stopEvent = threading.Event()

        # update()/kill_job() are reachable from both this thread and the GUI thread.
        # Reentrant because kill_job() calls update() while already holding it.
        self._lock = threading.RLock()

        # Job name -> job information dictionary (as unpickled from the job temporary file).
        self._runningJobs = collections.OrderedDict()

        # This variable is used to keep track whether or not it is the first run of the thread.
        self._firstThreadRun = True

        if start:
            self.start()

    @property
    def runningJobs(self):
        '''The live registry of jobs, keyed by job name (shared with the controller thread).'''

        return self._runningJobs

    def kill_job(self, info):
        '''
        Kill a job if it is still running, forget it, remove its temporary file and refresh the controller.

        :param info: the job information dictionary. It must provide the 'name', 'state', 'pid' and
            'temporary_file' keys.
        :type info: dict
        '''

        with self._lock:
            self._remove_job(info)
            self.update()

    def _remove_job(self, info):
        '''Kill the job process if running, drop the job from the registry and delete its temporary file (best effort).'''

        if info['name'] in self._runningJobs:

            if info['state'] == 'running':
                try:
                    PLATFORM.kill_process(info['pid'])
                except Exception:
                    # Best effort: the process may already have exited.
                    pass

            del self._runningJobs[info['name']]

        tempFile = info['temporary_file']
        if os.path.exists(tempFile):
            try:
                os.unlink(tempFile)
            except OSError:
                # Best effort: the file may have been removed concurrently.
                pass

    def run(self):
        '''Thread body: refresh the job registry every 10s until :py:meth:`stop` is called.'''

        while not self._stopEvent.is_set():
            self.update()
            self._stopEvent.wait(10.0)

    def stop(self):
        '''Ask the thread to exit; the pending 10s wait of :py:meth:`run` returns as soon as the event is set.'''

        self._stopEvent.set()

    def _flag_finished_jobs(self):
        '''Mark as finished the registered jobs whose temporary file has been removed.'''

        for status in self._runningJobs.values():

            # Already flagged at a previous controller check point.
            if status['state'] == 'finished':
                continue

            # A job whose temporary file is still there has not finished properly yet.
            # (The file path is compared, not the job name: the registry is keyed by name.)
            if os.path.exists(status['temporary_file']):
                continue

            status['eta'] = 'N/A'
            status['progress'] = 100
            status['state'] = 'finished'
            start = datetime.datetime.strptime(status["start"], "%d-%m-%Y %H:%M:%S")
            status['elapsed'] = '%02d:%02dh:%02dm:%02ds' % convert_duration(total_seconds(datetime.datetime.today() - start))

    @staticmethod
    def _is_job_process_alive(info, pids):
        '''
        Return True if the job pid is active and that process was not created after the job started.

        :param info: the job information dictionary ('pid' and 'start' keys are read).
        :param pids: presumably a mapping from active pid to its creation date ("%d-%m-%Y %H:%M:%S") -- as parsed below.
        '''

        if info['pid'] not in pids:
            return False

        # Check that the pid actually corresponds to the job by comparing the job starting date
        # and the pid creation date.
        jobStartingTime = datetime.datetime.strptime(info["start"], "%d-%m-%Y %H:%M:%S")
        procStartingTime = datetime.datetime.strptime(pids[info['pid']], "%d-%m-%Y %H:%M:%S")
        return jobStartingTime >= procStartingTime

    def update(self, init=False):
        '''
        Refresh the registry of jobs from their temporary files and notify the window with a snapshot of it.

        :param init: unused, kept for backward compatibility.
        :type init: bool
        '''

        with self._lock:

            pids = PLATFORM.get_processes_info()

            # The temporary files of the registered jobs.
            jobFiles = glob.glob(os.path.join(PLATFORM.temporary_files_directory(), '*'))

            self._flag_finished_jobs()

            # Loop over the jobs whose temporary files are still present
            for jobFile in jobFiles:

                try:
                    with open(jobFile, 'rb') as f:
                        info = cPickle.load(f)
                    name = info['name']
                # The file could not be opened/unpickled (e.g. it may be being written) or is not a job file:
                # try again at the next check point.
                except Exception:
                    continue

                if not self._is_job_process_alive(info, pids):
                    info['state'] = 'aborted'

                # Jobs already aborted at the first run of the controller are dropped and their temporary
                # file removed. Done without calling kill_job() to avoid re-entering update() mid-loop.
                if self._firstThreadRun and info['state'] == 'aborted':
                    self._remove_job(info)
                    continue

                self._runningJobs[name] = info

            # Post a copy so that the GUI thread never iterates the registry while this thread modifies it.
            wx.PostEvent(self._window, JobControllerEvent(collections.OrderedDict(self._runningJobs)))

            self._firstThreadRun = False

class JobControllerPanel(wx.ScrolledWindow):
    '''
    Scrollable table showing one row per MDANSE job registered by the :py:class:`JobController`.

    Row 0 holds the column headers. Each job row holds: the job name (button opening the job information),
    pid, start date, elapsed time, state (button opening the traceback), a progress gauge, the eta and a
    kill button.
    '''

    columns = [("name",(180,-1)),("pid",(60,-1)),("start",(150,-1)),("elapsed",(140,-1)),("state",(90,-1)),("progress",(-1,-1)),("eta",(120,-1)),("kill",(50,-1))]

    def __init__(self, parent):

        wx.ScrolledWindow.__init__(self, parent, id = wx.ID_ANY)

        self.SetScrollbars(pixelsPerUnitX=1, pixelsPerUnitY=1, noUnitsX=50, noUnitsY=50)

        self.parent = parent

        self._gbSizer = wx.GridBagSizer(0,0)

        # Header row
        for i,(col,s) in enumerate(JobControllerPanel.columns):
            self._gbSizer.Add(wx.TextCtrl(self,wx.ID_ANY,value=col.upper(),size=s,style=wx.TE_READONLY|wx.ALIGN_CENTER_HORIZONTAL),pos=(0,i),flag=wx.EXPAND|wx.ALIGN_CENTER_HORIZONTAL)

        # The progress gauge column takes the remaining width.
        self._gbSizer.AddGrowableCol(5)

        self.SetSizer(self._gbSizer)

        # Job name -> {column name: widget} for every displayed job.
        self._jobs = {}

        EVT_JOB_CONTROLLER(self,self.on_update)

        self.Bind(wx.EVT_WINDOW_DESTROY,self.OnDestroy)

        PUBLISHER.subscribe(self.msg_start_job,"msg_start_job")

        # Created (and started) last so that everything on_update relies on already exists.
        self._jobsController = JobController(self,True)

    def __del__(self):

        controller = getattr(self, '_jobsController', None)
        if controller is None:
            return

        controller.stop()
        # join() on the current thread would raise (the original busy-wait would spin forever).
        if controller.is_alive() and controller is not threading.current_thread():
            controller.join()

    def OnDestroy(self,event):
        '''Stop listening to job start messages once this panel itself is destroyed.'''

        # Destroy events of child widgets (e.g. the rows removed in on_update) may also reach this handler.
        if event.GetEventObject() is self:
            PUBLISHER.unsubscribe(self.msg_start_job,"msg_start_job")

        event.Skip()

    def msg_start_job(self,message):

        self._jobsController.update()

    def _job_name_from_event(self, event):
        '''Return the name of the job displayed on the row of the widget that emitted the event.'''

        row = self._gbSizer.GetItemPosition(event.GetEventObject())[0]
        return self._gbSizer.FindItemAtPosition((row,0)).Window.GetLabel()

    def _show_text_frame(self, text):
        '''Open a frame displaying text in a read-only multi-line text control and return that control.'''

        f = wx.Frame(self,size=(800,500))

        panel = wx.Panel(f,wx.ID_ANY)

        sizer = wx.BoxSizer(wx.VERTICAL)

        textCtrl = wx.TextCtrl(panel,wx.ID_ANY,style=wx.TE_AUTO_SCROLL|wx.TE_READONLY|wx.TE_MULTILINE)
        textCtrl.SetValue(text)

        sizer.Add(textCtrl,1,wx.ALL|wx.EXPAND,5)

        panel.SetSizer(sizer)

        f.Show()

        return textCtrl

    def _job_text(self, name, key):
        '''Return the stripped text stored under key for job name, or '' if unavailable.'''

        job = self._jobsController.runningJobs.get(name) or {}
        return (job.get(key) or '').strip()

    def on_display_info(self,event):

        name = self._job_name_from_event(event)

        info = self._job_text(name, 'info')
        if not info:
            info = "No information available about %r job." % name

        self._info = self._show_text_frame(info)

    def on_display_traceback(self,event):

        name = self._job_name_from_event(event)

        tb = self._job_text(name, 'traceback')
        if not tb:
            tb = "No traceback available about %r job." % name

        self._tb = self._show_text_frame(tb)

    def on_kill_job(self, event):

        name = self._job_name_from_event(event)

        d = wx.MessageDialog(None, 'Do you really want to kill job %r ?' % name, 'Question', wx.YES_NO|wx.YES_DEFAULT|wx.ICON_EXCLAMATION)
        # Modal dialogs are not destroyed automatically.
        try:
            answer = d.ShowModal()
        finally:
            d.Destroy()

        if answer != wx.ID_YES:
            return

        job = self._jobsController.runningJobs.get(name)
        # Guard against the job having been removed from the registry in the meantime.
        if job is not None:
            self._jobsController.kill_job(job)

    def _remove_row(self, jobName):
        '''Destroy the widgets of a job row and shift the rows below it one row up.'''

        widgets = self._jobs.pop(jobName)

        row = self._gbSizer.GetItemPosition(widgets['name'])[0]

        for w in widgets.values():
            self._gbSizer.Detach(w)
            w.Destroy()

        # Rows are kept contiguous (row 0 is the header): before the pop, the last job row was len(self._jobs) + 1.
        for r in range(row + 1, len(self._jobs) + 2):
            for c in range(len(JobControllerPanel.columns)):
                item = self._gbSizer.FindItemAtPosition((r,c))
                if item is None:
                    continue
                self._gbSizer.SetItemPosition(item.GetWindow(),(r-1,c))

    def _add_row(self, jobName, jobStatus):
        '''Create the widgets of a new job, append them as the last row and return them.'''

        textStyle = wx.TE_READONLY|wx.ALIGN_CENTER_HORIZONTAL

        widgets = {}
        widgets['name'] = wx.Button(self, wx.ID_ANY, style=wx.BU_EXACTFIT,label=jobStatus['name'])
        widgets['pid'] = intctrl.IntCtrl(self, wx.ID_ANY, style=textStyle,value=jobStatus['pid'])
        widgets['start'] = wx.TextCtrl(self, wx.ID_ANY, style=textStyle,value=jobStatus['start'])
        widgets['elapsed'] = wx.TextCtrl(self, wx.ID_ANY, style=textStyle,value=jobStatus['elapsed'])
        widgets['state'] = wx.Button(self, wx.ID_ANY, style=wx.BU_EXACTFIT,label=jobStatus['state'])
        widgets['progress'] = wx.Gauge(self, wx.ID_ANY,range=100)
        widgets['progress'].SetValue(jobStatus['progress'])
        widgets['eta'] = wx.TextCtrl(self, wx.ID_ANY, style=textStyle,value=jobStatus['eta'])
        widgets['kill'] = wx.BitmapButton(self, wx.ID_ANY, ICONS["stop",16,16])

        # Bound once, on the widgets themselves, so the bindings disappear with the row
        # (re-binding on every refresh would grow the panel event table forever).
        widgets['name'].Bind(wx.EVT_BUTTON,self.on_display_info)
        widgets['state'].Bind(wx.EVT_BUTTON,self.on_display_traceback)
        widgets['kill'].Bind(wx.EVT_BUTTON,self.on_kill_job)

        self._jobs[jobName] = widgets

        # Rows are contiguous and row 0 is the header, so the new job goes right after the last one.
        row = len(self._jobs)
        for col,(key,_) in enumerate(JobControllerPanel.columns):
            self._gbSizer.Add(widgets[key],pos=(row,col),flag=wx.EXPAND|wx.ALIGN_CENTER_HORIZONTAL)

        return widgets

    def on_update(self, event):
        '''Synchronize the table with the job registry snapshot carried by the event.'''

        runningJobs = event.runningJobs

        # Remove the widgets corresponding to the jobs that are not registered anymore
        for jobName in [k for k in self._jobs if k not in runningJobs]:
            self._remove_row(jobName)

        for jobName, jobStatus in runningJobs.items():

            widgets = self._jobs.get(jobName)
            if widgets is None:
                widgets = self._add_row(jobName, jobStatus)
            else:
                widgets['progress'].SetValue(jobStatus['progress'])
                widgets['elapsed'].SetValue(jobStatus['elapsed'])
                widgets['state'].SetLabel(jobStatus['state'])
                widgets['eta'].SetValue(jobStatus['eta'])

            if jobStatus["state"] == "aborted":
                widgets["name"].SetBackgroundColour(wx.RED)

        self._gbSizer.Layout()

if __name__ == "__main__":

    app = wx.App(0)
    frame = wx.Frame(None, -1, "Job Controller")

    # Use a dedicated name: rebinding ``JobController`` would shadow the controller class at module level.
    panel = JobControllerPanel(frame)

    frame.Show(True)
    app.MainLoop()