JobControllerPanel.py 14.7 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# **************************************************************************
#
# MDANSE: Molecular Dynamics Analysis for Neutron Scattering Experiments
#
# @file      Src/GUI/JobControllerPanel.py
# @brief     Implements module/class/test JobControllerPanel
#
# @homepage  https://mdanse.org
# @license   GNU General Public License v3 or higher (see LICENSE)
# @copyright Institut Laue Langevin 2013-now
# @authors   Scientific Computing Group at ILL (see AUTHORS)
#
# **************************************************************************

15
16
17
18
19
20
21
22
23
24
25
26
27
28
import collections
import cPickle
import datetime
import glob
import os
import threading
import time

import wx
import wx.lib.intctrl as intctrl

from MDANSE.Core.Platform import PLATFORM
from MDANSE.Core.Singleton import Singleton
from MDANSE.Framework.Status import convert_duration, total_seconds
29
30

from MDANSE.GUI import PUBLISHER
31
from MDANSE.GUI.Icons import ICONS
32
from MDANSE.GUI.Events.JobControllerEvent import EVT_JOB_CONTROLLER, JobControllerEvent
33
34
        
class JobController(threading.Thread):
35
36
37
38
39
40
41
42
43
44
45
46
    '''
    This class sets up the job controlling daemon that will regularly check the status of the running MDANSE jobs.
    
    The job controller runs on a separate thread so as not to block the main thread.
    
    When the thread is running, it checks every 10s the status of the running MDANSE jobs by looking at their 
    corresponding temporary pickle files. It unpickles them and updates the :py:attr:`_runningJobs` dictionary used to store
    the information about each running job (state, progress, eta, name ...). That dictionary is passed to the job 
    controller panel widget using the wx.PostEvent mechanism. In doing so, the job controller panel widget just has to bind 
    to EVT_JOB_CONTROLLER and it will receive the updated version of the :py:attr:`_runningJobs` dictionary every time 
    the :py:meth:`update` method is called.
    '''
47
48
49
50
    
    __metaclass__ = Singleton
    
    def __init__(self, window, start=False):
        '''
        Build the job controller thread.

        :param window: the wx object that is notified each time the job controller is updated
        :type window: wx object
        :param start: if True, the thread is started before the constructor returns.
        :type start: bool
        '''

        threading.Thread.__init__(self)

        # The controller must never prevent the interpreter from exiting.
        self.daemon = True

        # Event set by stop() to make run() leave its polling loop.
        # NOTE(review): Python 3's threading.Thread owns a private _stop() method that this
        # attribute would shadow - to be renamed if the module is ever ported.
        self._stop = threading.Event()

        # The wx object that receives the events posted by update().
        self._window = window

        # The registered jobs, keyed by job name, in registration order.
        self._runningJobs = collections.OrderedDict()

        # True until update() has completed once.
        self._firstThreadRun = True

        if start:
            self.start()
        
    @property
    def runningJobs(self):
        '''
        The registry of the jobs known to the controller, keyed by job name.

        The internal dictionary itself is returned, not a copy.
        '''

        return self._runningJobs
80

81
    def kill_job(self, info):
        '''
        Unregister a job, kill its process if it is still running, remove its temporary file
        and trigger an update of the controller.

        :param info: the job status; the keys 'name', 'state', 'pid' and 'temporary_file' are read
        :type info: dict
        '''

        name = info["name"]

        if name in self._runningJobs:

            if info['state'] == 'running':
                # Best effort: the process may already be gone. Only regular errors are
                # ignored so that KeyboardInterrupt/SystemExit are not swallowed anymore.
                try:
                    PLATFORM.kill_process(info['pid'])
                except Exception:
                    pass

            del self._runningJobs[name]

        # Remove the temporary file directly instead of testing for its existence first:
        # the file can disappear between the test and the removal.
        try:
            os.unlink(info['temporary_file'])
        except OSError:
            pass

        self.update()
100
        
101
102
103
104
    def run(self):
        '''
        Thread main loop: update the job registry, then wait up to 10 seconds, until stop() is called.
        '''

        stopRequested = self._stop

        while True:
            if stopRequested.is_set():
                break
            self.update()
            # Returns as soon as the event is set, so a stop request is honoured without delay.
            stopRequested.wait(10.0)
106
107
108
109
110

    def stop(self):
        '''
        Request the termination of the polling loop started by run().
        '''

        stopRequested = self._stop
        stopRequested.set()
        
111
    def update(self, init=False):
        '''
        Refresh the job registry from the job temporary files and post it to the bound window.

        Every entry of the temporary files directory is unpickled into a job status dictionary.
        A job is flagged as 'aborted' when its pid is not among the active processes, or when the
        job start date is earlier than the start date of the process currently holding that pid.
        A registered job whose temporary file is not found anymore is flagged as 'finished'.
        The first time this method completes, the aborted jobs are discarded through
        :py:meth:`kill_job` instead of being registered.

        A JobControllerEvent carrying the registry is posted to the bound window at each call.

        :param init: unused by this method.
        :type init: bool
        '''

        pids = PLATFORM.get_processes_info()

        # The temporary files of the registered jobs.
        jobs = glob.glob(os.path.join(PLATFORM.temporary_files_directory(),'*'))
        jobFiles = set(jobs)

        # Loop over the jobs registered at the previous controller check point.
        for job, status in list(self._runningJobs.items()):

            # The job was already flagged as finished at a previous check point.
            if status['state'] == 'finished':
                continue

            # The registry is keyed by job name whereas jobFiles stores file paths, so the job
            # temporary file path is the value to look up. The key is used as a fallback, which
            # is what was compared before.
            # NOTE(review): assumes info['temporary_file'] is spelled like the glob result - confirm.
            if status.get('temporary_file', job) in jobFiles:
                continue

            # Case where the job has finished properly (i.e. its temporary file has been removed).
            status['eta'] = 'N/A'
            status['progress'] = 100
            status['state'] = 'finished'
            start = datetime.datetime.strptime(status["start"],"%d-%m-%Y %H:%M:%S")
            status['elapsed'] = '%02d:%02dh:%02dm:%02ds' % convert_duration(total_seconds(datetime.datetime.today() - start))

        # Loop over the jobs whose temporary files are still present.
        for job in jobs:

            # If the file could not be opened/unpickled for whatever reason, try again at the next
            # check point. The with statement closes the file even when unpickling fails.
            # NOTE(review): unpickling runs arbitrary code - the temporary files directory must
            # only be writable by trusted users.
            try:
                with open(job, 'rb') as f:
                    info = cPickle.load(f)
            except Exception:
                continue

            name = info['name']

            # Check that the pid of the job corresponds to an active pid.
            running = (info['pid'] in pids)

            # If so, check that the pid actually belongs to the job by comparing the job starting
            # date and the process creation date.
            if running:
                jobStartingTime = datetime.datetime.strptime(info["start"],"%d-%m-%Y %H:%M:%S")
                procStartingTime = datetime.datetime.strptime(pids[info['pid']],"%d-%m-%Y %H:%M:%S")
                running = (jobStartingTime >= procStartingTime)

            if not running:
                info["state"] = "aborted"

            # On the first run, the aborted jobs are discarded and their temporary files removed.
            # NOTE(review): kill_job() calls update() again, so this branch is re-entrant.
            if self._firstThreadRun and info['state'] == 'aborted':
                self.kill_job(info)
                continue

            self._runningJobs[name] = info

        wx.PostEvent(self._window, JobControllerEvent(self._runningJobs))

        self._firstThreadRun = False
174

175
176
177
                                              
class JobControllerPanel(wx.ScrolledWindow):
    
178
    columns = [("name",(180,-1)),("pid",(60,-1)),("start",(150,-1)),("elapsed",(140,-1)),("state",(90,-1)),("progress",(-1,-1)),("eta",(120,-1)),("kill",(50,-1))]
179
180
181
182
183
184
185
186
187
188
    
    def __init__(self, parent):
        '''
        Build the panel: one header row, a started job controller and the event subscriptions.

        :param parent: the parent window of the panel
        :type parent: wx object
        '''

        wx.ScrolledWindow.__init__(self, parent, id = wx.ID_ANY)

        self.SetScrollbars(pixelsPerUnitX=1, pixelsPerUnitY=1, noUnitsX=50, noUnitsY=50)

        self.parent = parent

        self._gbSizer = wx.GridBagSizer(0,0)

        # Row 0 of the grid holds one read-only header cell per column.
        headerStyle = wx.TE_READONLY|wx.ALIGN_CENTER_HORIZONTAL
        headerFlag = wx.EXPAND|wx.ALIGN_CENTER_HORIZONTAL
        for index,(title,size) in enumerate(JobControllerPanel.columns):
            header = wx.TextCtrl(self,wx.ID_ANY,value=title.upper(),size=size,style=headerStyle)
            self._gbSizer.Add(header,pos=(0,index),flag=headerFlag)

        # Column 5 (the progress gauge) takes the extra horizontal space.
        self._gbSizer.AddGrowableCol(5)

        self.SetSizer(self._gbSizer)

        # The controller thread is started right away.
        self._jobsController = JobController(self,True)

        # The widgets of the displayed jobs: job name -> column name -> widget.
        self._jobs = {}

        EVT_JOB_CONTROLLER(self,self.on_update)

        self.Bind(wx.EVT_WINDOW_DESTROY,self.OnDestroy)

        PUBLISHER.subscribe(self.msg_start_job,"msg_start_job")
206
207
208
209
210
211
212

    def __del__(self):
        '''
        Ask the job controller thread to stop and wait until it is not alive anymore.
        '''

        controller = self._jobsController
        controller.stop()

        # Poll every 10 ms until the thread has terminated.
        while True:
            if not controller.is_alive():
                break
            time.sleep(0.01)
            
213
214
    def OnDestroy(self,event):
        '''
        Stop listening to the "msg_start_job" topic when the panel is destroyed.

        :param event: the window destroy event
        :type event: wx.WindowDestroyEvent
        '''

        # Destroy events of the child widgets (e.g. the job rows removed by on_update) also reach
        # this handler; only the destruction of the panel itself must drop the subscription.
        if event.GetEventObject() is self:
            # The subscription made in __init__ has to be undone here, not renewed.
            # NOTE(review): assumes PUBLISHER provides a pubsub-style unsubscribe(listener, topic) - confirm.
            PUBLISHER.unsubscribe(self.msg_start_job,"msg_start_job")

        event.Skip()
        
218
    def msg_start_job(self,message):
        '''
        Handler of the "msg_start_job" topic: run updateJobsController on a new thread.

        :param message: the published message; it is not used.
        '''

        worker = threading.Thread(target=self.updateJobsController)
        worker.start()

    def updateJobsController(self):
        '''
        Sleep for the platform jobs launch delay, then update the job controller.
        '''

        delay = PLATFORM.jobs_launch_delay()
        time.sleep(delay)

        controller = self._jobsController
        controller.update()

225
226
227
    def on_display_info(self,event):
        '''
        Open a frame showing the 'info' text of the job whose name button was clicked.

        :param event: the button event; its event object locates the job row in the grid.
        :type event: wx event
        '''

        # The job name is the label of the widget stored in column 0 of the clicked row.
        position = self._gbSizer.GetItemPosition(event.GetEventObject())
        nameItem = self._gbSizer.FindItemAtPosition((position[0],0))
        name = nameItem.Window.GetLabel()

        frame = wx.Frame(self,size=(800,500))
        panel = wx.Panel(frame,wx.ID_ANY)
        sizer = wx.BoxSizer(wx.VERTICAL)

        # Fall back on a default message when the job provides an empty text.
        info = self._jobsController.runningJobs[name]['info'].strip() or ("No information available about %r job." % name)

        self._info = wx.TextCtrl(panel,wx.ID_ANY,style=wx.TE_AUTO_SCROLL|wx.TE_READONLY|wx.TE_MULTILINE)
        self._info.SetValue(info)

        sizer.Add(self._info,1,wx.ALL|wx.EXPAND,5)
        panel.SetSizer(sizer)

        frame.Show()
248
249
250
251
252
253
254
255
256
257
258
259

    def on_display_traceback(self,event):
        '''
        Open a frame showing the 'traceback' text of the job whose state button was clicked.

        :param event: the button event; its event object locates the job row in the grid.
        :type event: wx event
        '''

        # The job name is the label of the widget stored in column 0 of the clicked row.
        position = self._gbSizer.GetItemPosition(event.GetEventObject())
        nameItem = self._gbSizer.FindItemAtPosition((position[0],0))
        name = nameItem.Window.GetLabel()

        frame = wx.Frame(self,size=(800,500))
        panel = wx.Panel(frame,wx.ID_ANY)
        sizer = wx.BoxSizer(wx.VERTICAL)

        # Fall back on a default message when the job provides an empty text.
        tb = self._jobsController.runningJobs[name]['traceback'].strip() or ("No traceback available about %r job." % name)

        self._tb = wx.TextCtrl(panel,wx.ID_ANY,style=wx.TE_AUTO_SCROLL|wx.TE_READONLY|wx.TE_MULTILINE)
        self._tb.SetValue(tb)

        sizer.Add(self._tb,1,wx.ALL|wx.EXPAND,5)
        panel.SetSizer(sizer)

        frame.Show()
272
273
                                    
    def on_kill_job(self, event):
        '''
        Ask for confirmation, then kill the job whose kill button was clicked.

        :param event: the button event; its event object locates the job row in the grid.
        :type event: wx event
        '''

        # The job name is the label of the widget stored in column 0 of the clicked row.
        row = self._gbSizer.GetItemPosition(event.GetEventObject())[0]
        name = self._gbSizer.FindItemAtPosition((row,0)).Window.GetLabel()

        d = wx.MessageDialog(None, 'Do you really want to kill job %r ?' % name, 'Question', wx.YES_NO|wx.YES_DEFAULT|wx.ICON_EXCLAMATION)
        # A dialog is not released when ShowModal returns: it has to be destroyed explicitly.
        try:
            confirmed = (d.ShowModal() == wx.ID_YES)
        finally:
            d.Destroy()

        if not confirmed:
            return

        # The registry is shared with the controller thread: the job may not be registered
        # anymore once the dialog has been answered.
        info = self._jobsController.runningJobs.get(name)
        if info is not None:
            self._jobsController.kill_job(info)
281
282
283
                        
    def on_update(self, event):
        '''
        Synchronise the grid of job widgets with the job registry carried by the event.

        The rows of the jobs that are not in the registry anymore are destroyed and the rows
        below them are shifted up. The jobs already displayed get their progress, elapsed time,
        state and eta refreshed. A new row of widgets is appended for every other job.

        :param event: the event posted by the job controller; its ``runningJobs`` attribute is read.
        :type event: wx event
        '''

        runningJobs = event.runningJobs

        # Remove the widgets corresponding to the jobs that are not registered anymore.
        # A snapshot is iterated because entries are deleted inside the loop.
        for k,v in list(self._jobs.items()):

            if k in runningJobs:
                continue

            row,_ = self._gbSizer.GetItemPosition(v['name'])
            for c in range(self._gbSizer.GetCols()):
                w = self._gbSizer.FindItemAtPosition((row,c))
                w.GetWindow().Destroy()
            del self._jobs[k]

            # Shift up the rows located below the removed one.
            for r in range(row+1,self._gbSizer.GetRows()):
                for i in range(self._gbSizer.GetCols()):
                    w = self._gbSizer.FindItemAtPosition((r,i))
                    if w is None:
                        continue
                    self._gbSizer.SetItemPosition(w.GetWindow(),(r-1,i))

        for jobName, jobStatus in runningJobs.items():

            if jobName in self._jobs:
                widgets = self._jobs[jobName]
                widgets['progress'].SetValue(jobStatus['progress'])
                widgets['elapsed'].SetValue(jobStatus['elapsed'])
                widgets['state'].SetLabel(jobStatus['state'])
                widgets['eta'].SetValue(jobStatus['eta'])
            else:
                widgets = self._jobs[jobName] = {}
                widgets['name'] = wx.Button(self, wx.ID_ANY, style=wx.BU_EXACTFIT,label=jobStatus['name'])
                widgets['pid'] = intctrl.IntCtrl(self, wx.ID_ANY, style=wx.TE_READONLY|wx.ALIGN_CENTER_HORIZONTAL,value=jobStatus['pid'])
                widgets['start'] = wx.TextCtrl(self, wx.ID_ANY, style=wx.TE_READONLY|wx.ALIGN_CENTER_HORIZONTAL,value=jobStatus['start'])
                widgets['elapsed'] = wx.TextCtrl(self, wx.ID_ANY, style=wx.TE_READONLY|wx.ALIGN_CENTER_HORIZONTAL,value=jobStatus['elapsed'])
                widgets['state'] = wx.Button(self, wx.ID_ANY, style=wx.BU_EXACTFIT,label=jobStatus['state'])
                widgets['progress'] = wx.Gauge(self, wx.ID_ANY,range=100)
                widgets['progress'].SetValue(jobStatus['progress'])
                widgets['eta'] = wx.TextCtrl(self, wx.ID_ANY, style=wx.TE_READONLY|wx.ALIGN_CENTER_HORIZONTAL,value=jobStatus['eta'])
                widgets['kill'] = wx.BitmapButton(self, wx.ID_ANY, ICONS["stop",16,16])

                # Row 0 is the header, so the n-th displayed job goes to row n.
                r = len(self._jobs)

                self._gbSizer.Add(widgets['name']    ,pos=(r,0),flag=wx.EXPAND|wx.ALIGN_CENTER_HORIZONTAL)
                self._gbSizer.Add(widgets['pid']     ,pos=(r,1),flag=wx.EXPAND|wx.ALIGN_CENTER_HORIZONTAL)
                self._gbSizer.Add(widgets['start']   ,pos=(r,2),flag=wx.EXPAND|wx.ALIGN_CENTER_HORIZONTAL)
                self._gbSizer.Add(widgets['elapsed'] ,pos=(r,3),flag=wx.EXPAND|wx.ALIGN_CENTER_HORIZONTAL)
                self._gbSizer.Add(widgets['state']   ,pos=(r,4),flag=wx.EXPAND|wx.ALIGN_CENTER_HORIZONTAL)
                self._gbSizer.Add(widgets['progress'],pos=(r,5),flag=wx.EXPAND|wx.ALIGN_CENTER_HORIZONTAL)
                self._gbSizer.Add(widgets['eta']     ,pos=(r,6),flag=wx.EXPAND|wx.ALIGN_CENTER_HORIZONTAL)
                self._gbSizer.Add(widgets['kill']    ,pos=(r,7),flag=wx.EXPAND|wx.ALIGN_CENTER_HORIZONTAL)

                # The handlers are bound once, when the buttons are created, rather than at
                # every refresh: each extra Bind call adds one more entry to the handler table.
                self.Bind(wx.EVT_BUTTON,self.on_display_info,widgets['name'])
                self.Bind(wx.EVT_BUTTON,self.on_display_traceback,widgets['state'])
                self.Bind(wx.EVT_BUTTON, self.on_kill_job,widgets['kill'])

            if jobStatus["state"] == "aborted":
                self._jobs[jobName]["name"].SetBackgroundColour(wx.RED)

        self._gbSizer.Layout()
343
                                                                                                    
344
345
346
347
348
349
350
351
352
353
354
355
if __name__ == "__main__":

    # Standalone run: display the job controller panel in a frame of its own.
    app = wx.App(0)
    frame = wx.Frame(None, -1, "Job Controller")

    # The panel must not be stored under the name JobController: that would rebind the
    # module-level name of the thread class that JobControllerPanel.__init__ instantiates.
    panel = JobControllerPanel(frame)

    frame.Show(True)
    app.MainLoop()
    
    
    
356