00001 import glob
00002 import os
00003 import re
00004
00005 import Ganga.GPI
00006
00007 from GBSIdCodes import *
00008 from GBSConfig import GetConfigValue
00009 from GBSLogger import Log, logger
00010 from GBSObject import GBSObject
00011 from GBSModelRegistry import GetModelRegistry
00012 from GBSTimeStamp import GBSTimeStamp
00013 import GBSUtilities
00014
00015 class GBSTask(GBSObject) :
00016 """Top level object to manage a single task.
00017
00018 This object will manages an set of Jobs, each of which looks after
00019 a single job, submitting and resubmitting until successful or user
00020 intervention is required.
00021
00022 ProtoJobs
00023 =========
00024
00025 Batches of new jobs are introduced into the system initially as "ProtoJobs".
00026 The following methods are used to manipulate them:-
00027
00028 AddProtoJob(job_name,args_str="",env_str="")
00029 PromoteProtoJobs(job_name_pattern = ".*")
00030 RemoveProtoJobs(job_name_pattern = ".*")
00031
00032 """
00033
00034
00035
00036
def __init__(self,name,parent,model,model_args):
    """Set up the task called 'name'.

    The data members are given their default values first and only then
    is GBSObject.__init__ called with name, parent and model.  After that
    the Ganga jobtree folder 'gbs/<name>' is created if it does not yet
    exist, the child directory is made, the child jobs are reloaded and
    the job statistics are refreshed.

    model_args is accepted but is not used by this constructor."""

    # Job definition members (all start empty).
    self.__scriptFileName      = ""
    self.__scriptGlobalArgs    = ""
    self.__globalEnvironment   = ""
    self.__globalInputSandbox  = ""
    self.__globalOutputSandbox = ""

    # Job submission members; defaults come from the GBS configuration.
    self.__backend       = GetConfigValue("DefaultBackend")
    self.__submitEnabled = 0
    self.__maxGangaJobs  = int(GetConfigValue("DefaultMaxGangaJobs"))
    self.__maxSubmitJobs = int(GetConfigValue("DefaultMaxSubmitJobs"))
    self.__mode          = "Test"

    # Child jobs keyed by job name, and the Ganga jobtree folder for their Ganga jobs.
    self._jobManagers   = {}
    self.__gangaTreeDir = 'gbs/' + name

    # Job statistics; brought up to date by RefreshJobStats().
    self.__jstatAll = self.__jstatNReadyAll = self.__jstatNReadyOther = 0
    self.__jstatReadyAll = self.__jstatReadyRetry = 0
    self.__jstatSubmitAll = self.__jstatSubmitNRun = 0
    self.__jstatDoneAll = self.__jstatDoneFail = 0

    GBSObject.__init__(self,name,parent,model)

    jobtree = Ganga.GPI.jobtree
    if not jobtree.exists(self.__gangaTreeDir):
        jobtree.mkdir(self.__gangaTreeDir)

    self.MakeChildDirectory()
    self.__ReloadChildren()
    self.RefreshJobStats()
00072
00073
def _DoMemberIO(self,ioh):
    """Pass each persisted data member through the I/O handler 'ioh'.

    For every member ioh is called as ioh(label, type_code, current_value)
    and the member is replaced by whatever ioh returns.  Finally the base
    class GBSObject._DoMemberIO is given the same handler."""
    # NOTE(review): the type codes "s"/"i" presumably mean string/integer and the
    # leading "+" on some labels is presumably a flag understood by ioh - confirm
    # against the ioh implementations.
    # NOTE(review): the call order may define the stored layout; do not reorder
    # without checking.
    self.__scriptFileName = ioh("Script file name", "s",self.__scriptFileName)
    self.__scriptGlobalArgs = ioh("Script Global Args", "s",self.__scriptGlobalArgs)
    self.__globalEnvironment = ioh("+Global environment", "s",self.__globalEnvironment)
    self.__globalInputSandbox = ioh("+Global Input Sandbox List", "s",self.__globalInputSandbox)
    self.__globalOutputSandbox = ioh("+Global Output Sandbox List", "s",self.__globalOutputSandbox)
    self.__backend = ioh("Backend", "s",self.__backend)
    self.__submitEnabled = ioh("Submit Enabled", "i",self.__submitEnabled)
    self.__maxGangaJobs = ioh("Max Ganga Jobs", "i",self.__maxGangaJobs)
    self.__maxSubmitJobs = ioh("Max Submit Jobs", "i",self.__maxSubmitJobs)
    self.__mode = ioh("+Test/Production Mode", "s",self.__mode)
    GBSObject._DoMemberIO(self,ioh)
00086
def WriteFamily(self):
    """Write this task and then ask every child job to write its own family."""
    self.Write()
    for child in self._jobManagers.values():
        child.WriteFamily()
00090
def GetType(self):
    """Return the type name of this object: always "GBSTask"."""
    return "GBSTask"
00092
def __repr__(self) :
    """Return the description produced by AsString() at its default level."""
    description = self.AsString()
    return description
00094
00095
00096
00097
00098
00099
00100
00101
00102
def AsString(self,level = "Full"):

    """Return string description.

    The level selects the form of the description:-
      "Heading"  one line of column titles to go above "Brief" rows
      "Brief"    one line summary suitable for rows in tables
    Any other level (the default is "Full") gives the full description
    including the job definition, submission settings and job counts."""

    if level == "Heading":
        return "".join([
            "Name".ljust(20),
            "Model".ljust(15),
            "!Ready(other) Ready(retry) Sub(!run) Done(fail)".rjust(51),
            "Backend".rjust(10) + " ",
            "Script file".ljust(40),
        ])

    if level == "Brief":
        def count_pair(total,part,width):
            """Format 'total(part)' with total right justified to width."""
            return str(total).rjust(width) + "("+ str(part).rjust(3) + ")"

        # Only the part of the backend before the first ':' is shown.
        short_backend = self.__backend
        found = re.search(r"(.*?):",short_backend)
        if found: short_backend = found.group(1)

        return "".join([
            self.GetName().ljust(20),
            self.GetModel().ljust(15),
            count_pair(self.__jstatNReadyAll,self.__jstatNReadyOther,8),
            count_pair(self.__jstatReadyAll,self.__jstatReadyRetry,9),
            count_pair(self.__jstatSubmitAll,self.__jstatSubmitNRun,8),
            count_pair(self.__jstatDoneAll,self.__jstatDoneFail,8),
            short_backend.rjust(10) + " ",
            self.__scriptFileName.ljust(40),
        ])

    if self.__submitEnabled:
        submit_state = "Enabled\n"
    else:
        submit_state = "Disabled\n"

    return "".join([
        GBSObject.__repr__(self) + "\n\n",
        "Model: " + self.GetModel() + "\n\n",
        "Job Definition\n",
        " Script file: " + self.__scriptFileName + "\n",
        " Global args: '" + self.__scriptGlobalArgs + "'\n",
        " Global env: '" + self.__globalEnvironment + "'\n",
        " Input Sandbox: '" + self.__globalInputSandbox + "'\n",
        " Output Sandbox: '" + self.__globalOutputSandbox + "'\n\n",
        "Job Submission\n",
        " ",
        submit_state,
        " Backend: " + self.__backend + "\n",
        " Limits: " + str(self.__maxSubmitJobs) + "(single submit) " + str(self.__maxGangaJobs) + "(maximum)\n",
        " Mode: " + self.__mode + "\n\n",
        "Managing " + str(self.__jstatAll) + " jobs\n",
        " Holding: " + str(self.__jstatNReadyAll).rjust(5) + " ("+ str(self.__jstatNReadyOther) + " other)\n",
        " Waiting: " + str(self.__jstatReadyAll).rjust(5) + " ("+ str(self.__jstatReadyRetry) + " retries)\n",
        " Submitted: " + str(self.__jstatSubmitAll).rjust(5) + " ("+ str(self.__jstatSubmitNRun) + " not running)\n",
        " Done: " + str(self.__jstatDoneAll).rjust(5) + " ("+ str(self.__jstatDoneFail) + " failed)\n",
    ])
00154
00155
def GetBackend(self):
    """Return the backend that will be used for future jobs (see SetBackend())."""
    backend = self.__backend
    return backend
00159
def GetGangaTreeDir(self):
    """Return the name of the Ganga JobTree directory used for this task's Ganga jobs."""
    tree_dir = self.__gangaTreeDir
    return tree_dir
00163
def GetGlobalEnvironment(self,prettyPrint = False):
    """Return the environment that is global to all jobs as a comma separated list string.

    If prettyPrint is True nothing is returned; instead a heading is
    printed and the list is handed to GBSUtilities.ListCommaSepList
    for printing."""
    if prettyPrint:
        print("Global Environment:-")
        GBSUtilities.ListCommaSepList(self.__globalEnvironment," ")
        return
    return self.__globalEnvironment
00171
def GetGlobalInputSandbox(self):
    """Return the input sandbox file list that is global to all jobs, as a comma separated list string."""
    in_sbox = self.__globalInputSandbox
    return in_sbox
00175
def GetGlobalOutputSandbox(self):
    """Return the output sandbox file list that is global to all jobs, as a comma separated list string."""
    out_sbox = self.__globalOutputSandbox
    return out_sbox
00179
def GetMaxGangaJobs(self):
    """Return the limit on the number of jobs that may be submitted to Ganga at any one time."""
    limit = self.__maxGangaJobs
    return limit
00183
def GetMaxSubmitJobs(self):
    """Return the limit on the number of jobs that a single call to SubmitJobs() may submit."""
    limit = self.__maxSubmitJobs
    return limit
00187
def GetMode(self):
    """Return the current mode string ("Test" or "Production", see SetMode())."""
    mode = self.__mode
    return mode
00191
def GetScriptFileName(self):
    """Return the name of the current user application script file, "" if none has been set."""
    script_name = self.__scriptFileName
    return script_name
00195
def GetScriptFileSpec(self):
    """Return the file spec of the task's script: the "child_dir" store
    location followed by "/" and the script file name.

    Returns "" if no script file name has been set."""
    script_name = self.__scriptFileName
    if script_name:
        return self.GetStoreLocation("child_dir") + "/" + script_name
    return ""
00199
def GetJob(self,name,warn=True) :
    """Return the Job called name e.g. job = task.GetJob("job_00123")

    If there is no such job return None, after printing an apology
    unless warn is false."""
    if name in self._jobManagers:
        return self._jobManagers[name]
    if warn:
        print("Sorry, there is no Job named " + str(name))
    return None
00206
def GetJobs(self,job_name_pattern = ".*"):
    """Return a new dictionary, keyed by job name, of every job whose name
    is matched (re.search) by job_name_pattern."""

    return dict((job_name,job)
                for job_name,job in self._jobManagers.items()
                if re.search(job_name_pattern,job_name))
00214
def GetScriptGlobalArgs(self):
    """Return, as a string, the comma separated list of application script args that are global to all jobs."""
    global_args = self.__scriptGlobalArgs
    return global_args
00218
def GetTestOnlyMethods(self):
    """Return a new list naming the methods that IsDisabled() refuses outside 'Test' mode."""
    test_only = [
        "SetGlobalEnvironment",
        "SetGlobalInputSandbox",
        "SetGlobalOutputSandbox",
        "SetScriptFileName",
        "SetScriptGlobalArgs",
    ]
    return test_only
00226
def IsAuthorisedToSubmit(self,warn=True):
    """Return True if authorised to submit jobs.

    Always True unless the backend name starts with "LCG"; in that case
    the result is whatever Ganga.GPI.gridProxy.isValid() returns, or
    False if that call raises an exception.

    If warn is true a message is printed when the proxy check fails."""
    if not self.GetBackend().startswith("LCG"): return True
    try:
        ok = Ganga.GPI.gridProxy.isValid()
    except Exception:
        # Any failure to query the proxy counts as "not authorised", but
        # KeyboardInterrupt/SystemExit are no longer swallowed here.
        ok = False
    if warn and not ok: print("Cannot submit jobs; backend is LCG but cannot find valid proxy")
    return ok
00237
def IsDisabled(self,method):
    """Return True if the named method is disabled, printing a message to say so.

    Nothing is disabled in 'Test' mode; in any other mode the methods
    listed by GetTestOnlyMethods() are disabled."""

    if self.GetMode() != 'Test' and method in self.GetTestOnlyMethods():
        print("Method " + method + " is disabled in Production mode")
        return True
    return False
00247
def ListJobs(self,job_name_pattern = ".*"):

    """Print a table, in job name order, of the existing Jobs whose names
    are matched (re.search) by job_name_pattern."""

    title = self.GetName() + " has the following jobs"
    if job_name_pattern != ".*":
        title += " " + "that match the pattern: '" + str(job_name_pattern) + "'"
    print(title + " " + ":-")

    found_any = False
    for job_name in sorted(self._jobManagers.keys()):
        if re.search(job_name_pattern,job_name):
            job = self._jobManagers[job_name]
            if not found_any:
                # Column titles, followed by a blank line, go above the first row only.
                print(job.AsString("Heading") + "\n")
                found_any = True
            print(job.AsString("Brief"))
    if not found_any:
        print(" no jobs found")
00266
00267
00268
00269
00270
00271
00272
00273
00274
def AddJob(self,job_name,args_str = "",env_str = ""):

    """Create a new named Job and optionally assign its application script local args and environment.

    e.g. job = task.AddJob("job_00034621_0002","F00034621_0002.mdaq.root","mini_flux=no")

    The work is done by _AddJobOrProtoJob(), whose result is returned.
    According to the original notes the job_name must begin job_ (added
    automatically if omitted) and is rejected if it contains anything
    beyond alphanumeric, '_' and '-' as it is used to name a directory."""

    new_job = self._AddJobOrProtoJob(job_name,args_str,env_str,"Job")
    return new_job
00286
def ClearErrorCountsJobs(self,job_name_pattern= ".*"):
    """Request the "CLEAR ERROR COUNTS" action for jobs matching the pattern.

    The request is passed to __ApplyActionToJobs() which, according to
    the original notes, seeks confirmation and acts on suitable jobs only."""
    action = "CLEAR ERROR COUNTS"
    self.__ApplyActionToJobs(action,job_name_pattern)
00290
def ClearHistoryJobs(self,job_name_pattern= ".*"):
    """Request the "CLEAR HISTORY" action for jobs matching the pattern.

    The request is passed to __ApplyActionToJobs() which, according to
    the original notes, seeks confirmation and acts on suitable jobs only."""
    action = "CLEAR HISTORY"
    self.__ApplyActionToJobs(action,job_name_pattern)
00294
def EnableSubmit(self,enable=True):
    """Switch submission of jobs via SubmitJobs() on (enable true) or off.

    The switch is stored as 1 or 0 and the task is written out.
    Returns the value the switch had before the call."""

    previous = self.__submitEnabled
    self.__submitEnabled = int(bool(enable))
    self.Write()
    return previous
00304
def HoldJobs(self,job_name_pattern= ".*"):
    """Request the "HOLD" action for jobs matching the pattern.

    The request is passed to __ApplyActionToJobs() which, according to
    the original notes, seeks confirmation and acts on suitable jobs only."""
    action = "HOLD"
    self.__ApplyActionToJobs(action,job_name_pattern)
00308
def KillJobs(self,job_name_pattern= ".*"):
    """Request the "KILL" action for jobs matching the pattern.

    The request is passed to __ApplyActionToJobs() which, according to
    the original notes, seeks confirmation and acts on suitable jobs only."""
    action = "KILL"
    self.__ApplyActionToJobs(action,job_name_pattern)
00312
def RefreshJobStats(self):

    """Bring job statistics up to date.

    All counters are zeroed and then every child job is asked for its
    phase code (GetPhaseCode()) and counted.  Each "All" counter covers
    both phase codes of its pair; the other counter of the pair covers
    just one of them.

    Per the original notes: it should not be necessary (but is harmless)
    for the user to call this as child jobs call it when their state
    changes and, unlike UpdateJobsStatus, the jobs are not asked to
    check Ganga."""
    self.__jstatAll = self.__jstatNReadyAll = self.__jstatNReadyOther = 0
    self.__jstatReadyAll = self.__jstatReadyRetry = 0
    self.__jstatSubmitAll = self.__jstatSubmitNRun = 0
    self.__jstatDoneAll = self.__jstatDoneFail = 0

    for job in self._jobManagers.values():
        self.__jstatAll += 1
        phase = job.GetPhaseCode()
        if phase in (GID_JPC_DONE_FAIL,GID_JPC_DONE_NFAIL):
            self.__jstatDoneAll += 1
        if phase == GID_JPC_DONE_FAIL:
            self.__jstatDoneFail += 1
        if phase in (GID_JPC_READY_RETRY,GID_JPC_READY_NRETRY):
            self.__jstatReadyAll += 1
        if phase == GID_JPC_READY_RETRY:
            self.__jstatReadyRetry += 1
        if phase in (GID_JPC_SUBMIT_RUN,GID_JPC_SUBMIT_NRUN):
            self.__jstatSubmitAll += 1
        if phase == GID_JPC_SUBMIT_NRUN:
            self.__jstatSubmitNRun += 1
        if phase in (GID_JPC_NREADY_HOLD,GID_JPC_NREADY_NHOLD):
            self.__jstatNReadyAll += 1
        if phase == GID_JPC_NREADY_NHOLD:
            self.__jstatNReadyOther += 1
00341
def ReleaseJobs(self,job_name_pattern = ".*"):
    """Request the "RELEASE" action for jobs matching the pattern.

    The request is passed to __ApplyActionToJobs() which, according to
    the original notes, seeks confirmation and acts on suitable jobs only."""
    action = "RELEASE"
    self.__ApplyActionToJobs(action,job_name_pattern)
00345
def RemoveJobs(self,job_name_pattern= ".*"):
    """Request the "REMOVE" action for jobs matching the pattern.

    The request is passed to __ApplyActionToJobs() which, according to
    the original notes, seeks confirmation and acts on suitable jobs only."""
    action = "REMOVE"
    self.__ApplyActionToJobs(action,job_name_pattern)
00349
def SetBackend(self,backend):

    """Define backend for future jobs: One of: Local, PBS:queue, LCG:queue

    The ":queue" part is optional.  An unacceptable backend is reported
    and ignored; an acceptable one is stored and the task written out."""

    acceptable = re.search(r"^(Local|PBS|LCG)(:.+)?$",backend)
    if acceptable:
        self.__backend = backend
        self.Write()
        return
    print("Sorry, '" + str(backend) + "' is not an acceptible backend. It should be one of:-\n"
          + " Local, PBS{:queue} or LCG{:queue}")
00361
def SetGlobalEnvironment(self,env_str):

    """Set, as a comma separated list string, the environment that is global to all jobs.

    e.g. task.SetGlobalEnvironment('var1=123,var2=a string with spaces,var3=456')

    If the first character is + e.g. +var3=567,var4=b, append to existing environment rather than replace it

    Does nothing if the method is disabled (see IsDisabled()).  A non-empty
    string that GBSUtilities.ParseEnvStr cannot parse is reported and the
    environment is left unchanged."""

    if self.IsDisabled("SetGlobalEnvironment"):
        return

    if re.search(r"^\+",env_str):
        # Append request: join the existing environment and the new items
        # and set the result through this class's own implementation.
        combined = self.__globalEnvironment
        if combined:
            combined += ','
        combined += env_str[1:]
        GBSTask.SetGlobalEnvironment(self,combined)
        return

    parsed = GBSUtilities.ParseEnvStr(env_str)
    if env_str and not parsed:
        print("Cannot parse environment string: '" + env_str + "'")
        return
    self.__globalEnvironment = GBSUtilities.BuildEnvStr(parsed)
    self.Write()
00384
def SetGlobalInputSandbox(self,in_sbox_str):

    """Set, as a comma separated list string, the input sandbox file list that is global to all jobs.

    e.g. task.SetGlobalInputSandbox('/home/users/west/my_input_data.dat,../my_script.sh')

    Does nothing if the method is disabled (see IsDisabled()).  The list
    is passed to GBSUtilities.ProcessSandboxSetup and the string it
    returns is stored, and the task written out, only if it reports success."""

    if self.IsDisabled("SetGlobalInputSandbox"):
        return
    accepted,sandbox_list = GBSUtilities.ProcessSandboxSetup(self,in_sbox_str,"input")
    if not accepted:
        return
    self.__globalInputSandbox = sandbox_list
    self.Write()
00396
def SetGlobalOutputSandbox(self,out_sbox_str):

    """Set, as a comma separated list string, the output sandbox file list that is global to all jobs.

    e.g. task.SetGlobalOutputSandbox('my_output_data.dat,my_output.log')

    Does nothing if the method is disabled (see IsDisabled()).  The list
    is passed to GBSUtilities.ProcessSandboxSetup and the string it
    returns is stored, and the task written out, only if it reports success."""

    if self.IsDisabled("SetGlobalOutputSandbox"):
        return
    accepted,sandbox_list = GBSUtilities.ProcessSandboxSetup(self,out_sbox_str,"output")
    if not accepted:
        return
    self.__globalOutputSandbox = sandbox_list
    self.Write()
00408
def SetMaxGangaJobs(self,n):
    """Store n as the maximum number of jobs that can be submitted to Ganga
    at any one time and write the task out.  n is stored as given."""

    new_limit = n
    self.__maxGangaJobs = new_limit
    self.Write()
00414
def SetMaxSubmitJobs(self,n):
    """Store n as the maximum number of jobs that a single call to SubmitJobs()
    can submit and write the task out.  n is stored as given."""

    new_limit = n
    self.__maxSubmitJobs = new_limit
    self.Write()
00420
def SetMode(self,mode):
    """Set the Test/Production Mode

    mode must be "Test" or "Production"; anything else is reported and
    ignored.  Asking for the current mode does nothing.

    Switching to "Production" is refused if no script file has been
    defined.  Otherwise a caution listing the methods affected (see
    GetTestOnlyMethods()) is printed and the user is asked to confirm;
    unless the reply starts with 'y' or 'Y' the mode is left unchanged.
    On confirmation the new mode is stored and the task written out."""
    if mode not in ("Test","Production"):
        print("Illegal mode: '" + str(mode) + "', only 'Test' and 'Production' permitted")
        return
    if self.__mode == mode: return

    if mode == 'Production':
        # The script can only be set in 'Test' mode, so it has to be in
        # place before leaving it.  (This check used to sit in the 'Test'
        # branch, where it let script-less tasks into 'Production' and
        # then stopped them ever getting back to 'Test'.)
        if not self.GetScriptFileName():
            print("You cannot switch to 'Production' mode; no global script file has been defined.")
            return
        print("Caution: Switching to 'Production' mode will disable the following methods:-\n")
        print(str(self.GetTestOnlyMethods()))
        print("\n but should be done before moving into full production.")
    else:
        print("Caution: Switching back to 'Test' mode is potentially dangerous as it re-enables the following methods:-\n")
        print(str(self.GetTestOnlyMethods()))
        print("\n and using these methods could produce results that are not homogeneous")

    ans = raw_input("Do you want to proceed ?y[n]")
    if not re.search(r"^y",ans,re.I):
        print("Mode not changed.")
        return
    print("Mode changed to " + mode)
    self.__mode = mode
    self.Write()
00445
def SetScriptFileName(self,ext_file_spec):

    """Pass in script file to be used by jobs.  GBS makes a local copy
    so no need to keep once passed.

    e.g. task.SetScriptFileName("my/directory/my_script_file.sh")

    Does nothing if the method is disabled (see IsDisabled()) or if
    ext_file_spec is not an existing file.  Otherwise the script file
    name becomes the base name of ext_file_spec and the file is copied to
    GetScriptFileSpec().  If the copy fails the script file name is reset
    to "".  In either case the task is then written out."""

    if self.IsDisabled("SetScriptFileName"): return
    import shutil
    if not os.path.isfile(ext_file_spec):
        print("Cannot find script file " + str(ext_file_spec))
        return
    self.__scriptFileName = os.path.basename(ext_file_spec)
    int_file_spec = self.GetScriptFileSpec()
    print("Copying " + str(ext_file_spec) + " -> " + str(int_file_spec))
    try:
        # Copy without going through a shell so that names containing
        # spaces or shell metacharacters are handled correctly.
        shutil.copy(str(ext_file_spec),str(int_file_spec))
    except (IOError,OSError,shutil.Error):
        print("Failed to copy file!")
        self.__scriptFileName = ""
    self.Write()
00465
def SetScriptGlobalArgs(self,arg_str):

    """Set (as a string) the comma separated list of application script args that are global to all jobs.

    Does nothing if the method is disabled (see IsDisabled()); otherwise
    arg_str is stored as given and the task written out."""

    if not self.IsDisabled("SetScriptGlobalArgs"):
        self.__scriptGlobalArgs = arg_str
        self.Write()
00473
def SubmitJobs(self,num_req = 0):

    """Submit jobs and return the number submitted.

    Nothing is submitted (and 0 is returned) unless IsAuthorisedToSubmit()
    is true, submission is enabled (see EnableSubmit()) and a user
    application script has been assigned.

    After UpdateJobsStatus() up to num_req jobs are submitted (if num_req
    is not positive the limit from GetMaxSubmitJobs() is used) but never
    so many as to exceed the maximum (see GetMaxGangaJobs()).  Jobs
    scheduled for retry are taken first, then new jobs, each group in
    job name order."""

    submitted = 0

    # Preconditions.
    if not self.IsAuthorisedToSubmit():
        return submitted
    if not self.__submitEnabled:
        print("Sorry, cannot submit; submit not enabled.")
        return submitted
    if not self.__scriptFileName:
        print("Sorry, cannot submit; no user application script assigned.")
        return submitted

    # Work out how many jobs may be submitted this time.
    self.UpdateJobsStatus()
    headroom = self.__maxGangaJobs - self.__jstatSubmitAll
    if headroom <= 0:
        print("Sorry, cannot submit; already have %d (limit %d)" % (self.__jstatSubmitAll,self.__maxGangaJobs))
        return submitted
    if num_req <= 0:
        num_req = self.__maxSubmitJobs
    quota = min(num_req,headroom)
    if quota < num_req:
        print("Sorry, can only submit %d, close to the limit of %d" % (quota,self.__maxGangaJobs))

    # Retries first, then new jobs.
    for wanted_status in [GID_JSC_RETRY,GID_JSC_NEW]:
        for job_name in sorted(self._jobManagers.keys()):
            candidate = self._jobManagers[job_name]
            if candidate.GetStatusCode() == wanted_status:
                candidate.Submit()
                submitted += 1
                if submitted >= quota:
                    return submitted
    return submitted
00513
def UpdateJobsStatus(self):

    """Send all jobs their UpdateStatus().

    If Ganga's PollThread 'autostart' option is off, Ganga monitoring is
    run first with a timeout that grows with the number of submitted jobs.

    As a by product, also remove any orphaned Ganga jobs i.e. Ganga jobs in Task's jobtree folder
    but not owned by any Job; each removal is logged as a warning."""

    if not Ganga.GPI.config['PollThread']['autostart']:
        poll_rate = int(GetConfigValue("GangaRegistryPollRate"))
        timeout = 60 + 60*self.__jstatSubmitAll/poll_rate
        Ganga.GPI.runMonitoring(1,timeout)

    # Update every job and note which Ganga job ids are accounted for.
    owned_ganga_ids = set()
    for job in self._jobManagers.values():
        job.UpdateStatus()
        owned_ganga_ids.add(job.GetGangaJobId())

    # Anything left in the task's jobtree folder that no job owns is an orphan.
    Ganga.GPI.jobtree.cleanlinks()
    for ganga_job in Ganga.GPI.jobtree.getjobs(self.__gangaTreeDir):
        if ganga_job.id not in owned_ganga_ids:
            Log(self,logger.WARNING,"Removing orphaned Ganga job %d" % ganga_job.id)
            ganga_job.remove()
00537
00538
00539
def WarnLowGridProxy(self,email_list,proxy_min_hours=3.,myproxy_min_days=3.):
    """If there is a valid GRID proxy, but it is running low, mail a warning to email_list.

    The proxy is running low if:-

    1) 'proxy_min_hours' is defined and the proxy lifetime reported by
       'voms-proxy-info --timeleft' is less than 'proxy_min_hours'
    or 2) 'myproxy_min_days' is defined and the myproxy lifetime reported by
       'myproxy-info' is less than 'myproxy_min_days'

    Nothing is done if email_list is empty or 'voms-proxy-info --exists'
    fails.  Output of either command that cannot be understood is ignored.
    The warning is sent with the 'mail' command; email_list is placed on
    its command line as given."""

    if not email_list or os.system("voms-proxy-info --exists"): return

    proxy_warn = ""

    # Proxy: the command is expected to print the time left in seconds.
    if proxy_min_hours:
        inp = os.popen("voms-proxy-info --timeleft","r")
        line = inp.readline()
        inp.close()
        try:
            hours_left = int(line)/(60.*60.)
        except ValueError:
            # Not a number: treat as "no information" rather than as low.
            hours_left = None
        if hours_left is not None and hours_left < proxy_min_hours:
            proxy_warn = "GRID proxy remaining lifetime = %.1f hours " % hours_left

    # Myproxy: the 'timeleft' line is read as hours:minutes:seconds.
    if myproxy_min_days:
        inp = os.popen("myproxy-info | grep timeleft","r")
        line = inp.readline()
        inp.close()
        mo = re.search(r"\s+(\d+):(\d+):(\d+)",line)
        if mo:
            days_left = ( (int(mo.group(3))/60.+int(mo.group(2)))/60. + int(mo.group(1)) )/24.
            if days_left < myproxy_min_days: proxy_warn += "myproxy remaining lifetime = %.1f days" % days_left

    if proxy_warn:
        cmd = 'mail -s"Task:%s WARNING: %s" %s<< EOD\n' % (self.GetName(),proxy_warn,email_list)
        cmd += "\nPlease log in and renew the proxy/myproxy as appropriate.\nEOD\n"
        os.system(cmd)
00575
00576
def WriteHtmlReport(self,task_dir):
    """Write HTML status report <task_dir>/<task-name>.html.

    The report holds the task description, a status table of job counts
    and two job tables, one ordered by name and one grouped by phase.

    Reports of individual jobs are written to <task_dir>/<task-name>/ :-
      <job-name>.html            the job description
      <job-name>_history.html    all the job's GBS Log Files
      <job-name>_ganga_job.html  dump of the job's Ganga job, or of the
                                 gbs_ganga.status file of an expired one
    Plain files already in that directory are removed first.

    A message is printed and nothing more is done if task_dir is not a
    directory or the job directory or the task file cannot be created."""

    def write_page(path,chunks):
        """Write the strings in chunks to path, closing the file even if the write fails."""
        page = open(path, 'w')
        try:
            page.write("".join(chunks))
        finally:
            page.close()

    def read_text(path):
        """Return the whole content of the file at path."""
        src = open(path)
        try:
            return src.read()
        finally:
            src.close()

    def count_cell(cell_open,phase_code,count):
        """Return cell_open followed by count, linked to its phase in the by-phase table if non-zero."""
        if count:
            return cell_open + '<a href="#phase_%d">%d</a>)\n' % (phase_code,count)
        return cell_open + '0)\n'

    if not os.path.isdir(task_dir):
        print("Unable to find directory: " + str(task_dir))
        return
    task_name = self.GetName()

    # Create, or empty, the directory that holds the individual job reports.
    job_dir = task_dir + "/" + task_name
    if not os.path.isdir(job_dir):
        try:
            os.mkdir(job_dir,int("755",8))
        except OSError:
            print("Unable to create directory: " + str(job_dir))
            return
    else:
        # Done in Python rather than with "cd <dir>; rm -f *", which would
        # delete from the current directory if the cd failed.
        for stale in glob.glob(job_dir + "/*"):
            if os.path.isfile(stale) or os.path.islink(stale):
                try:
                    os.remove(stale)
                except OSError:
                    pass   # best effort, as with rm -f

    self.RefreshJobStats()

    task_file = task_dir + "/" + task_name + '.html'
    try:
        f_task = open(task_file, 'w')
    except (IOError,OSError):
        # In Python 2 open() reports failure with IOError, not OSError.
        print("Unable to create file: " + str(task_file))
        return

    # Colours.
    red = "#ff6666"
    green = "#66ff66"
    blue = "#66aaff"
    purple = "#ff44ff"
    grey_light = "#999999"
    grey_dark = "#777777"

    # Row colours (job name side, status side) for each phase; anything else is dark grey.
    phase_colours = {
        GID_JPC_NREADY_HOLD:  (grey_light,grey_light),
        GID_JPC_NREADY_NHOLD: (grey_dark, grey_dark),
        GID_JPC_READY_NRETRY: (blue,      green),
        GID_JPC_READY_RETRY:  (blue,      red),
        GID_JPC_SUBMIT_RUN:   (purple,    green),
        GID_JPC_SUBMIT_NRUN:  (purple,    red),
        GID_JPC_DONE_NFAIL:   (green,     green),
        GID_JPC_DONE_FAIL:    (red,       red),
    }

    # Order of the phases in the by-phase table; table rows are collected per phase.
    phase_order = [GID_JPC_NREADY_HOLD, GID_JPC_NREADY_NHOLD,
                   GID_JPC_READY_NRETRY,GID_JPC_READY_RETRY,
                   GID_JPC_SUBMIT_RUN,  GID_JPC_SUBMIT_NRUN,
                   GID_JPC_DONE_NFAIL,  GID_JPC_DONE_FAIL]
    phase_rows = {}
    for phase_code in phase_order: phase_rows[phase_code] = []

    try:
        f_task.write("<html><body>\n")
        f_task.write("<h1>Status report for Task " + task_name + " produced on " + GBSTimeStamp() + "</h1>\n<pre>\n")
        f_task.write(str(self))
        f_task.write("\n</pre>\n")

        # Task status table.
        f_task.write("<h2>Task Status Table</h2>\n")
        f_task.write("<p><table border=1>\n")

        f_task.write(count_cell("<tr><td colspan=2 align=center bgcolor=" + grey_light + ">Not ready HELD (",
                                GID_JPC_NREADY_HOLD,self.__jstatNReadyAll - self.__jstatNReadyOther))
        f_task.write(count_cell("<tr><td colspan=2 align=center bgcolor=" + grey_dark + ">Not ready Not HELD (",
                                GID_JPC_NREADY_NHOLD,self.__jstatNReadyOther))

        f_task.write("<tr><td rowspan=2 align=center bgcolor=" + blue + \
                     ">Ready (" + str(self.__jstatReadyAll) + ")\n")
        f_task.write(count_cell("<td align=center bgcolor=" + green + ">First attempt (",
                                GID_JPC_READY_NRETRY,self.__jstatReadyAll - self.__jstatReadyRetry))
        f_task.write(count_cell("<tr><td align=center bgcolor=" + red + ">Retry (",
                                GID_JPC_READY_RETRY,self.__jstatReadyRetry))

        f_task.write("<tr><td rowspan=2 align=center bgcolor=" + purple + \
                     ">Submitted (" + str(self.__jstatSubmitAll) + ")\n")
        f_task.write(count_cell("<td align=center bgcolor=" + green + ">Running (",
                                GID_JPC_SUBMIT_RUN,self.__jstatSubmitAll - self.__jstatSubmitNRun))
        f_task.write(count_cell("<tr><td align=center bgcolor=" + red + ">Not running (\n",
                                GID_JPC_SUBMIT_NRUN,self.__jstatSubmitNRun))

        f_task.write(count_cell("<tr><td colspan=2 align=center bgcolor=" + green + ">Succeeded (",
                                GID_JPC_DONE_NFAIL,self.__jstatDoneAll - self.__jstatDoneFail))
        f_task.write(count_cell("<tr><td colspan=2 align=center bgcolor=" + red + ">Failed (",
                                GID_JPC_DONE_FAIL,self.__jstatDoneFail))

        f_task.write("</table>\n")

        # Job table ordered by name; the individual job reports are written as it is built.
        f_task.write('<h2><a name="table_by_name">Job Table ordered by Name</a></h2>\n')
        f_task.write('<p>See also <a href="#table_by_phase">Job Table ordered by Phase</a><p>\n')
        table_header = "<p><table border=1>\n<tr><th>Job Name<th>Status<br>Date Time<th>Ganga<br>Job<th>Try<br>Num<th>Status\n"
        f_task.write(table_header)
        job_names = list(self._jobManagers.keys())
        job_names.sort()
        for job_name in job_names:
            job = self._jobManagers[job_name]
            job_name = job.GetName()

            pc = job.GetPhaseCode()
            colour_1,colour_2 = phase_colours.get(pc,(grey_dark,grey_dark))

            table_row = ""

            # Column: job name, linked to the job's own report.
            write_page(job_dir + "/" + job_name + '.html',[
                "<html><body>\n",
                "<h1>Status report for Job " + job_name + " produced on " + GBSTimeStamp() + "</h1>\n",
                "<p>This job is part of the <a href=\"../" + task_name + ".html\">",
                task_name + "</a> task<p>\n<pre>\n",
                str(job),
                "</pre></body></html>\n",
            ])
            table_row += "<tr><td bgcolor=" + colour_1 + "><a href=\"" + task_name + "/" \
                         + job_name + ".html\">" + job_name + "</a>"

            # Column: status time.
            table_row += "<td bgcolor=" + colour_1 + ">" + job.GetStatusTime()[6:-4]

            # Column: Ganga job id, linked to a dump of the Ganga job if one
            # can be found, either live or as the status file of an expired one.
            table_row += "<td bgcolor=" + colour_1 + ">"
            gj_id = "unknown"
            gj_dump = None
            gj_obj = job.GetGangaJob()
            if gj_obj:
                gj_id = gj_obj.id
                gj_dump = "Dump of active Ganga Job\n\n" + str(gj_obj)
            else:
                gj_stat_file = job._GetTryOutputDir() + "/gbs_ganga.status"
                if os.path.isfile(gj_stat_file):
                    status_text = read_text(gj_stat_file)
                    # The id is looked for on the first line that contains "id ".
                    for line in status_text.splitlines():
                        if "id " in line:
                            mo = re.search(r"id = (\d+)",line)
                            if mo: gj_id = mo.group(1)
                            break
                    gj_dump = "Dump of expired Ganga Job\n\n" + status_text
            if gj_dump is None:
                table_row += "n/a"
            else:
                write_page(job_dir + "/" + job_name + '_ganga_job.html',[
                    "<html><body>\n",
                    "<p><h1>Ganga job for <a href=\"" + job_name + ".html\">" + job_name + "</a></h1>\n",
                    "<p>This job is part of the <a href=\"../" + task_name + ".html\">",
                    task_name + "</a> task<p>\n<pre>\n",
                    gj_dump,
                    "</pre></body></html>\n",
                ])
                table_row += "<a href=\"" + task_name + "/" + job_name + "_ganga_job.html\">" + str(gj_id) + "</a>"

            # Column: try number, linked to the full try history.
            hist_chunks = [
                "<html><body>\n",
                "<p><h1>Full try history for <a href=\"" + job_name + ".html\">" + job_name + "</a></h1>\n",
                "This file contains all the GBS Log Files (GLF) for "\
                + job_name + " which is is part of the <a href=\"../" + task_name + ".html\">",
                task_name + "</a> task<p>\n<pre>\n",
            ]
            glf_files = glob.glob(job.GetStoreLocation('child_dir') + "/*/gbs_*job*.log")
            glf_files.sort()   # glob gives no particular order
            for glf_file in glf_files:
                mo = re.search(r"_(\d+)\.log$",glf_file)
                if mo: glf_label = mo.group(1)
                else:  glf_label = os.path.basename(glf_file)   # name without a trailing _<digits>
                hist_chunks.append("</pre><h2>GLF Summary for %s</h2><pre>\n" % glf_label)
                hist_chunks.append(read_text(glf_file))
            hist_chunks.append("</pre></body></html>\n")
            write_page(job_dir + "/" + job_name + '_history.html',hist_chunks)

            hist_file_rel = task_name + "/" + job_name + "_history.html"
            table_row += "<td bgcolor=%s align=center> <a href=\"%s\">%d</a>" % (colour_2,hist_file_rel,job.GetTryNumber())

            # Column: status.
            table_row += "<td bgcolor=" + colour_2 + ">" + GIDStringForJSC(job.GetStatusCode()).ljust(23) \
                         + "[" + job.GetStatusText() + "]\n"

            f_task.write(table_row)
            if pc not in phase_rows:
                # A phase code not listed above still gets its rows shown, after the known ones.
                phase_rows[pc] = []
                phase_order.append(pc)
            phase_rows[pc].append(table_row)

        f_task.write("</table>\n")

        # Job table ordered by phase.
        f_task.write('<h2><a name="table_by_phase">Job Table ordered by Phase</a></h2>\n')
        f_task.write('<p>See also <a href="#table_by_name">Job Table ordered by Name</a><p>\n')
        f_task.write(table_header)
        for pc in phase_order:
            f_task.write('<a name="phase_%d"></a>\n' % pc)
            for row in phase_rows[pc]: f_task.write(row)
        f_task.write("</table>\n")
        f_task.write("</body></html>\n")
    finally:
        f_task.close()
    print("HTML Status report written to " + task_file)
00806
00807
00808
00809
00810
00811
00812
00813
00814
00815
def AddProtoJob(self,job_name,args_str="",env_str=""):

    """Add a ProtoJob with the name job_name and assign its local args the string args_str
    and environment env_str.

    A non-empty env_str that GBSUtilities.ParseEnvStr cannot parse is
    reported and None is returned.  Otherwise the result of
    _AddJobOrProtoJob() is returned."""

    if env_str and not GBSUtilities.ParseEnvStr(env_str):
        print("Cannot parse environment string: " + env_str)
        return None
    return self._AddJobOrProtoJob(job_name,args_str,env_str,"GBSProtoJob")
00827
def PromoteProtoJobs(self,job_name_pattern = ".*"):

    """Turn every ProtoJob whose name matches job_name_pattern into a real Job.

    The pattern is a regular expression applied with re.search, so the
    default ".*" selects every ProtoJob.  The user is asked to confirm; any
    answer that does not start with 'y' (in either case) aborts.  Each new
    Job is created through the model registry, takes over the ProtoJob's
    local script args, environment and sandboxes, and then replaces the
    ProtoJob in the task's job table."""

    candidates = [name for name,entry in self._jobManagers.items()
                  if entry.GetType() == "GBSProtoJob"
                  and re.search(job_name_pattern,name)]
    if len(candidates) == 0:
        print("No ProtoJobs to promote")
        return

    reply = raw_input(str(len(candidates)) + " ProtoJobs are due for promotion. Do you want to proceed ?y[n]")
    if re.search(r"^y",reply,re.I) is None:
        print("Promotion aborted")
        return

    for name in candidates:
        proto = self._jobManagers[name]
        promoted = GetModelRegistry().CreateObject(self.GetModel(),"Job",name,self)
        # Carry every job-local setting across from the ProtoJob.
        promoted.SetScriptLocalArgs(proto.GetScriptLocalArgs())
        promoted.SetLocalEnvironment(proto.GetLocalEnvironment())
        promoted.SetLocalInputSandbox(proto.GetLocalInputSandbox())
        promoted.SetLocalOutputSandbox(proto.GetLocalOutputSandbox())
        self._jobManagers[name] = promoted
    print(str(len(candidates)) + " ProtoJobs promoted")
00853
def RemoveProtoJobs(self,job_name_pattern = ".*"):

    """Drop every ProtoJob whose name matches job_name_pattern.

    The pattern is a regular expression applied with re.search, so the
    default ".*" removes all ProtoJobs.  Entries that are not ProtoJobs are
    never touched.  The number removed is printed."""

    # Collect the names first so the job table is not altered while it is scanned.
    doomed = [name for name,entry in self._jobManagers.items()
              if entry.GetType() == "GBSProtoJob"
              and re.search(job_name_pattern,name)]
    for name in doomed:
        del self._jobManagers[name]
    print(str(len(doomed)) + " ProtoJobs deleted")
00864
00865
00866
00867
00868
00869
00870
00871
00872
00873
def _AddJobOrProtoJob(self,job_name,args_str,env_str,type):

    """Create a named Job or ProtoJob and enter it in the task's job table.

    job_name may only contain alphanumerics, '_' and '-'; a 'job_' prefix
    is added when missing.  type selects what is built: "GBSProtoJob"
    gives a GBSProtoJob, anything else a Job from the model registry.
    Non-empty args_str / env_str are set on the new object as its local
    script args / local environment.

    Returns the new object, or None (after printing why) if the name is
    illegal or already in use."""

    # Reject the name if it holds any character outside the permitted set.
    if re.search(r"[^a-zA-Z0-9_\-]",job_name) is not None:
        print("Sorry, '" + str(job_name) + "' is an illegal job name (characters other than alphanumeric, '_' and '-')")
        return None
    if not job_name.startswith("job_"):
        job_name = 'job_' + job_name
    if job_name in self._jobManagers:
        print("Sorry, there already is a Job named '" + str(job_name) + "'")
        return None

    if type == "GBSProtoJob":
        new_job = GBSProtoJob(job_name)
    else:
        new_job = GetModelRegistry().CreateObject(self.GetModel(),"Job",job_name,self)
    if args_str:
        new_job.SetScriptLocalArgs(args_str)
    if env_str:
        new_job.SetLocalEnvironment(env_str)
    self._jobManagers[job_name] = new_job
    return new_job
00895
def __ApplyActionToJobs(self,action,job_name_pattern):

    """Apply one action to every eligible job whose name matches job_name_pattern.

    action is one of "CLEAR ERROR COUNTS", "CLEAR HISTORY", "HOLD", "KILL",
    "RELEASE" or "REMOVE"; any other value selects no jobs.  A job is
    eligible when its name matches the pattern (re.search) and the job's
    own test for that action passes.  The eligible jobs are listed and the
    user must confirm; "CLEAR HISTORY" and "REMOVE" need a second
    confirmation.  Any answer not starting with 'y' (either case) aborts."""

    # Per-action eligibility test ...
    eligible = {
        "CLEAR ERROR COUNTS": lambda j: j.CanClear(),
        "CLEAR HISTORY":      lambda j: j.CanClear(),
        "HOLD":               lambda j: j.IsReady(),
        "KILL":               lambda j: j.CanKill(),
        "RELEASE":            lambda j: j.IsHeld(),
        "REMOVE":             lambda j: j.CanClear(),
    }
    # ... and the operation carried out once the user has agreed.
    perform = {
        "CLEAR ERROR COUNTS": lambda j: j.ClearErrorCounts(),
        "CLEAR HISTORY":      lambda j: j.ClearHistory(False),
        "HOLD":               lambda j: j.Hold(),
        "KILL":               lambda j: j.Kill(),
        "RELEASE":            lambda j: j.Release(),
        "REMOVE":             lambda j: j.Remove(False),
    }

    can_apply = eligible.get(action)
    chosen = []
    for name,candidate in self._jobManagers.items():
        if not re.search(job_name_pattern,name):
            continue
        if can_apply is not None and can_apply(candidate):
            chosen.append(name)

    if len(chosen) == 0:
        print("No jobs suitable for action " + action)
        return
    print(str(len(chosen)) + " jobs suitable for action " + action + ":-")
    for name in chosen:
        print(" " + name)

    reply = raw_input("Do you want to proceed ?y[n]")
    if re.search(r"^y",reply,re.I) is None:
        print("Action " + action + " aborted")
        return

    # The two destructive actions get a second, more explicit, prompt.
    if action in ("CLEAR HISTORY","REMOVE"):
        if action == "REMOVE":
            warn = "this entirely removes the job(s)"
        else:
            warn = "this wipes all output for the job(s)"
        reply = raw_input("Are you REALLY sure - " + warn + "! ?y[n]")
        if re.search(r"^y",reply,re.I) is None:
            print("Action " + action + " aborted")
            return

    run = perform[action]
    for name in chosen:
        run(self._jobManagers[name])
    print("Action " + action + " applied to " + str(len(chosen)) + " jobs")
00935
00936
def __ReloadChildren(self):

    """Reload child Jobs from disk.

    Scan the task's child directory for regular files named
    job_<something>.state and recreate a Job for each one through
    self.AddJob.  For every Job recreated, try to file its Ganga job in
    this task's JobTree directory; failure of that step is tolerated."""

    child_dir = self.GetStoreLocation("child_dir")
    for child_state_file in os.listdir(child_dir):

        # Only plain files called job_*.state describe a child job.
        if not os.path.isfile(os.path.join(child_dir,child_state_file)): continue
        mo = re.search(r"^(job_.*)\.state$",child_state_file)
        if not mo: continue
        job = self.AddJob(mo.group(1))
        # AddJob is not visible from here; presumably it gives None when the
        # name is rejected, in which case there is nothing to file.
        if job is None: continue

        # Filing in the JobTree is best-effort (the job may have no Ganga
        # job), so ordinary errors are ignored.  Catch Exception rather than
        # everything so that KeyboardInterrupt / SystemExit still get through.
        try:
            g_job = job.GetGangaJob()
            Ganga.GPI.jobtree.add(g_job,self.__gangaTreeDir)
        except Exception:
            pass
00963
00964
class GBSProtoJob:

    """A candidate job that becomes a real one through GBSTask.PromoteProtoJobs().

    ProtoJobs let users generate large numbers of jobs from scripts or
    data files, then review and correct them before "promoting" them.
    They are cheap to create and destroy, are not persisted, and are
    therefore lost when the program ends unless promoted.

    A GBSProtoJob emulates only a small part of a real job:-

      GetType()
        Return the object type.

      GetScriptLocalArgs() / SetScriptLocalArgs(args_str)
        Read / write (as a string) the application script args that are
        local to this job,
          e.g. job.SetScriptLocalArgs('123 "a string with spaces" 456')

      GetLocalEnvironment() / SetLocalEnvironment(env_str)
      GetLocalInputSandbox() / SetLocalInputSandbox(in_sbox_str)
      GetLocalOutputSandbox() / SetLocalOutputSandbox(out_sbox_str)
        Read / write the job-local environment and sandbox lists.

    """

    def __init__(self,name):
        self.__name = name
        # Every job-local setting starts out empty.
        self.__scriptLocalArgs = self.__localEnvironment = ""
        self.__localInputSandbox = self.__localOutputSandbox = ""

    def GetType(self):
        return "GBSProtoJob"

    def __repr__(self):
        return self.AsString()

    def AsString(self,level = "Brief"):

        """Return a one line description.

        level selects the form:-
          "Heading"     column titles to sit above a table of "Brief" rows
          anything else (default "Brief") the row for this ProtoJob: name,
                        the fixed status "(protojob)" and its local script
                        args and environment joined by ';' """

        if level == "Heading":
            columns = (("Name",20),("Status",23),("Input",40))
            return "".join([title.ljust(width) for title,width in columns]) + "Status Details"
        local_setup = self.__scriptLocalArgs + ";" + self.__localEnvironment
        return self.__name.ljust(20) + "(protojob)".ljust(23) + local_setup.ljust(40)

    def GetLocalEnvironment(self):
        """Return, as a comma separated list string, the environment local to this job."""
        return self.__localEnvironment

    def GetScriptLocalArgs(self):
        """Return (as a string) the application script args local to this job."""
        return self.__scriptLocalArgs

    def GetLocalInputSandbox(self):
        """Return, as a comma separated list string, the input sandbox file list local to this job."""
        return self.__localInputSandbox

    def GetLocalOutputSandbox(self):
        """Return, as a comma separated list string, the output sandbox file list local to this job."""
        return self.__localOutputSandbox

    def SetLocalEnvironment(self,env_str):
        """Set, as a comma separated list string, the environment local to this job."""
        self.__localEnvironment = env_str

    def SetScriptLocalArgs(self,arg_str):
        """Set (as a string) the application script args local to this job."""
        self.__scriptLocalArgs = arg_str

    def SetLocalInputSandbox(self,in_sbox_str):
        """Set, as a comma separated list string, the input sandbox file list local to this job.

        The list is only stored if GBSUtilities.ProcessSandboxSetup accepts it."""
        ok,_processed = GBSUtilities.ProcessSandboxSetup(self,in_sbox_str,"input")
        # Note: the caller's string is what gets stored here, not the second
        # value handed back by ProcessSandboxSetup.
        if ok:
            self.__localInputSandbox = in_sbox_str

    def SetLocalOutputSandbox(self,out_sbox_str):
        """Set, as a comma separated list string, the output sandbox file list local to this job.

        The list is only stored if GBSUtilities.ProcessSandboxSetup accepts
        it, and what is stored is the string that call hands back."""
        ok,processed = GBSUtilities.ProcessSandboxSetup(self,out_sbox_str,"output")
        if ok:
            self.__localOutputSandbox = processed