Inheritance diagram for python::GBSTask::GBSTask:
Top level object to manage a single task. This object will manages an set of Jobs, each of which looks after a single job, submitting and resubmitting until successful or user intervention is required. ProtoJobs ========= Batches of new jobs are introduced into the system initially as "ProtoJobs". The following methods are used to manipulate them:- AddProtoJob(job_name,args_str="",env_str="") PromoteProtoJobs(job_name_pattern = ".*") RemoveProtoJobs(job_name_pattern = ".*")
Definition at line 15 of file GBSTask.py.
def python::GBSTask::GBSTask::__ApplyActionToJobs | ( | self, | ||
action, | ||||
job_name_pattern | ||||
) | [private] |
Apply an action (CLEAR ERROR COUNTS, CLEAR HISTORY, HOLD, KILL, RELEASE) to all jobs matching pattern
Definition at line 880 of file GBSTask.py.
00882 : job_name = 'job_' + job_name 00883 if self._jobManagers.has_key(job_name): 00884 print "Sorry, there already is a Job named '" + str(job_name) + "'" 00885 return None 00886 00887 # Name O.K., go create the Job. 00888 job = None 00889 if ( type == "GBSProtoJob" ): job = GBSProtoJob(job_name) 00890 else: job = GetModelRegistry().CreateObject(self.GetModel(),"Job",job_name,self) 00891 if args_str: job.SetScriptLocalArgs(args_str) 00892 if env_str: job.SetLocalEnvironment(env_str) 00893 self._jobManagers[job_name] = job 00894 return job 00895 00896 def __ApplyActionToJobs(self,action,job_name_pattern): 00897 00898 """Apply an action (CLEAR ERROR COUNTS, CLEAR HISTORY, HOLD, KILL, RELEASE) to all jobs matching pattern""" 00899 job_list = [] # List of job names to which action can be applied 00900 for job_name,job in self._jobManagers.iteritems(): 00901 if not re.search(job_name_pattern,job_name): continue 00902 if ( action == "CLEAR ERROR COUNTS" and job.CanClear() ) \ 00903 or ( action == "CLEAR HISTORY" and job.CanClear() ) \ 00904 or ( action == "HOLD" and job.IsReady() ) \ 00905 or ( action == "KILL" and job.CanKill() ) \ 00906 or ( action == "RELEASE" and job.IsHeld() ) \ 00907 or ( action == "REMOVE" and job.CanClear() ): 00908 job_list.append(job_name) 00909 num_pjobs = len(job_list) 00910 if not num_pjobs: 00911 print "No jobs suitable for action " + action 00912 return 00913 print str(num_pjobs) + " jobs suitable for action " + action + ":-" 00914 for job_name in job_list: print " " + job_name 00915 ans = raw_input("Do you want to proceed ?y[n]") 00916 if not re.search(r"^y",ans,re.I): 00917 print "Action " + action + " aborted" 00918 return 00919 if action == "CLEAR HISTORY" or action == "REMOVE": 00920 warn = "this wipes all output for the job(s)"
def python::GBSObject::GBSObject::__init__ | ( | self, | ||
name, | ||||
parent, | ||||
model | ||||
) | [inherited] |
Definition at line 10 of file GBSObject.py.
00015 : 00016 self.__name = name # Object name, must be unique within scope of parent 00017 self.__parent = parent # Parent (None for Manager) 00018 self.__model = model # Model name 00019 Log(self,logger.SYNOPSIS,"Creating a " + self.GetType() + " named " + self.__name) 00020 00021 # After creation, either read current state from disk, if state file exists, 00022 # or create state file (and if necessary supporting directory) if it doesn't. 00023 00024 if os.path.isfile(self.GetStoreLocation()): 00025 self.Read() else:
def python::GBSTask::GBSTask::__init__ | ( | self, | ||
name, | ||||
parent, | ||||
model, | ||||
model_args | ||||
) |
Definition at line 21 of file GBSTask.py.
00026 :- 00027 00028 AddProtoJob(job_name,args_str="",env_str="") 00029 PromoteProtoJobs(job_name_pattern = ".*") 00030 RemoveProtoJobs(job_name_pattern = ".*") 00031 00032 """ 00033 00034 ###### GBSObject inherited responsibilities ###### 00035 00036 00037 def __init__(self,name,parent,model,model_args): 00038 self.__scriptFileName = "" 00039 self.__scriptGlobalArgs = "" 00040 self.__globalEnvironment = "" 00041 self.__globalInputSandbox = "" 00042 self.__globalOutputSandbox = "" 00043 self.__backend = GetConfigValue("DefaultBackend") 00044 self.__submitEnabled = 0 00045 self.__maxGangaJobs = int(GetConfigValue("DefaultMaxGangaJobs")) 00046 self.__maxSubmitJobs = int(GetConfigValue("DefaultMaxSubmitJobs")) 00047 self.__mode = "Test" 00048 00049 # The remaining members are volatile i.e. not persisted but derived from other state. 00050 # Note that _jobManagers is not private so that MinosRSMTask can rename jobs. 00051 self._jobManagers = {} # Volatile: Hash name -> Job 00052 self.__gangaTreeDir = 'gbs/' + name # Ganga JobTree directory that holds jobs. 00053 00054 self.__jstatAll = 0 # Job stats: All 00055 self.__jstatNReadyAll = 0 # Job stats: Not ready - all 00056 self.__jstatNReadyOther = 0 # Job stats: Not ready - other than placed on hold 00057 self.__jstatReadyAll = 0 # Job stats: Ready to run - all
def python::GBSTask::GBSTask::__ReloadChildren | ( | self | ) | [private] |
Reload child Jobs from disk. Loop over all job_*.state files and recreate. Make sure any associated Ganga job is in the associated JobTree directory
Definition at line 921 of file GBSTask.py.
00921 : warn = "this entirely removes the job(s)" 00922 ans = raw_input("Are you REALLY sure - " + warn + "! ?y[n]") 00923 if not re.search(r"^y",ans,re.I): 00924 print "Action " + action + " aborted" 00925 return 00926 for job_name in job_list: 00927 job = self._jobManagers[job_name] 00928 if action == "CLEAR ERROR COUNTS": job.ClearErrorCounts() 00929 if action == "CLEAR HISTORY": job.ClearHistory(False) 00930 if action == "HOLD": job.Hold() 00931 if action == "KILL": job.Kill() 00932 if action == "RELEASE": job.Release() 00933 if action == "REMOVE": job.Remove(False) 00934 print "Action " + action + " applied to " + str(len(job_list)) + " jobs" 00935 00936 00937 def __ReloadChildren(self): 00938 00939 """Reload child Jobs from disk. 00940 00941 Loop over all job_*.state files and recreate. 00942 Make sure any associated Ganga job is in the associated JobTree directory""" 00943 00944 import os, re 00945 00946 child_dir = self.GetStoreLocation("child_dir") 00947 child_state_files = os.listdir(child_dir) 00948 for child_state_file in child_state_files:
def python::GBSTask::GBSTask::__repr__ | ( | self | ) |
Reimplemented from python::GBSObject::GBSObject.
Definition at line 77 of file GBSTask.py.
00087 : self.Write()
def python::GBSTask::GBSTask::_AddJobOrProtoJob | ( | self, | ||
job_name, | ||||
args_str, | ||||
env_str, | ||||
type | ||||
) | [private] |
Create a new named Job or ProtoJob and optionally assign its application script local args.
Definition at line 858 of file GBSTask.py.
00859 : 00860 if job.GetType() != "GBSProtoJob": continue 00861 if re.search(job_name_pattern,job_name): del_list.append(job_name) 00862 for job_name in del_list: del self._jobManagers[job_name] 00863 print str(len(del_list)) + " ProtoJobs deleted" 00864 00865 00866 00867 #----------------------------------------------------------------------------------------------- 00868 # 00869 # Private Methods (not user callable) 00870 # 00871 #----------------------------------------------------------------------------------------------- 00872 00873 00874 def _AddJobOrProtoJob(self,job_name,args_str,env_str,type): 00875 00876 """Create a new named Job or ProtoJob and optionally assign its application script local args.""" 00877 00878 # Check that the name is legal and not already in use. 00879 if re.search(r"[^a-zA-Z0-9_\-]",job_name): print "Sorry, '" + str(job_name) + "' is an illegal job name (characters other than alphanumeric, '_' and '-')"
def python::GBSTask::GBSTask::_DoMemberIO | ( | self, | ||
ioh | ||||
) | [private] |
Reimplemented from python::GBSObject::GBSObject.
Definition at line 58 of file GBSTask.py.
00058 : Ready to run - retries 00059 self.__jstatSubmitAll = 0 # Job stats: Submitted to Ganga - all 00060 self.__jstatSubmitNRun = 0 # Job stats: Submitted to Ganga - but not running 00061 self.__jstatDoneAll = 0 # Job stats: Done - all 00062 self.__jstatDoneFail = 0 # Job stats: Done - Failed 00063 00064 GBSObject.__init__(self,name,parent,model) 00065 00066 # Create Ganga JobTree directory if necessary 00067 if not Ganga.GPI.jobtree.exists(self.__gangaTreeDir): Ganga.GPI.jobtree.mkdir(self.__gangaTreeDir) 00068 00069 self.MakeChildDirectory() 00070 self.__ReloadChildren()
def python::GBSTask::GBSTask::AddJob | ( | self, | ||
job_name, | ||||
args_str = "" , |
||||
env_str = "" | ||||
) |
Create a new named Job and optionally assign its application script local args and environment. e.g. job = task.AddJob("job_00034621_0002","F00034621_0002.mdaq.root","mini_flux=no") Note that the job_name must begin job_ (if omitted it will be added automatically) and will be rejected if it contains anything beyond alphanumeric, '_' and '-'.for as it will be used to name a directory.
Definition at line 259 of file GBSTask.py.
00259 : continue 00260 job = self._jobManagers[job_name] 00261 if first: 00262 print job.AsString("Heading") + "\n" 00263 first = 0 00264 print job.AsString("Brief") 00265 if first: print " no jobs found" 00266 00267 00268 #----------------------------------------------------------------------------------------------- 00269 # 00270 # User Callable Methods Setters
def python::GBSTask::GBSTask::AddProtoJob | ( | self, | ||
job_name, | ||||
args_str = "" , |
||||
env_str = "" | ||||
) |
Add a ProtJob with the name job_name and assign its local args the string args_str and environment env_str.
Definition at line 800 of file GBSTask.py.
00801 : f_task.write(row) 00802 f_task.write("</table>\n") 00803 f_task.write("</body></html>\n") 00804 f_task.close() 00805 print "HTML Status report written to " + task_file 00806 00807 00808 00809 #----------------------------------------------------------------------------------------------- 00810 # 00811 # ProtoJob Methods (user callable)
def python::GBSTask::GBSTask::AsString | ( | self, | ||
level = "Full" | ||||
) |
Return string description. Return string description at the following levels:- "Brief" one line summary suitable for rows in tables "Heading" one line summary suitable as heading for "Brief" "Full" full desciption including value of every data member
Definition at line 87 of file GBSTask.py.
00087 : 00088 self.Write() 00089 for job_name,job in self._jobManagers.iteritems():job.WriteFamily() 00090 00091 def GetType(self): return "GBSTask" 00092 00093 def __repr__(self) : return self.AsString() 00094 00095 00096 #----------------------------------------------------------------------------------------------- 00097 # 00098 # User Callable Methods Getters 00099 # 00100 #----------------------------------------------------------------------------------------------- 00101 00102 00103 def AsString(self,level = "Full"): 00104 00105 """Return string description. 00106 00107 Return string description at the following levels:- 00108 "Brief" one line summary suitable for rows in tables 00109 "Heading" one line summary suitable as heading for "Brief" 00110 "Full" full desciption including value of every data member""" 00111 00112 if ( level == "Heading"): 00113 s = "Name".ljust(20) 00114 s += "Model".ljust(15) 00115 s += "!Ready(other) Ready(retry) Sub(!run) Done(fail)".rjust(51) 00116 s += "Backend".rjust(10) + " " 00117 s += "Script file".ljust(40) 00118 return s 00119 if ( level == "Brief"): 00120 s = self.GetName().ljust(20) 00121 s += self.GetModel().ljust(15) 00122 s += str(self.__jstatNReadyAll).rjust(8) + "("+ str(self.__jstatNReadyOther).rjust(3) + ")" 00123 s += str(self.__jstatReadyAll).rjust(9) + "("+ str(self.__jstatReadyRetry).rjust(3) + ")" 00124 s += str(self.__jstatSubmitAll).rjust(8) + "("+ str(self.__jstatSubmitNRun).rjust(3) + ")" 00125 s += str(self.__jstatDoneAll).rjust(8) + "("+ str(self.__jstatDoneFail).rjust(3) + ")" 00126 backend = self.__backend 00127 mo = re.search(r"(.*?):",backend) 00128 if mo: backend = mo.group(1) 00129 s += backend.rjust(10) + " " 00130 s += self.__scriptFileName.ljust(40) 00131 return s 00132 00133 s = GBSObject.__repr__(self) + "\n\n" 00134 s += "Model: " + self.GetModel() + "\n\n" 00135 s += "Job Definition\n" 00136 s += " Script file: " + self.__scriptFileName + "\n" 00137 s += " Global args: '" + self.__scriptGlobalArgs + "'\n" 00138 s += " Global env: '" + self.__globalEnvironment + "'\n" 00139 s += " Input Sandbox: '" + self.__globalInputSandbox + "'\n" s += " Output Sandbox: '" + self.__globalOutputSandbox + "'\n\n"
def python::GBSTask::GBSTask::ClearErrorCountsJobs | ( | self, | ||
job_name_pattern = ".*" | ||||
) |
After seeking confirmation, clear error counts on all suitable jobs matching pattern
Definition at line 271 of file GBSTask.py.
00275 :
def python::GBSTask::GBSTask::ClearHistoryJobs | ( | self, | ||
job_name_pattern = ".*" | ||||
) |
After seeking confirmation, clear histories on all suitable jobs matching pattern
Definition at line 275 of file GBSTask.py.
00275 : 00276 00277 """Create a new named Job and optionally assign its application script local args and environment. 00278 e.g. job = task.AddJob("job_00034621_0002","F00034621_0002.mdaq.root","mini_flux=no")
def python::GBSTask::GBSTask::EnableSubmit | ( | self, | ||
enable = True | ||||
) |
Enable or disable submission of jobs via the SubmitJobs() method. The function returns the previous value of the switch.
Definition at line 279 of file GBSTask.py.
00287 : 00288 """After seeking confirmation, clear error counts on all suitable jobs matching pattern""" self.__ApplyActionToJobs("CLEAR ERROR COUNTS",job_name_pattern)
def python::GBSTask::GBSTask::GetBackend | ( | self | ) |
Return backend for future jobs
Definition at line 140 of file GBSTask.py.
00140 : '" + self.__globalOutputSandbox + "'\n\n" 00141 s += "Job Submission\n" 00142 s += " " 00143 if self.__submitEnabled: s += "Enabled\n"
def python::GBSTask::GBSTask::GetGangaTreeDir | ( | self | ) |
Return Ganga JobTree Directory that holds all Ganga jobs
Definition at line 144 of file GBSTask.py.
00144 : s += "Disabled\n" 00145 s += " Backend: " + self.__backend + "\n" 00146 s += " Limits: " + str(self.__maxSubmitJobs) + "(single submit) " + str(self.__maxGangaJobs) + "(maximum)\n" 00147 s += " Mode: " + self.__mode + "\n\n"
def python::GBSTask::GBSTask::GetGlobalEnvironment | ( | self, | ||
prettyPrint = False | ||||
) |
Return, as a comma separated list string, the environment that is global to all jobs If prettyPrint is True print out list one item per line
Definition at line 148 of file GBSTask.py.
00149 : " + str(self.__jstatNReadyAll).rjust(5) + " ("+ str(self.__jstatNReadyOther) + " other)\n" 00150 s += " Waiting: " + str(self.__jstatReadyAll).rjust(5) + " ("+ str(self.__jstatReadyRetry) + " retries)\n" 00151 s += " Submitted: " + str(self.__jstatSubmitAll).rjust(5) + " ("+ str(self.__jstatSubmitNRun) + " not running)\n" 00152 s += " Done: " + str(self.__jstatDoneAll).rjust(5) + " ("+ str(self.__jstatDoneFail) + " failed)\n" 00153 return s 00154 00155
def python::GBSTask::GBSTask::GetGlobalInputSandbox | ( | self | ) |
Return, as a comma separated list string, the input sandbox file list that is global to all jobs
Definition at line 156 of file GBSTask.py.
00156 : 00157 """Return backend for future jobs""" 00158 return self.__backend 00159 def GetGangaTreeDir(self):
def python::GBSTask::GBSTask::GetGlobalOutputSandbox | ( | self | ) |
Return, as a comma separated list string, the ouput sandbox file list that is global to all jobs
Definition at line 160 of file GBSTask.py.
00160 : 00161 """Return Ganga JobTree Directory that holds all Ganga jobs""" 00162 return self.__gangaTreeDir 00163 def GetGlobalEnvironment(self,prettyPrint = False):
def python::GBSTask::GBSTask::GetJob | ( | self, | ||
name, | ||||
warn = True | ||||
) |
Return Job called job_name e.g. job = task.GetJob("job_00123")
Definition at line 184 of file GBSTask.py.
00184 : 00185 """Return the maximum number of jobs that can be submitted by a single call SubmitJobs() """ 00186 return self.__maxSubmitJobs 00187 00188 def GetMode(self): 00189 """Return the Test/Production Mode """ 00190 return self.__mode
def python::GBSTask::GBSTask::GetJobs | ( | self, | ||
job_name_pattern = ".*" | ||||
) |
Return a dictionary: dict[name] job for all jobs matching pattern
Definition at line 191 of file GBSTask.py.
00192 : 00193 """Returns the current user application script file name (or "" if none). """ 00194 return self.__scriptFileName 00195 00196 def GetScriptFileSpec(self): 00197 if not self.__scriptFileName : return "" 00198 return self.GetStoreLocation("child_dir") + "/" + self.__scriptFileName
def python::GBSTask::GBSTask::GetMaxGangaJobs | ( | self | ) |
Return the maximum number of jobs that can be submitted to Ganga at any one time
Definition at line 164 of file GBSTask.py.
00164 : 00165 """Return, as a comma separated list string, the environment that is global to all jobs 00166 00167 If prettyPrint is True print out list one item per line""" if not prettyPrint: return self.__globalEnvironment
def python::GBSTask::GBSTask::GetMaxSubmitJobs | ( | self | ) |
Return the maximum number of jobs that can be submitted by a single call SubmitJobs()
Definition at line 168 of file GBSTask.py.
00168 : return self.__globalEnvironment 00169 print "Global Environment:-" 00170 GBSUtilities.ListCommaSepList(self.__globalEnvironment," ") 00171
def python::GBSTask::GBSTask::GetMode | ( | self | ) |
Return the Test/Production Mode
Definition at line 172 of file GBSTask.py.
00172 : 00173 """Return, as a comma separated list string, the input sandbox file list that is global to all jobs""" 00174 return self.__globalInputSandbox 00175 def GetGlobalOutputSandbox(self):
def python::GBSObject::GBSObject::GetModel | ( | self | ) | [inherited] |
def python::GBSObject::GBSObject::GetName | ( | self | ) | [inherited] |
def python::GBSObject::GBSObject::GetParent | ( | self | ) | [inherited] |
def python::GBSTask::GBSTask::GetScriptFileName | ( | self | ) |
Returns the current user application script file name (or "" if none).
Definition at line 176 of file GBSTask.py.
00176 : 00177 """Return, as a comma separated list string, the ouput sandbox file list that is global to all jobs""" 00178 return self.__globalOutputSandbox 00179 def GetMaxGangaJobs(self):
def python::GBSTask::GBSTask::GetScriptFileSpec | ( | self | ) |
Definition at line 180 of file GBSTask.py.
00180 : 00181 """Return the maximum number of jobs that can be submitted to Ganga at any one time """ 00182 return self.__maxGangaJobs 00183 def GetMaxSubmitJobs(self):
def python::GBSTask::GBSTask::GetScriptGlobalArgs | ( | self | ) |
Returns (as a string) the comma separated list of application script args that are global to all jobs
Definition at line 199 of file GBSTask.py.
00200 : 00201 """Return Job called job_name e.g. job = task.GetJob("job_00123")""" 00202 if not self._jobManagers.has_key(name): if warn: print "Sorry, there is no Job named " + str(name)
def python::GBSObject::GBSObject::GetStoreLocation | ( | self, | ||
type = "self" | ||||
) | [inherited] |
Return storage location: parent directory, self or child directory. type is one of "parent_dir" parent's child directory "self" object's state file "child_dir" directory for own children
Definition at line 38 of file GBSObject.py.
00039 : return self.__name 00040 def GetModel(self): return self.__model 00041 def GetParent(self): return self.__parent 00042 00043 def GetStoreLocation(self,type="self"): 00044 00045 """Return storage location: parent directory, self or child directory. 00046 00047 type is one of "parent_dir" parent's child directory 00048 "self" object's state file 00049 "child_dir" directory for own children""" 00050 00051 loc = "" 00052 if self.__parent == None : loc = GBSConfig.GetConfig().GetValue("DataDirectory") 00053 else : loc = self.__parent.GetStoreLocation("parent_dir") + "/" + self.__parent.GetName() 00054 if type != "parent_dir": loc += "/" + self.GetName()
def python::GBSTask::GBSTask::GetTestOnlyMethods | ( | self | ) |
Return a list of methods that are only available in 'Test' mode.
Definition at line 203 of file GBSTask.py.
00203 : print "Sorry, there is no Job named " + str(name) 00204 return None 00205 else : return self._jobManagers[name] 00206 00207 def GetJobs(self,job_name_pattern = ".*"): 00208 """Return a dictionary: dict[name] job for all jobs matching pattern""" 00209 00210 job_dict = {}
def python::GBSTask::GBSTask::GetType | ( | self | ) |
Reimplemented from python::GBSObject::GBSObject.
Definition at line 75 of file GBSTask.py.
00087 : self.Write()
def python::GBSTask::GBSTask::HoldJobs | ( | self, | ||
job_name_pattern = ".*" | ||||
) |
After seeking confirmation, hold all suitable jobs matching pattern
Definition at line 289 of file GBSTask.py.
00291 : 00292 """After seeking confirmation, clear histories on all suitable jobs matching pattern""" self.__ApplyActionToJobs("CLEAR HISTORY",job_name_pattern)
def python::GBSTask::GBSTask::IsAuthorisedToSubmit | ( | self, | ||
warn = True | ||||
) |
Returns True is authorised to submit jobs. Always True except if backend is LCG, then checks proxy
Definition at line 211 of file GBSTask.py.
00211 : 00212 if re.search(job_name_pattern,job_name): job_dict[job_name] = job 00213 return job_dict 00214 00215 def GetScriptGlobalArgs(self): 00216 """Returns (as a string) the comma separated list of application script args that are global to all jobs """ 00217 return self.__scriptGlobalArgs 00218 00219 def GetTestOnlyMethods(self): 00220 """Return a list of methods that are only available in 'Test' mode.""" 00221 return ["SetGlobalEnvironment", \ "SetGlobalInputSandbox", \
def python::GBSTask::GBSTask::IsDisabled | ( | self, | ||
method | ||||
) |
Return True if method is disabled.
Definition at line 222 of file GBSTask.py.
00227 : 00228 """Returns True is authorised to submit jobs. 00229 00230 Always True except if backend is LCG, then checks proxy""" 00231 if self.GetBackend()[0:3] != "LCG" : return True ok = False
def python::GBSTask::GBSTask::KillJobs | ( | self, | ||
job_name_pattern = ".*" | ||||
) |
After seeking confirmation, kill all suitable jobs matching pattern
Definition at line 293 of file GBSTask.py.
00295 : 00296 """Enable or disable submission of jobs via the SubmitJobs() method. The function returns the previous value of the switch."""
def python::GBSTask::GBSTask::ListJobs | ( | self, | ||
job_name_pattern = ".*" | ||||
) |
List existing Jobs matching pattern
Definition at line 232 of file GBSTask.py.
00233 : ok = Ganga.GPI.gridProxy.isValid() 00234 except: ok = False 00235 if warn and not ok: print "Cannot submit jobs; backend is LCG but cannot find valid proxy" 00236 return ok 00237 00238 def IsDisabled(self,method): 00239 """Return True if method is disabled.""" 00240 00241 if self.GetMode() == 'Test': return False 00242 for disabled_method in self.GetTestOnlyMethods(): 00243 if method == disabled_method: 00244 print "Method " + method + " is disabled in Production mode" 00245 return True 00246 return False 00247 00248 def ListJobs(self,job_name_pattern = ".*"): 00249 00250 """List existing Jobs matching pattern""" 00251 00252 print self.GetName() + " has the following jobs", 00253 if job_name_pattern != ".*": print "that match the pattern: '" + str(job_name_pattern) + "'", 00254 print ":-" 00255 first = 1 00256 job_names = self._jobManagers.keys() 00257 job_names.sort() 00258 for job_name in job_names:
def python::GBSObject::GBSObject::MakeChildDirectory | ( | self | ) | [inherited] |
If directory used to store child objects does not exist, create it
Definition at line 55 of file GBSObject.py.
00055 : loc += ".state" 00056 return loc 00057 00058 # I/O 00059 00060 def MakeChildDirectory(self): 00061 00062 """If directory used to store child objects does not exist, create it""" 00063 00064 child_dir = self.GetStoreLocation("child_dir")
def python::GBSTask::GBSTask::PromoteProtoJobs | ( | self, | ||
job_name_pattern = ".*" | ||||
) |
Promote all ProtoJobs whose name matches the supplied pattern. For example ".*" promotes all.
Definition at line 812 of file GBSTask.py.
00816 : 00817 00818 """Add a ProtJob with the name job_name and assign its local args the string args_str 00819 and environment env_str.""" 00820 00821 if env_str: 00822 d = GBSUtilities.ParseEnvStr(env_str) 00823 if not d: 00824 print "Cannot parse environment string: " + env_str 00825 return None 00826 return self._AddJobOrProtoJob(job_name,args_str,env_str,"GBSProtoJob") 00827 00828 def PromoteProtoJobs(self,job_name_pattern = ".*"): 00829 00830 """Promote all ProtoJobs whose name matches the supplied pattern. For example ".*" promotes all.""" 00831 00832 pro_list = [] # List of names to be promoted (give user a last chance to abort) 00833 for job_name,job in self._jobManagers.iteritems(): 00834 if job.GetType() != "GBSProtoJob": continue 00835 if re.search(job_name_pattern,job_name): pro_list.append(job_name) 00836 num_pjobs = len(pro_list) 00837 if not num_pjobs: print "No ProtoJobs to promote"
def python::GBSObject::GBSObject::Read | ( | self | ) | [inherited] |
Definition at line 65 of file GBSObject.py.
00065 : Log(self,logger.SYNOPSIS,"Creating directory for child objects:" + str(child_dir))
def python::GBSTask::GBSTask::RefreshJobStats | ( | self | ) |
Bring job statistics up to date. Should not be necessary (but harmless) for user to call this as child jobs call it when their state changes. Note: unlike UpdateJobsStatus this simply asks jobs what their current state without checking Ganga
Definition at line 297 of file GBSTask.py.
00301 : self.__submitEnabled = 1 00302 self.Write() 00303 return old_val 00304 00305 def HoldJobs(self,job_name_pattern= ".*"): 00306 """After seeking confirmation, hold all suitable jobs matching pattern""" 00307 self.__ApplyActionToJobs("HOLD",job_name_pattern) 00308 00309 def KillJobs(self,job_name_pattern= ".*"): 00310 """After seeking confirmation, kill all suitable jobs matching pattern""" 00311 self.__ApplyActionToJobs("KILL",job_name_pattern) 00312 00313 def RefreshJobStats(self): 00314 00315 """ Bring job statistics up to date. 00316 00317 Should not be necessary (but harmless) for user to call this 00318 as child jobs call it when their state changes. Note: unlike 00319 UpdateJobsStatus this simply asks jobs what their current 00320 state without checking Ganga """ 00321 self.__jstatAll = 0 00322 self.__jstatNReadyAll = 0 00323 self.__jstatNReadyOther = 0 00324 self.__jstatReadyAll = 0 00325 self.__jstatReadyRetry = 0
def python::GBSTask::GBSTask::ReleaseJobs | ( | self, | ||
job_name_pattern = ".*" | ||||
) |
After seeking confirmation, release all suitable jobs matching pattern
Definition at line 326 of file GBSTask.py.
00330 : self.__jstatAll += 1
def python::GBSTask::GBSTask::RemoveJobs | ( | self, | ||
job_name_pattern = ".*" | ||||
) |
After seeking confirmation, remove all suitable jobs matching pattern
Definition at line 330 of file GBSTask.py.
00330 : 00331 self.__jstatAll += 1 00332 pc = job.GetPhaseCode() 00333 if pc == GID_JPC_DONE_FAIL or pc == GID_JPC_DONE_NFAIL: self.__jstatDoneAll += 1 if pc == GID_JPC_DONE_FAIL: self.__jstatDoneFail += 1
def python::GBSTask::GBSTask::RemoveProtoJobs | ( | self, | ||
job_name_pattern = ".*" | ||||
) |
Remove all ProtoJobs whose name matches the supplied pattern. For example ".*" removes all
Definition at line 838 of file GBSTask.py.
00841 : 00842 print "Promotion aborted" 00843 return 00844 for job_name in pro_list: 00845 pjob = self._jobManagers[job_name] 00846 job = GetModelRegistry().CreateObject(self.GetModel(),"Job",job_name,self) 00847 job.SetScriptLocalArgs(pjob.GetScriptLocalArgs()) 00848 job.SetLocalEnvironment(pjob.GetLocalEnvironment()) 00849 job.SetLocalInputSandbox(pjob.GetLocalInputSandbox()) 00850 job.SetLocalOutputSandbox(pjob.GetLocalOutputSandbox()) 00851 self._jobManagers[job_name] = job 00852 print str(len(pro_list)) + " ProtoJobs promoted" 00853 00854 def RemoveProtoJobs(self,job_name_pattern = ".*"): 00855 00856 """Remove all ProtoJobs whose name matches the supplied pattern. For example ".*" removes all""" 00857 del_list = [] # List of names to be deleted (cannot delete elements while iterating)
def python::GBSObject::GBSObject::Rename | ( | self, | ||
new_name | ||||
) | [inherited] |
Rename to new_name (does not perform I/O - that must be done by caller).
Definition at line 82 of file GBSObject.py.
00082 : 00083 self.__model = ioh("Model Name","s",self.__model) 00084 self.__name = ioh("Object Name","s",self.__name) 00085 00086 00087 def Rename(self,new_name): 00088 """Rename to new_name (does not perform I/O - that must be done by caller).""" self.__name = new_name
def python::GBSTask::GBSTask::SetBackend | ( | self, | ||
backend | ||||
) |
Define backend for future jobs: One of: Local, PBS:queue, LCG:queue
Definition at line 334 of file GBSTask.py.
00334 : self.__jstatDoneFail += 1 00335 if pc == GID_JPC_READY_RETRY or pc == GID_JPC_READY_NRETRY: self.__jstatReadyAll += 1 00336 if pc == GID_JPC_READY_RETRY: self.__jstatReadyRetry += 1 00337 if pc == GID_JPC_SUBMIT_RUN or pc == GID_JPC_SUBMIT_NRUN: self.__jstatSubmitAll += 1 00338 if pc == GID_JPC_SUBMIT_NRUN: self.__jstatSubmitNRun += 1 00339 if pc == GID_JPC_NREADY_HOLD or pc == GID_JPC_NREADY_NHOLD: self.__jstatNReadyAll += 1 00340 if pc == GID_JPC_NREADY_NHOLD: self.__jstatNReadyOther += 1 00341 00342 def ReleaseJobs(self,job_name_pattern = ".*"): 00343 """After seeking confirmation, release all suitable jobs matching pattern""" 00344 self.__ApplyActionToJobs("RELEASE",job_name_pattern) 00345
def python::GBSTask::GBSTask::SetGlobalEnvironment | ( | self, | ||
env_str | ||||
) |
Set, as a comma separated list string, the environment that is global to all jobs. e.g. task.SetGlobalEnvironment('var1=123,var2=a string with spaces,var3=456') If the first character is + e.g. +var3=567,var4=b, append to existing environment rather than replace it
Definition at line 346 of file GBSTask.py.
00346 : 00347 """After seeking confirmation, remove all suitable jobs matching pattern""" 00348 self.__ApplyActionToJobs("REMOVE",job_name_pattern) 00349 00350 def SetBackend(self,backend): 00351 00352 """Define backend for future jobs: One of: Local, PBS:queue, LCG:queue""" 00353 00354 # Check value before accepting 00355 if not re.search(r"^(Local|PBS|LCG)(:.+)?$",backend): 00356 print "Sorry, '" + str(backend) + "' is not an acceptible backend. It should be one of:-\n"\ 00357 + " Local, PBS{:queue} or LCG{:queue}" 00358 return 00359 self.__backend = backend 00360 self.Write() 00361 00362 def SetGlobalEnvironment(self,env_str): 00363 00364 """Set, as a comma separated list string, the environment that is global to all jobs. 00365 00366 e.g. task.SetGlobalEnvironment('var1=123,var2=a string with spaces,var3=456') 00367 00368 If the first character is + e.g. +var3=567,var4=b, append to existing environment rather than replace it"""
def python::GBSTask::GBSTask::SetGlobalInputSandbox | ( | self, | ||
in_sbox_str | ||||
) |
Set, as a comma separated list string, the input sandbox file list that is global to all jobs. e.g. task.SetGlobalInputSandbox('/home/users/west/my_input_data.dat,../my_script.sh')
Definition at line 369 of file GBSTask.py.
00370 : return 00371 if re.search(r"^\+",env_str): 00372 s = self.__globalEnvironment 00373 if s: s += ',' 00374 s += env_str[1:] 00375 # Force local method to be recalled rather than any inherited one 00376 # May just be paranoia but see corresponding GBSJob method. 00377 GBSTask.SetGlobalEnvironment(self,s) 00378 return 00379 d = GBSUtilities.ParseEnvStr(env_str) 00380 if env_str and not d: print "Cannot parse environment string: '" + env_str + "'"
def python::GBSTask::GBSTask::SetGlobalOutputSandbox | ( | self, | ||
out_sbox_str | ||||
) |
Set, as a comma separated list string, the output sandbox file list that is global to all jobs. e.g. task.SetGlobalOutputSandbox('my_output_data.dat,my_output.log')
Definition at line 381 of file GBSTask.py.
00381 : 00382 self.__globalEnvironment = GBSUtilities.BuildEnvStr(d) 00383 self.Write() 00384 00385 def SetGlobalInputSandbox(self,in_sbox_str): 00386 00387 """Set, as a comma separated list string, the input sandbox file list that is global to all jobs. 00388 00389 e.g. task.SetGlobalInputSandbox('/home/users/west/my_input_data.dat,../my_script.sh')""" 00390 00391 if self.IsDisabled("SetGlobalInputSandbox"): return 00392 (ok,str) = GBSUtilities.ProcessSandboxSetup(self,in_sbox_str,"input") if ok:
def python::GBSTask::GBSTask::SetMaxGangaJobs | ( | self, | ||
n | ||||
) |
Set the maximum number of jobs that can be submitted to Ganga at any one time
Definition at line 393 of file GBSTask.py.
00393 : 00394 self.__globalInputSandbox = str 00395 self.Write() 00396 00397 def SetGlobalOutputSandbox(self,out_sbox_str): 00398 """Set, as a comma separated list string, the output sandbox file list that is global to all jobs.
def python::GBSTask::GBSTask::SetMaxSubmitJobs | ( | self, | ||
n | ||||
) |
Set the maximum number of jobs that can be submitted by a single call SubmitJobs()
Definition at line 399 of file GBSTask.py.
def python::GBSTask::GBSTask::SetMode | ( | self, | ||
mode | ||||
) |
Set the Test/Production Mode
Definition at line 405 of file GBSTask.py.
00405 : 00406 self.__globalOutputSandbox = str 00407 self.Write() 00408 00409 def SetMaxGangaJobs(self,n): 00410 """Set the maximum number of jobs that can be submitted to Ganga at any one time""" 00411 00412 self.__maxGangaJobs = n 00413 self.Write() 00414 00415 def SetMaxSubmitJobs(self,n): 00416 """Set the maximum number of jobs that can be submitted by a single call SubmitJobs()""" 00417 00418 self.__maxSubmitJobs = n 00419 self.Write() 00420 00421 def SetMode(self,mode): 00422 """Set the Test/Production Mode """ 00423 if mode != "Test" and mode != "Production": 00424 print "Illegal mode: '" + mode + "', only 'Test' and 'Production' permitted" 00425 return 00426 if self.__mode == mode: return 00427 if mode == 'Production': 00428 print "Caution: Switching to 'Production' mode will disable the following methods:-\n" 00429 print str(self.GetTestOnlyMethods()) print "\n but should be done before moving into full production."
def python::GBSTask::GBSTask::SetScriptFileName | ( | self, | ||
ext_file_spec | ||||
) |
Pass in script file to be used by jobs. GBS make a local copy so no need to keep once passed. e.g. task.SetScriptFileName("my/directory/my_script_file.sh")
Definition at line 430 of file GBSTask.py.
00431 : 00432 if not self.GetScriptFileName(): 00433 print "You cannot switch to 'Production' mode; no global script file has been defined." 00434 return 00435 print "Caution: Switching back to 'Test' mode is potentially dangerous as it re-enables the following methods:-\n" 00436 print str(self.GetTestOnlyMethods()) 00437 print "\n and using these methods could produce results that are not homogeneous" 00438 ans = raw_input("Do you want to proceed ?y[n]") 00439 if not re.search(r"^y",ans,re.I): 00440 print "Mode not changed." 00441 return 00442 print "Mode changed to " + mode 00443 self.__mode = mode 00444 self.Write() 00445 00446 def SetScriptFileName(self,ext_file_spec): 00447 00448 """Pass in script file to be used by jobs. GBS make a local copy 00449 so no need to keep once passed.
def python::GBSTask::GBSTask::SetScriptGlobalArgs | ( | self, | ||
arg_str | ||||
) |
Set (as a string) the comma separated list of application script args that are global to all jobs.
Definition at line 450 of file GBSTask.py.
00453 : return 00454 import os 00455 if not os.path.isfile(ext_file_spec): 00456 print "Cannot find script file " + str(ext_file_spec) 00457 return
def python::GBSTask::GBSTask::SubmitJobs | ( | self, | ||
num_req = 0 | ||||
) |
If enabled (see EnableSubmit()), submit up to the limit (see GetMaxSubmitJobs()) jobs, but not to exceed the maximum (see GetMaxGangaJobs()) first choosing ones scheduled for retry and then new jobs. Function returns number submitted.
Definition at line 458 of file GBSTask.py.
00461 : 00462 print "Failed to copy file!" 00463 self.__scriptFileName = "" 00464 self.Write() 00465 00466 def SetScriptGlobalArgs(self,arg_str): 00467 00468 """Set (as a string) the comma separated list of application script args that are global to all jobs.""" 00469 00470 if self.IsDisabled("SetScriptGlobalArgs"): return 00471 self.__scriptGlobalArgs = arg_str 00472 self.Write() 00473 00474 def SubmitJobs(self,num_req = 0): 00475 00476 """If enabled (see EnableSubmit()), 00477 submit up to the limit (see GetMaxSubmitJobs()) jobs, 00478 but not to exceed the maximum (see GetMaxGangaJobs()) 00479 first choosing ones scheduled for retry and 00480 then new jobs. Function returns number submitted.""" 00481 00482 num_sub = 0 00483 00484 # Begin by making sure that submit is authorised, enabled and that there is a global script. 00485 if not self.IsAuthorisedToSubmit(): return num_sub 00486 if not self.__submitEnabled: 00487 print "Sorry, cannot submit; submit not enabled." 00488 return num_sub 00489 if not self.__scriptFileName: 00490 print "Sorry, cannot submit; no user application script assigned." 00491 return num_sub 00492 00493 # Make sure all jobs are up to date and that limits are not exceeded. 00494 self.UpdateJobsStatus() 00495 if self.__maxGangaJobs <= self.__jstatSubmitAll: 00496 print "Sorry, cannot submit; already have %d (limit %d)" % (self.__jstatSubmitAll,self.__maxGangaJobs) 00497 return num_sub if num_req <= 0: num_req = self.__maxSubmitJobs
def python::GBSTask::GBSTask::UpdateJobsStatus | ( | self | ) |
Send all jobs their UpdateStatus(). May involve checking Ganga job and retrieving output. As a by product, also remove any orphaned Ganga jobs i.e. Ganga jobs in Task's jobtree folder but not owned by any Job
Definition at line 498 of file GBSTask.py.
00498 : num_req = self.__maxSubmitJobs 00499 max_job = min(num_req,self.__maxGangaJobs - self.__jstatSubmitAll) 00500 if max_job < num_req: print "Sorry, can only submit %d, close to the limit of %d" % (max_job,self.__maxGangaJobs) 00501 00502 # First look for retries and then look for new jobs. 00503 for jsc in [GID_JSC_RETRY,GID_JSC_NEW]: 00504 job_names = self._jobManagers.keys() 00505 job_names.sort() 00506 for job_name in job_names: 00507 job = self._jobManagers[job_name] 00508 if job.GetStatusCode() != jsc: continue 00509 job.Submit() 00510 num_sub += 1 00511 if num_sub >= max_job: return num_sub 00512 return num_sub 00513 00514 def UpdateJobsStatus(self): 00515 00516 """Send all jobs their UpdateStatus(). May involve checking Ganga job and retrieving output. 00517 00518 As a by product, also remove any orphaned Ganga jobs i.e. Ganga jobs in Task's jobtree folder 00519 but not owned by any Job""" 00520 00521 # If job polling not enabled, perform a single sweep with a timeout based on the number 00522 # of Ganga jobs 00523 if not Ganga.GPI.config['PollThread']['autostart']:
def python::GBSTask::GBSTask::WarnLowGridProxy | ( | self, | ||
email_list, | ||||
proxy_min_hours = 3. , |
||||
myproxy_min_days = 3. | ||||
) |
If there is a valid GRID proxy, but it is running low:- 1) 'proxy_min_hours' defined and Proxy lifetime is less than 'proxy_min_hours' or 2) 'myproxy_min_days defined and myproxy lifetime is less that 'myproxy_min_days'
Definition at line 524 of file GBSTask.py.
00527 {} 00528 for job_name,job in self._jobManagers.iteritems(): 00529 job.UpdateStatus() 00530 owned_ganga_ids[job.GetGangaJobId()] = 1 00531 # Clean up dead links. This ought not to be necessary but, at least as late as 4.4.8 it is. 00532 Ganga.GPI.jobtree.cleanlinks() 00533 for gj in Ganga.GPI.jobtree.getjobs(self.__gangaTreeDir): 00534 if owned_ganga_ids.has_key(gj.id): continue 00535 Log(self,logger.WARNING,"Removing orphaned Ganga job %d" % gj.id) 00536 gj.remove() 00537 00538 #----------------------------------------------------------------------------------------------- 00539 00540 def WarnLowGridProxy(self,email_list,proxy_min_hours=3.,myproxy_min_days=3.): 00541 """If there is a valid GRID proxy, but it is running low:- 00542 00543 1) 'proxy_min_hours' defined and Proxy lifetime is less than 'proxy_min_hours' 00544 or 2) 'myproxy_min_days defined and myproxy lifetime is less that 'myproxy_min_days' 00545 """ 00546 # No action if there isn't an email list or a GRID Proxy 00547 if not email_list or os.system("voms-proxy-info --exists"): return 00548 00549 proxy_warn = "" 00550 00551 # Test GRID proxy 00552 if proxy_min_hours: 00553 inp = os.popen("voms-proxy-info --timeleft","r") 00554 line = inp.readline() 00555 inp.close() 00556 try: 00557 hours_left = int(line)/(60.*60.) 00558 if hours_left < proxy_min_hours: proxy_warn = "GRID proxy remaining lifetime = %.1f hours " % hours_left 00559 except: pass 00560
def python::GBSObject::GBSObject::Write | ( | self | ) | [inherited] |
def python::GBSTask::GBSTask::WriteFamily | ( | self | ) |
def python::GBSTask::GBSTask::WriteHtmlReport | ( | self, | ||
task_dir | ||||
) |
Write HTML status report <task_dir>/<task-name>.html. Reports of individual jobs will be created in <task_dir>/<task-name>./<job-name>.html.
Definition at line 561 of file GBSTask.py.
00562 : 00563 inp = os.popen("myproxy-info | grep timeleft","r") 00564 line = inp.readline() 00565 inp.close() 00566 mo = re.search(r"\s+(\d+):(\d+):(\d+)",line) 00567 if mo: 00568 days_left = ( (int(mo.group(3))/60.+int(mo.group(2)))/60. + int(mo.group(1)) )/24. 00569 if days_left < myproxy_min_days: proxy_warn += "myproxy remaining lifetime = %.1f days" % days_left 00570 00571 if proxy_warn: 00572 cmd = 'mail -s"Task:%s WARNING: %s" %s<< EOD\n' % (self.GetName(),proxy_warn,email_list) 00573 cmd += "\nPlease log in and renew the proxy/myproxy as appropriate.\nEOD\n" 00574 os.system(cmd) 00575 00576 00577 def WriteHtmlReport(self,task_dir): 00578 """Write HTML status report <task_dir>/<task-name>.html. 00579 00580 Reports of individual jobs will be created in <task_dir>/<task-name>./<job-name>.html.""" 00581 00582 if not os.path.isdir(task_dir): 00583 print "Unable to find directory: " + str(task_dir) 00584 return 00585 task_name = self.GetName() 00586 00587 # Create or wipe directory for job reports if necessary 00588 job_dir = task_dir + "/" + task_name 00589 if not os.path.isdir(job_dir): 00590 try: 00591 os.mkdir(job_dir,0755) 00592 except OSError: 00593 print "Unable to create directory: " + str(job_dir) 00594 return 00595 else: 00596 os.system("cd " + job_dir + "; rm -f *") 00597 00598 # Make sure that job statistics are right up to date 00599 self.RefreshJobStats() 00600 00601 # Create file for task report and record task data 00602 task_file = task_dir + "/" + task_name + '.html' 00603 try: 00604 f_task = open(task_file, 'w') 00605 except OSError: 00606 print "Unable to create file: " + str(task_file) 00607 return 00608 f_task.write("<html><body>\n") 00609 f_task.write("<h1>Status report for Task " + task_name + " produced on " + GBSTimeStamp() + "</h1>\n<pre>\n") 00610 f_task.write(str(self)) 00611 f_task.write("\n</pre>\n") 00612 00613 # Define colour codes 00614 red = "#ff6666" 00615 green = "#66ff66" 00616 blue = "#66aaff" 00617 purple = "#ff44ff" 00618 grey_light = "#999999" 00619 grey_dark = "#777777" 00620 00621 # Construct the colour table with links to start points in table sorted by job phase. 00622 f_task.write("<h2>Task Status Table</h2>\n") 00623 f_task.write("<p><table border=1>\n") 00624 00625 f_task.write("<tr><td colspan=2 align=center bgcolor=" + grey_light + ">Not ready HELD (") 00626 num_nready_held = self.__jstatNReadyAll - self.__jstatNReadyOther 00627 if num_nready_held: 00628 f_task.write('<a href="#phase_%d">%d</a>)\n' % (GID_JPC_NREADY_HOLD,num_nready_held)) 00629 else: f_task.write('0)\n') 00630 00631 f_task.write("<tr><td colspan=2 align=center bgcolor=" + grey_dark + ">Not ready Not HELD (") 00632 if self.__jstatNReadyOther: 00633 f_task.write('<a href="#phase_%d">%d</a>)\n' % (GID_JPC_NREADY_NHOLD,self.__jstatNReadyOther)) 00634 else: f_task.write('0)\n') 00635 00636 f_task.write("<tr><td rowspan=2 align=center bgcolor=" + blue + \ 00637 ">Ready (" + str(self.__jstatReadyAll) + ")\n") 00638 00639 f_task.write("<td align=center bgcolor=" + green + ">First attempt (") 00640 num_ready_nretry = self.__jstatReadyAll - self.__jstatReadyRetry 00641 if num_ready_nretry: 00642 f_task.write('<a href="#phase_%d">%d</a>)\n' % (GID_JPC_READY_NRETRY,num_ready_nretry)) 00643 else: f_task.write('0)\n') 00644 00645 f_task.write("<tr><td align=center bgcolor=" + red + ">Retry (") 00646 if self.__jstatReadyRetry: 00647 f_task.write('<a href="#phase_%d">%d</a>)\n' % (GID_JPC_READY_RETRY,self.__jstatReadyRetry)) 00648 else: f_task.write('0)\n') 00649 00650 f_task.write("<tr><td rowspan=2 align=center bgcolor=" + purple + \ 00651 ">Submitted (" + str(self.__jstatSubmitAll) + ")\n") 00652 00653 f_task.write("<td align=center bgcolor=" + green + ">Running (") 00654 num_submit_run = self.__jstatSubmitAll - self.__jstatSubmitNRun 00655 if num_submit_run: 00656 f_task.write('<a href="#phase_%d">%d</a>)\n' % (GID_JPC_SUBMIT_RUN,num_submit_run)) 00657 else: f_task.write('0)\n') 00658 00659 f_task.write("<tr><td align=center bgcolor=" + red + ">Not running (\n") 00660 if self.__jstatSubmitNRun: 00661 f_task.write('<a href="#phase_%d">%d</a>)\n' % (GID_JPC_SUBMIT_NRUN,self.__jstatSubmitNRun)) 00662 else: f_task.write('0)\n') 00663 00664 f_task.write("<tr><td colspan=2 align=center bgcolor=" + green + ">Succeeded (") 00665 num_done_nfail = self.__jstatDoneAll - self.__jstatDoneFail 00666 if num_done_nfail: 00667 f_task.write('<a href="#phase_%d">%d</a>)\n' % (GID_JPC_DONE_NFAIL,num_done_nfail)) 00668 else: f_task.write('0)\n') 00669 00670 f_task.write("<tr><td colspan=2 align=center bgcolor=" + red + ">Failed (") 00671 if self.__jstatDoneFail: 00672 f_task.write('<a href="#phase_%d">%d</a>)\n' % (GID_JPC_DONE_FAIL,self.__jstatDoneFail)) 00673 else: f_task.write('0)\n') 00674 00675 f_task.write("</table>\n") 00676 00677 # Prepare a hash of lists: job phase -> list of table rows 00678 phase_rows = {} 00679 phase_rows[GID_JPC_NREADY_HOLD] = [] 00680 phase_rows[GID_JPC_NREADY_NHOLD] = [] 00681 phase_rows[GID_JPC_READY_NRETRY] = [] 00682 phase_rows[GID_JPC_READY_RETRY] = [] 00683 phase_rows[GID_JPC_SUBMIT_RUN] = [] 00684 phase_rows[GID_JPC_SUBMIT_NRUN] = [] 00685 phase_rows[GID_JPC_DONE_NFAIL] = [] 00686 phase_rows[GID_JPC_DONE_FAIL] = [] 00687 00688 ## Loop over all jobs, recording summary in a table in the parent task file with 00689 ## links to a file holding job details and associate Ganga job (if any). 00690 f_task.write('<h2><a name="table_by_name">Job Table ordered by Name</a></h2\n') 00691 f_task.write('<p>See also <a href="#table_by_phase">Job Table ordered by Phase</a><p\n') 00692 table_header = "<p><table border=1>\n<tr><th>Job Name<th>Status<br>Date Time<th>Ganga<br>Job<th>Try<br>Num<th>Status\n" 00693 f_task.write(table_header) 00694 job_names = self._jobManagers.keys() 00695 job_names.sort() 00696 for job_name in job_names: 00697 job = self._jobManagers[job_name] 00698 job_name = job.GetName() 00699 00700 # Decide colours for table entry 00701 pc = job.GetPhaseCode() 00702 colour_1,colour_2 = grey_dark, grey_dark 00703 if pc == GID_JPC_NREADY_HOLD: colour_1,colour_2 = grey_light,grey_light 00704 if pc == GID_JPC_NREADY_NHOLD: colour_1,colour_2 = grey_dark, grey_dark 00705 if pc == GID_JPC_READY_NRETRY: colour_1,colour_2 = blue, green 00706 if pc == GID_JPC_READY_RETRY: colour_1,colour_2 = blue, red 00707 if pc == GID_JPC_SUBMIT_RUN: colour_1,colour_2 = purple, green 00708 if pc == GID_JPC_SUBMIT_NRUN: colour_1,colour_2 = purple, red 00709 if pc == GID_JPC_DONE_NFAIL: colour_1,colour_2 = green, green 00710 if pc == GID_JPC_DONE_FAIL: colour_1,colour_2 = red, red 00711 00712 # Initialise string that will hold a single row 00713 table_row = "" 00714 00715 # Create job file and link table entry to it. 00716 job_file = job_dir + "/" + job_name + '.html' 00717 f_job = open(job_file, 'w') 00718 f_job.write("<html><body>\n") 00719 f_job.write("<h1>Status report for Job " + job_name + " produced on " + GBSTimeStamp() + "</h1>\n") 00720 f_job.write("<p>This job is part of the <a href=\"../" + task_name + ".html\">") 00721 f_job.write(task_name + "</a> task<p>\n<pre>\n") 00722 f_job.write(str(job)) 00723 f_job.write("</pre></body></html>\n") 00724 f_job.close() 00725 table_row += "<tr><td bgcolor=" + colour_1 + "><a href=\"" + task_name + "/" \ 00726 + job_name + ".html\">" + job_name + "</a>" 00727 00728 # Chop off year and second to keep date more compact 00729 table_row += "<td bgcolor=" + colour_1 + ">" + job.GetStatusTime()[6:-4] 00730 table_row += "<td bgcolor=" + colour_1 + ">" 00731 gj_id = "unknown" 00732 gj_stat_file = "" 00733 gj_obj = job.GetGangaJob() 00734 if gj_obj: 00735 gj_id = gj_obj.id 00736 else : 00737 gj_stat_file = job._GetTryOutputDir() + "/gbs_ganga.status" 00738 if os.path.isfile(gj_stat_file): 00739 f = os.popen('grep "id " ' + gj_stat_file) 00740 line = f.readline() 00741 f.close() 00742 mo = re.search(r"id = (\d+)",line) 00743 if mo: gj_id = mo.group(1) 00744 gj_obj = open(gj_stat_file) 00745 if not gj_obj: 00746 table_row += "n/a" 00747 else: 00748 00749 # Create Ganga job file 00750 gj_file = job_dir + "/" + job_name + '_ganga_job.html' 00751 f_gj = open(gj_file, 'w') 00752 f_gj.write("<html><body>\n") 00753 f_gj.write("<p><h1>Ganga job for <a href=\"" + job_name + ".html\">" + job_name + "</a></h1>\n") 00754 f_gj.write("<p>This job is part of the <a href=\"../" + task_name + ".html>") 00755 f_gj.write(task_name + "</a> task<p>\n<pre>\n") 00756 if gj_stat_file: 00757 f_gj.write("Dump of expired Ganga Job\n\n") 00758 for line in gj_obj: f_gj.write(line) 00759 gj_obj.close() 00760 else: 00761 f_gj.write("Dump of active Ganga Job\n\n") 00762 f_gj.write(str(gj_obj)) 00763 f_gj.write("</pre></body></html>\n") 00764 f_gj.close() 00765 table_row += "<a href=\"" + task_name + "/" + job_name + "_ganga_job.html\">" + str(gj_id) + "</a>" 00766 00767 # Create a GLF history file 00768 hist_file = job_dir + "/" + job_name + '_history.html' 00769 f_hist = open(hist_file, 'w') 00770 f_hist.write("<html><body>\n") 00771 f_hist.write("<p><h1>Full try history for <a href=\"" + job_name + ".html\">" + job_name + "</a></h1>\n") 00772 f_hist.write("This file contains all the GBS Log Files (GLF) for "\ 00773 + job_name + " which is is part of the <a href=\"../" + task_name + ".html\">") 00774 f_hist.write(task_name + "</a> task<p>\n<pre>\n") 00775 glf_wildcard = job.GetStoreLocation('child_dir') + "/*/gbs_*job*.log" 00776 for glf_file in glob.glob(glf_wildcard): 00777 mo = re.search(r"_(\d+)\.log$",glf_file) 00778 f_hist.write("</pre><h2>GLF Summary for %s</h2><pre>\n" % mo.group(1)) 00779 f_glf = open(glf_file) 00780 f_hist.write(f_glf.read()) 00781 f_glf.close() 00782 f_hist.write("</pre></body></html>\n") 00783 f_hist.close() 00784 00785 hist_file_rel = task_name + "/" + job_name + "_history.html" 00786 table_row += "<td bgcolor=%s align=center> <a href=\"%s\">%d</a>" % (colour_2,hist_file_rel,job.GetTryNumber()) 00787 table_row += "<td bgcolor=" + colour_2 + ">" + GIDStringForJSC(job.GetStatusCode()).ljust(23) \ 00788 + "[" + job.GetStatusText() + "]\n" 00789 # Record row in table ordered by name and and in hash ready forr table ordered by phase. 00790 f_task.write(table_row) 00791 phase_rows[pc].append(table_row) 00792 00793 f_task.write("</table>\n") 00794 00795 ## Construct table ordered by phase 00796 f_task.write('<h2><a name="table_by_phase">Job Table ordered by Phase</a></h2>\n') 00797 f_task.write('<p>See also <a href="#table_by_name">Job Table ordered by Name</a><p>\n') 00798 f_task.write(table_header) 00799 for pc,rows in phase_rows.iteritems(): f_task.write('<a name="phase_%d"></a>\n' % pc)
python::GBSTask::GBSTask::__backend [private] |
Definition at line 27 of file GBSTask.py.
python::GBSTask::GBSTask::__gangaTreeDir [private] |
Definition at line 36 of file GBSTask.py.
Definition at line 24 of file GBSTask.py.
Definition at line 25 of file GBSTask.py.
Definition at line 26 of file GBSTask.py.
python::GBSTask::GBSTask::__jstatAll [private] |
Definition at line 38 of file GBSTask.py.
python::GBSTask::GBSTask::__jstatDoneAll [private] |
Definition at line 45 of file GBSTask.py.
Definition at line 46 of file GBSTask.py.
Definition at line 39 of file GBSTask.py.
Definition at line 40 of file GBSTask.py.
Definition at line 41 of file GBSTask.py.
Definition at line 42 of file GBSTask.py.
Definition at line 43 of file GBSTask.py.
Definition at line 44 of file GBSTask.py.
python::GBSTask::GBSTask::__maxGangaJobs [private] |
Definition at line 29 of file GBSTask.py.
Definition at line 30 of file GBSTask.py.
python::GBSTask::GBSTask::__mode [private] |
Definition at line 31 of file GBSTask.py.
Definition at line 22 of file GBSTask.py.
Definition at line 23 of file GBSTask.py.
Definition at line 28 of file GBSTask.py.
python::GBSTask::GBSTask::_jobManagers [private] |
Definition at line 35 of file GBSTask.py.