Inheritance diagram for python::GBSJob::GBSJob:
Object to submit, and if necessary resubmit, a job until it succeeds or needs user intervention. This class is responsible for submitting and, if necessary, resubmitting a job until it is successful or user intervention is unavoidable.
Definition at line 17 of file GBSJob.py.
def python::GBSObject::GBSObject::__init__ | ( | self, | ||
name, | ||||
parent, | ||||
model | ||||
) | [inherited] |
Definition at line 10 of file GBSObject.py.
00015 : 00016 self.__name = name # Object name, must be unique within scope of parent 00017 self.__parent = parent # Parent (None for Manager) 00018 self.__model = model # Model name 00019 Log(self,logger.SYNOPSIS,"Creating a " + self.GetType() + " named " + self.__name) 00020 00021 # After creation, either read current state from disk, if state file exists, 00022 # or create state file (and if necessary supporting directory) if it doesn't. 00023 00024 if os.path.isfile(self.GetStoreLocation()): 00025 self.Read() else:
def python::GBSJob::GBSJob::__init__ | ( | self, | ||
name, | ||||
parent, | ||||
model, | ||||
model_args | ||||
) |
Definition at line 24 of file GBSJob.py.
00030 : 00031 self.__tryNumber = 0 # 0 before first try 00032 self.__earlyFails = 0 # Symptomatic of a system failure 00033 self.__lateFailsHandled = 0 # Job signalling retry 00034 self.__lateFailsUnhandled = 0 # Job aborting without signal 00035 self.__scriptLocalArgs = "" # Application script local args 00036 self.__localEnvironment = "" # The environment that are local to this job. 00037 self.__localInputSandbox = "" # The list of input sandbox files that are local to this job. 00038 self.__localOutputSandbox = "" # The list of output sandbox files that are local to this job. 00039 00040 # statusCode: See GBSIdCodes 00041 self.__statusCode = GID_JSC_NEW 00042 # Associated text to qualify statusCode. Has specific meaning when 00043 # __statusCode has the following values which correspond to the values 00044 # passed back from the application script via the GBS Log File:- 00045 # 00046 # GID_JSC_SUCCEEDED The list of output files 00047 # GID_JSC_FAILED Diagnostic message about the failure 00048 # GID_JSC_RETRY The retry args for next attempt. 00049 self.__statusText = "Ready to run" 00050 self.__statusTime = timestamp() 00051 # The current retry args i.e. as determined from previous try (or empty for first try) 00052 self.__retryArgs = "" self.__gangaJobId = -1
def python::GBSJob::GBSJob::__repr__ | ( | self | ) |
Reimplemented from python::GBSObject::GBSObject.
Definition at line 75 of file GBSJob.py.
00075 : self.__gangaJobId = self.__statusCode 00076 00077 GBSObject._DoMemberIO(self,ioh) 00078 00079 def GetType(self): return "GBSJob" 00080 00081 def __repr__(self): return self.AsString() 00082 00083
def python::GBSJob::GBSJob::_DoMemberIO | ( | self, | ||
ioh | ||||
) | [private] |
Reimplemented from python::GBSObject::GBSObject.
Definition at line 53 of file GBSJob.py.
00059 : 00060 self._SetTryNumber(ioh("Try Number","i",self.__tryNumber)) 00061 self.__retryArgs = ioh("Current Retry Args","s",self.__retryArgs) 00062 self.__earlyFails = ioh("Early Fail Count","i",self.__earlyFails) 00063 self.__lateFailsHandled = ioh("Late Handled Fails Count","i",self.__lateFailsHandled) 00064 self.__lateFailsUnhandled = ioh("Late Unhandled Fails Count","i",self.__lateFailsUnhandled) 00065 self.__scriptLocalArgs = ioh("Script Local Args","s",self.__scriptLocalArgs) 00066 self.__localEnvironment = ioh("+Local environment","s",self.__localEnvironment) 00067 self.__localInputSandbox = ioh("+Local Input Sandbox List", "s",self.__localInputSandbox) 00068 self.__localOutputSandbox = ioh("+Local Output Sandbox List", "s",self.__localOutputSandbox) 00069 self.__statusCode = ioh("Status Code","i",self.__statusCode) 00070 self.__statusText = ioh("Status Text","s",self.__statusText) 00071 self.__statusTime = ioh("+Status Time Stamp","s",self.__statusTime) 00072 self.__gangaJobId = ioh("+Ganga Job Id","i",self.__gangaJobId)
def python::GBSJob::GBSJob::_GetGbsLogFileName | ( | self, | ||
try_req = 0 | ||||
) | [private] |
Return the name of the GBS Log File for the current try (or supplied try).
Definition at line 784 of file GBSJob.py.
00786 : try_use = try_req 00787 return self.GetStoreLocation("child_dir") + "/try_" + str(try_use).zfill(3) 00788 00789 00790 def _GetGbsLogFileName(self, try_req = 0):
def python::GBSJob::GBSJob::_GetTryOutputDir | ( | self, | ||
try_req = 0 | ||||
) | [private] |
def python::GBSJob::GBSJob::_IncrementEarlyFailsCount | ( | self | ) | [private] |
def python::GBSJob::GBSJob::_IncrementLateHandledFailsCount | ( | self | ) | [private] |
def python::GBSJob::GBSJob::_IncrementLateUnhandledFailsCount | ( | self | ) | [private] |
def python::GBSJob::GBSJob::_SetRetryArgs | ( | self, | ||
value | ||||
) | [private] |
Definition at line 801 of file GBSJob.py.
00801 :%s.%d" % (self.GetName(),self.__tryNumber) 00802 00803 # Methods used by JobAnalyser to update 00804 def _IncrementEarlyFailsCount(self): self.__earlyFails += 1
def python::GBSJob::GBSJob::_SetStatus | ( | self, | ||
code, | ||||
text | ||||
) | [private] |
Definition at line 805 of file GBSJob.py.
00805 : self.__lateFailsHandled += 1 00806 def _IncrementLateUnhandledFailsCount(self): self.__lateFailsUnhandled += 1 00807 def _SetRetryArgs(self,value): self.__retryArgs = value 00808 00809 # Methods that record state changes. These also update state time stamp and add 00810 # entry to GLF if it exists. 00811 def _SetStatus(self,code,text): 00812 if self.__statusCode == code and self.__statusText == text: return 00813 self.__statusCode = code 00814 self.__statusText = text
def python::GBSJob::GBSJob::_SetStatusCode | ( | self, | ||
code | ||||
) | [private] |
def python::GBSJob::GBSJob::_SetStatusText | ( | self, | ||
text | ||||
) | [private] |
def python::GBSJob::GBSJob::_SetTryNumber | ( | self, | ||
try_no | ||||
) | [private] |
Update try number and associated tryID.
Definition at line 791 of file GBSJob.py.
00794 : try_use = try_req 00795 return "gbs_" + self.GetParent().GetName() + "_" + self.GetName() + "_" + str(try_use) + ".log" 00796 00797 def _SetTryNumber(self, try_no):
def python::GBSJob::GBSJob::Analyse | ( | self, | ||
update = True | ||||
) |
Perform job termination analysis and optionally apply the results.
Definition at line 289 of file GBSJob.py.
00295 : 00296 00297 """Perform job termination analysis and optionally apply the results.""" 00298 00299 # Quit if nothing to analyse. 00300 if self.__statusCode != GID_JSC_WAITING_ANALYSIS: return 00301 00302 analyser = GetModelRegistry().CreateObject(self.GetModel(),"JobAnalyser","Solomon",self) 00303 analyser.Analyse(self) 00304 if update: 00305 analyser.Apply() gj = self.GetGangaJob()
def python::GBSJob::GBSJob::AsString | ( | self, | ||
level = "Full" | ||||
) |
Return string description. Return string description at one of the following levels: "Brief" - one-line summary suitable for rows in tables; "Heading" - one-line summary suitable as a heading for "Brief"; "Full" - full description including the value of every data member.
Definition at line 84 of file GBSJob.py.
00090 : 00091 00092 """Return string description. 00093 00094 Return string description at the following levels:- 00095 "Brief" one line summary suitable for rows in tables 00096 "Heading" one line summary suitable as heading for "Brief" 00097 "Full" full desciption including value of every data member""" 00098 00099 if ( level == "Heading"): 00100 s = "Name".ljust(20) 00101 s += "Status".ljust(23) 00102 s += "Input".ljust(40) 00103 s += "Status Details" 00104 return s 00105 if ( level == "Brief"): 00106 s = self.GetName().ljust(20) 00107 s += GIDStringForJSC(self.__statusCode).ljust(23) 00108 s += (self.__scriptLocalArgs + ";" + self.__localEnvironment).ljust(40) 00109 s += "[" + self.__statusText + "]" 00110 return s 00111 00112 s = GBSObject.__repr__(self) + "\n\n" 00113 s += "Status: " + GIDStringForJSC(self.__statusCode) + " [" + self.__statusText + "] at " + self.__statusTime + "\n" 00114 s += " Associated Ganga Job ID: " + str(self.__gangaJobId) + "\n\n" 00115 s += "Job Definition\n" 00116 s += " Script local args: '" + self.__scriptLocalArgs + "'\n" 00117 s += " Local environment: '" + self.__localEnvironment + "'\n" 00118 s += " Input Sandbox: '" + self.__localInputSandbox + "'\n" 00119 s += " Output Sandbox: '" + self.__localOutputSandbox + "'\n\n" 00120 s += "Retry Status\n" 00121 s += " Try: " + str(self.__tryNumber) + "\n" 00122 s += " Retry Args: '" + str(self.__retryArgs) + "'\n" 00123 s += " Early Fails: " + str(self.__earlyFails) + "\n" 00124 s += " Late Handled Fails: " + str(self.__lateFailsHandled) + "\n" 00125 s += " Late Unhandled Fails: " + str(self.__lateFailsUnhandled) + "\n" 00126 00127 # If there exist any output, give a summary, either for the current try, if 00128 # complete, or the previous one if currently submitted. 
00129 00130 display_try = self.__tryNumber 00131 if self.__statusCode >= GID_JSC_SUBMITTED: display_try -= 1 00132 if display_try < 1: return s 00133 output_dir = self._GetTryOutputDir(display_try) 00134 s += "\nThe output for try %d can be found in\n\n %s\n\n and consists of:-\n\n" % (display_try,output_dir) 00135 list_dir = output_dir + '/../listing.tmp' 00136 os.system("cd " + output_dir + ";ls -l > " + list_dir) 00137 f = open(list_dir) 00138 for line in f: s += " " + line 00139 f.close() 00140 os.remove(list_dir) 00141 gbs_log_file_name = self._GetGbsLogFileName(display_try) 00142 gbs_log_file_spec = str(output_dir) + "/" + str(gbs_log_file_name) 00143 if not os.path.isfile(gbs_log_file_spec): return s s += "\nThe GLF (GBS Log File) %s contains:-\n\n" % gbs_log_file_name
def python::GBSJob::GBSJob::CanClear | ( | self | ) |
def python::GBSJob::GBSJob::CanKill | ( | self | ) |
def python::GBSJob::GBSJob::CanSubmit | ( | self | ) |
def python::GBSJob::GBSJob::ClearErrorCounts | ( | self, | ||
warn = True | ||||
) |
If allowed, clear error counts, but leave retry history intact. Can only be applied to jobs that are ready to be submitted or that have failed; in the latter case it has the side effect of setting the status back to RETRY.
Definition at line 306 of file GBSJob.py.
00307 : gj.remove() 00308 self.__gangaJobId = -1 00309 self.GetParent().RefreshJobStats() 00310 self.Write() 00311 00312 def ClearErrorCounts(self,warn=True): 00313 00314 """If allowed, clear error counts, but leave retry history intact 00315 00316 Can only applied to jobs that are ready to be submitted or that have failed and 00317 in this case has the side effect of setting the status back to RETRY""" 00318 00319 if self.CanClear(): 00320 self.__earlyFails = 0 00321 self.__lateFailsHandled = 0 00322 self.__lateFailsUnhandled = 0 00323 if self.__statusCode == GID_JSC_FAILED:
def python::GBSJob::GBSJob::ClearHistory | ( | self, | ||
confirm = True , |
||||
warn = True | ||||
) |
Completely clear all processing history so that processing begins again from scratch. The only processing state that is retained is that if job was held it will still be.
Definition at line 324 of file GBSJob.py.
00328 : print "Cannot ClearErrorCounts on job %s; its status is %s" % (self.GetName(),GIDStringForJSC(self.__statusCode)) 00329 00330 def ClearHistory(self,confirm=True,warn=True): 00331 00332 """Completely clear all processing history so that processing begins again from scratch. 00333 00334 The only processing state that is retained is that if job was held it will still be.""" 00335 00336 if not self.CanClear(): 00337 if warn: print "Cannot ClearHistory on job %s; its status is %s" % (self.GetName(),GIDStringForJSC(self.__statusCode)) 00338 return 00339 if confirm: 00340 ans = raw_input("Are you sure you want to clear the history of job " + self.GetName() + " ?y[n]") 00341 if not re.search(r"^y",ans,re.I): 00342 print "History not cleared." 00343 return 00344 child_dir = self.GetStoreLocation("child_dir") 00345 if os.path.isdir(child_dir): 00346 if shutil.rmtree(child_dir): 00347 Log(self,logger.ERROR,"Failed to remove obsolete job history directory:" + child_dir) 00348 self._SetTryNumber(0) 00349 self.__retryArgs = ""
def python::GBSJob::GBSJob::GetEarlyFailsCount | ( | self | ) |
def python::GBSJob::GBSJob::GetGangaJob | ( | self | ) |
Return associated Ganga job (if any)
Definition at line 169 of file GBSJob.py.
00171 : 00172 """Return associated Ganga job Id (if any) or -1 """ 00173 return self.__gangaJobId 00174 00175 def GetGangaJob(self): 00176 """Return associated Ganga job (if any)""" 00177 if self.__gangaJobId < 0: return None 00178 try: gj = Ganga.GPI.jobs(self.__gangaJobId)
def python::GBSJob::GBSJob::GetGangaJobId | ( | self | ) |
def python::GBSJob::GBSJob::GetLateHandledFailsCount | ( | self | ) |
def python::GBSJob::GBSJob::GetLateUnhandledFailsCount | ( | self | ) |
def python::GBSJob::GBSJob::GetLocalEnvironment | ( | self, | ||
prettyPrint = False | ||||
) |
Return, as a comma-separated list string, the environment that is local to this job. If prettyPrint is True, print out the list one item per line.
Definition at line 187 of file GBSJob.py.
00189 : 00190 """Return Late Unhandled Fails Count""" 00191 return self.__lateFailsUnhandled 00192 00193 def GetLocalEnvironment(self,prettyPrint = False): 00194 """Return, as a comma separated list string, the environment that is local to this job.
def python::GBSJob::GBSJob::GetLocalInputSandbox | ( | self | ) |
def python::GBSJob::GBSJob::GetLocalOutputSandbox | ( | self | ) |
def python::GBSObject::GBSObject::GetModel | ( | self | ) | [inherited] |
def python::GBSObject::GBSObject::GetName | ( | self | ) | [inherited] |
def python::GBSObject::GBSObject::GetParent | ( | self | ) | [inherited] |
def python::GBSJob::GBSJob::GetPhaseCode | ( | self | ) |
Return phase code. These are broad categories of status code used by task for job statistics.
Definition at line 203 of file GBSJob.py.
00205 : 00206 """Return, as a comma separated list string, the ouput sandbox file list that is local to this job.""" 00207 return self.__localOutputSandbox 00208 00209 def GetPhaseCode(self): 00210 """Return phase code. These are broad categories of status code used by task for job statistics.""" 00211 pc = 0 00212 if self.IsComplete(): 00213 pc = GID_JPC_DONE_NFAIL 00214 if not self.IsSuccessful(): pc = GID_JPC_DONE_FAIL 00215 elif self.IsReady(): 00216 pc = GID_JPC_READY_NRETRY 00217 if self.GetTryNumber() > 0: pc = GID_JPC_READY_RETRY 00218 elif self.IsSubmitted(): 00219 pc = GID_JPC_SUBMIT_RUN 00220 if not self.IsRunning(): pc = GID_JPC_SUBMIT_NRUN else:
def python::GBSJob::GBSJob::GetRetryArgs | ( | self | ) |
def python::GBSJob::GBSJob::GetScriptLocalArgs | ( | self | ) |
def python::GBSJob::GBSJob::GetStatusCode | ( | self | ) |
def python::GBSJob::GBSJob::GetStatusText | ( | self | ) |
def python::GBSJob::GBSJob::GetStatusTime | ( | self | ) |
def python::GBSObject::GBSObject::GetStoreLocation | ( | self, | ||
type = "self" | ||||
) | [inherited] |
Return storage location: parent directory, self or child directory. type is one of: "parent_dir" - parent's child directory; "self" - object's state file; "child_dir" - directory for own children.
Definition at line 38 of file GBSObject.py.
00039 : return self.__name 00040 def GetModel(self): return self.__model 00041 def GetParent(self): return self.__parent 00042 00043 def GetStoreLocation(self,type="self"): 00044 00045 """Return storage location: parent directory, self or child directory. 00046 00047 type is one of "parent_dir" parent's child directory 00048 "self" object's state file 00049 "child_dir" directory for own children""" 00050 00051 loc = "" 00052 if self.__parent == None : loc = GBSConfig.GetConfig().GetValue("DataDirectory") 00053 else : loc = self.__parent.GetStoreLocation("parent_dir") + "/" + self.__parent.GetName() 00054 if type != "parent_dir": loc += "/" + self.GetName()
def python::GBSJob::GBSJob::GetTryID | ( | self | ) |
def python::GBSJob::GBSJob::GetTryNumber | ( | self | ) |
def python::GBSJob::GBSJob::GetType | ( | self | ) |
Reimplemented from python::GBSObject::GBSObject.
Definition at line 73 of file GBSJob.py.
00075 : self.__gangaJobId = self.__statusCode
def python::GBSJob::GBSJob::Hold | ( | self, | ||
warn = True | ||||
) |
Hold job, so that it won't be submitted. Warn, if requested, if job not suitable for holding
Definition at line 350 of file GBSJob.py.
00351 : 00352 self._SetStatus(GID_JSC_NEW,"Ready to run") 00353 self.ClearErrorCounts() 00354 return 00355 00356 def Hold(self,warn=True): 00357 00358 """Hold job, so that it won't be submitted. Warn, if requested, if job not suitable for holding""" 00359 00360 if self.IsReady(): self._SetStatusCode(GID_JSC_HELD,"Held by user")
def python::GBSJob::GBSJob::IsComplete | ( | self | ) |
def python::GBSJob::GBSJob::IsFailure | ( | self | ) |
def python::GBSJob::GBSJob::IsHeld | ( | self | ) |
def python::GBSJob::GBSJob::IsNotReady | ( | self | ) |
def python::GBSJob::GBSJob::IsReady | ( | self | ) |
def python::GBSJob::GBSJob::IsRunning | ( | self | ) |
def python::GBSJob::GBSJob::IsSubmitted | ( | self | ) |
def python::GBSJob::GBSJob::IsSuccessful | ( | self | ) |
Return true if job is successful
Definition at line 278 of file GBSJob.py.
00280 : 00281 """Return true if job is submitted""" 00282 return self.__statusCode >= GID_JSC_SUBMITTED 00283 00284 def IsSuccessful(self): 00285 """Return true if job is successful""" 00286 return self.__statusCode == GID_JSC_SUCCEEDED 00287 00288 #----------------------------------------------------------------------------------------------- #
def python::GBSJob::GBSJob::Kill | ( | self, | ||
warn = True | ||||
) |
Kill job that has been submitted to Ganga. Warn, if requested, if job not suitable for killing
Definition at line 361 of file GBSJob.py.
00365 : print "Cannot HOLD job %s; its status is %s" % (self.GetName(),GIDStringForJSC(self.__statusCode)) 00366 00367 def Kill(self,warn=True): 00368 00369 """Kill job that has been submitted to Ganga. Warn, if requested, if job not suitable for killing""" 00370 00371 if not self.CanKill(): 00372 if warn: print "Cannot KILL job %s; its status is %s" % (self.GetName(),GIDStringForJSC(self.__statusCode)) 00373 return 00374 try: 00375 gj = Ganga.GPI.jobs(self.__statusCode) 00376 # I need to catch JobAccessError, but it isn't Ganga.GPI.JobAccessError so just 00377 # catch everything and hope it is O.K.! 00378 except: 00379 Log(self,logger.ERROR,self.__tryID + " lost Ganga job with ID " + str(self.__statusCode))
def python::GBSObject::GBSObject::MakeChildDirectory | ( | self | ) | [inherited] |
If directory used to store child objects does not exist, create it
Definition at line 55 of file GBSObject.py.
00055 : loc += ".state" 00056 return loc 00057 00058 # I/O 00059 00060 def MakeChildDirectory(self): 00061 00062 """If directory used to store child objects does not exist, create it""" 00063 00064 child_dir = self.GetStoreLocation("child_dir")
def python::GBSObject::GBSObject::Read | ( | self | ) | [inherited] |
Definition at line 65 of file GBSObject.py.
00065 : Log(self,logger.SYNOPSIS,"Creating directory for child objects:" + str(child_dir))
def python::GBSJob::GBSJob::Release | ( | self, | ||
warn = True | ||||
) |
Release job, so that it can be submitted. Warn, if requested, if job not suitable for releasing
Definition at line 380 of file GBSJob.py.
00386 : 00387 00388 """Release job, so that it can be submitted. Warn, if requested, if job not suitable for releasing""" 00389 00390 if self.IsHeld(): 00391 self.__statusCode = GID_JSC_NEW 00392 if self.__tryNumber: self.__statusCode = GID_JSC_RETRY self._SetStatusText("Ready to run")
def python::GBSJob::GBSJob::Remove | ( | self, | ||
confirm = True , |
||||
warn = True | ||||
) |
Completely remove job.
Definition at line 393 of file GBSJob.py.
00397 : print "Cannot RELEASE job %s; its status is %s" % (self.GetName(),GIDStringForJSC(self.__statusCode)) 00398 00399 def Remove(self,confirm=True,warn=True): 00400 00401 """Completely remove job.""" 00402 00403 if not self.CanClear(): 00404 if warn: print "Cannot Remove job %s; its status is %s" % (self.GetName(),GIDStringForJSC(self.__statusCode)) 00405 return 00406 if confirm: 00407 ans = raw_input("Are you sure you want to remove job " + self.GetName() + " ?y[n]") 00408 if not re.search(r"^y",ans,re.I): 00409 print "Not removed." 00410 return 00411 child_dir = self.GetStoreLocation("child_dir") 00412 if os.path.isdir(child_dir): 00413 if shutil.rmtree(child_dir): 00414 Log(self,logger.ERROR,"Failed to remove obsolete job history directory:" + child_dir) 00415 state_file = self.GetStoreLocation("self") 00416 if os.remove(state_file): 00417 Log(self,logger.ERROR,"Failed to remove state file:" + state_file)
def python::GBSObject::GBSObject::Rename | ( | self, | ||
new_name | ||||
) | [inherited] |
Rename to new_name (does not perform I/O - that must be done by caller).
Definition at line 82 of file GBSObject.py.
00082 : 00083 self.__model = ioh("Model Name","s",self.__model) 00084 self.__name = ioh("Object Name","s",self.__name) 00085 00086 00087 def Rename(self,new_name): 00088 """Rename to new_name (does not perform I/O - that must be done by caller).""" self.__name = new_name
def python::GBSJob::GBSJob::SetLocalEnvironment | ( | self, | ||
env_str | ||||
) |
Set, as a comma-separated list string, the environment that is local to this job, e.g. job.SetLocalEnvironment('var1=123,var2=a string with spaces,var3=456'). If the first character is + (e.g. +var3=567,var4=b), append to the existing environment rather than replacing it.
Definition at line 418 of file GBSJob.py.
00424 : 00425 00426 """Set, as a comma separated list string, the environment that local to this job.. 00427 00428 e.g. job.SetLocalEnvironment('var1=123,var2=a string with spaces,var3=456' 00429 00430 If the first character is + e.g. +var3=567,var4=b, append to existing environment rather than replace it)""" 00431 00432 if re.search(r"^\+",env_str): 00433 s = self.__localEnvironment 00434 if s: s += ',' 00435 s += env_str[1:] 00436 # Force local method to be recalled rather than any inherited one 00437 # MinosRSMJob internally uses append and if that got recalled get a recursive loop. 00438 GBSJob.SetLocalEnvironment(self,s) 00439 return d = GBSUtilities.ParseEnvStr(env_str)
def python::GBSJob::GBSJob::SetLocalInputSandbox | ( | self, | ||
in_sbox_str | ||||
) |
Set, as a comma separated list string, the input sandbox file list that is local to this job. e.g. task.SetLocalInputSandbox('/home/users/west/my_input_data.dat,../my_script.sh')
Definition at line 440 of file GBSJob.py.
00441 : print "Cannot parse environment string: '" + env_str + "'" 00442 else: 00443 self.__localEnvironment = GBSUtilities.BuildEnvStr(d) 00444 self.Write() 00445 00446 def SetLocalInputSandbox(self,in_sbox_str): 00447 00448 """Set, as a comma separated list string, the input sandbox file list that is local to this job. 00449 00450 e.g. task.SetLocalInputSandbox('/home/users/west/my_input_data.dat,../my_script.sh')"""
def python::GBSJob::GBSJob::SetLocalOutputSandbox | ( | self, | ||
out_sbox_str | ||||
) |
Set, as a comma separated list string, the output sandbox file list that is local to this job. e.g. task.SetLocalOutputSandbox('my_output_data.dat,my_output.log')
Definition at line 451 of file GBSJob.py.
00453 : 00454 self.__localInputSandbox = str 00455 self.Write() 00456 00457 def SetLocalOutputSandbox(self,out_sbox_str): 00458 00459 """Set, as a comma separated list string, the output sandbox file list that is local to this job. 00460 00461 e.g. task.SetLocalOutputSandbox('my_output_data.dat,my_output.log')"""
def python::GBSJob::GBSJob::SetScriptLocalArgs | ( | self, | ||
arg_str | ||||
) |
Set (as a string) the comma separated list of application script args that are local to this job. e.g. job.SetScriptLocalArgs('123,a string with spaces,456')
Definition at line 462 of file GBSJob.py.
00464 : 00465 self.__localOutputSandbox = str 00466 self.Write() 00467 00468 def SetScriptLocalArgs(self,arg_str): 00469 00470 """Set (as a string) the comma separated list of application script args that are local to this job.
def python::GBSJob::GBSJob::Submit | ( | self, | ||
Perusable = False , |
||||
MonitorFrequency = 0 , |
||||
MonitorCommand = "ps -o pid , |
||||
ppid, | ||||
rss, | ||||
vsize, | ||||
pcpu, | ||||
pmem, | ||||
cmd, | ||||
u, | ||||
USER | ||||
) |
Submit job if permitted and return True if successful.
Definition at line 471 of file GBSJob.py.
00479 : 00480 00481 """Submit job if permitted and return True if successful.""" 00482 00483 my_manager = self.GetParent() 00484 00485 # Get the user application script 00486 script_spec = my_manager.GetScriptFileSpec() 00487 if not script_spec: 00488 print "Cannot submit job, no user application script assigned to Task '" + my_manager.GetName() + "'" 00489 return False 00490 script_name = my_manager.GetScriptFileName() 00491 00492 if not self.CanSubmit(): 00493 print "Cannot submit job " + self.GetName() + ":" \ 00494 " Status: " + GIDStringForJSC(self.__statusCode) \ 00495 + " [" + self.GetStatusText() + "]" 00496 return False 00497 00498 if not my_manager.IsAuthorisedToSubmit(): return False 00499 00500 # Increment try number and prepare output directory to receive results 00501 00502 self._SetTryNumber(self.__tryNumber + 1) 00503 self.MakeChildDirectory() 00504 output_dir = self._GetTryOutputDir() 00505 # It ought not to exist already, but in case it is, remove it first. 00506 if os.path.isdir(output_dir): 00507 Log(self,logger.SYNOPSIS,"Removing obsolete directory for job output:" + str(output_dir)) 00508 if shutil.rmtree(output_dir): 00509 Log(self,logger.ERROR,"Failed to remove obsolete directory for job output:" + str(output_dir)) 00510 Log(self,logger.SYNOPSIS,"Creating directory for job output:" + str(output_dir)) 00511 if os.mkdir(output_dir,0755): 00512 Log(self,logger.ERROR,"Failed to create directory for job output:" + str(output_dir)) 00513 00514 # Prepare GBS Log File in output directory 00515 gbs_log_file_name = self._GetGbsLogFileName() 00516 gbs_log_file_spec = str(output_dir) + "/" + str(gbs_log_file_name) 00517 os.system("echo " + timestamp() + " INFO GBS_JOB_SUBMIT submitting job > " + gbs_log_file_spec) 00518 00519 # Prepare arg list: GLF file, appl. script, global args, local args. 
00520 args = [] 00521 args.append(gbs_log_file_name) 00522 args.append(script_name) 00523 00524 # Collect up the application script args, taking account of ecaped commas. 00525 script_args = [] 00526 for arglist in [my_manager.GetScriptGlobalArgs(),self.GetScriptLocalArgs()]: 00527 if arglist: GBSUtilities.ParseCommaSepList(arglist,script_args) 00528 00529 # Passing args with anything but alphanumeric data into the 00530 # application script is a pain. The logical approach, placing 00531 # each arg as a separate element in 'args' works for the local 00532 # back-end but is broken for LCG where all the args are first 00533 # interpolated and then re-parsed to define the argument list 00534 # that gets passed into the script. Any string containing 00535 # white space gets broken apart and anything containing "(" causes 00536 # a syntax error! So, instead we manufacture the wrapper 00537 # script on the fly that will call the application script 00538 # which gives us total control over the way arguments get passed in. 
00539 00540 wrapper_file_spec = str(output_dir) + "/" + "gbs_job_wrapper.sh" 00541 00542 #Copy the leading part of the wrapper 00543 os.system("cp $GBS_HOME/python/gbs_job_wrapper_part_1.sh " + wrapper_file_spec) 00544 00545 wrapper = file(wrapper_file_spec,'a') 00546 00547 #If monitoring is required, set up and start running the monitor 00548 if MonitorFrequency: 00549 wrapper.write('monitor_script=$GBS_WORK_DIR/gbs_monitor.sh\n') 00550 wrapper.write('if [ -f $monitor_script ] ; then rm -f $monitor_script; fi\n') 00551 wrapper.write('echo "call_num=0" >> $monitor_script\n') 00552 wrapper.write('echo "while [ 1 = 1 ]" >> $monitor_script\n') 00553 wrapper.write('echo "do" >> $monitor_script\n') 00554 wrapper.write('echo " echo \`date \'+%F %H:%M:%S \'\` Monitoring call number \$(( ++call_num ))" >> $monitor_script\n') 00555 wrapper.write('echo " %s" >> $monitor_script\n' %\ 00556 MonitorCommand) 00557 wrapper.write('echo " sleep %d" >> $monitor_script\n' % \ 00558 MonitorFrequency) 00559 wrapper.write('echo "done" >> $monitor_script\n') 00560 wrapper.write('chmod +x $monitor_script\n') 00561 wrapper.write('$GBS_LOG INFO Starting monitoring script at frequency %d secs to execute %s\n' %\ 00562 (MonitorFrequency,MonitorCommand)) 00563 wrapper.write('$monitor_script &\n') 00564 00565 # Manufacture the bit where the arguments get passed in 00566 num_arg = 0 00567 arglist = "" 00568 for arg in script_args: 00569 num_arg += 1 00570 wrapper.write("arg" + str(num_arg) + "=\"" + arg + "\"\n") 00571 arglist += "\"$arg" + str(num_arg) +"\" " 00572 wrapper.write("/usr/bin/time --format 'CPU User: %U sec, System: %S sec. Elapse: %E. 
' --output=$GBS_WORK_DIR/time_output_$$ "\ 00573 + "./$user_script " + arglist + "\n") 00574 wrapper.close() 00575 00576 #Copy the trailing part of the wrapper 00577 os.system("cat $GBS_HOME/python/gbs_job_wrapper_part_2.sh >>" + wrapper_file_spec) 00578 00579 # Prepare the GBS environment 00580 env = {} 00581 env['GBS_MODE'] = my_manager.GetMode() 00582 env['GBS_RETRY_COUNT'] = str(self.__tryNumber -1) 00583 retry_arg_no = 0 00584 for retry_arg in self.__retryArgs.split(): 00585 retry_arg_no += 1 00586 env['GBS_RETRY_ARG_' + str(retry_arg_no)] = retry_arg 00587 env['GBS_NUM_RETRY_ARGS'] = str(retry_arg_no) 00588 00589 # Prepare the user environment 00590 for env_str in [my_manager.GetGlobalEnvironment(),self.GetLocalEnvironment()]: 00591 if not env_str: continue 00592 for var,val in GBSUtilities.ParseEnvStr(env_str).iteritems(): 00593 env[var] = val 00594 00595 # Prepare the executable 00596 exe = Ganga.GPI.Executable(exe = Ganga.GPI.File(wrapper_file_spec), env = env,args = args) 00597 00598 # Prepare sandboxes 00599 inputsandbox = [] 00600 inputsandbox.append(script_spec) 00601 inputsandbox.append(gbs_log_file_spec) 00602 env_cmds = "" 00603 for e_name,e_value in env.iteritems(): 00604 env_cmds += e_name + "=" + str(e_value) + ";" 00605 for obj in [my_manager,self]: 00606 sb_file_list = [] 00607 if obj is self: GBSUtilities.ParseCommaSepList(self.GetLocalInputSandbox(),sb_file_list) 00608 else: GBSUtilities.ParseCommaSepList(my_manager.GetGlobalInputSandbox(),sb_file_list) 00609 input_dir = obj.GetStoreLocation("child_dir") + "/InputSandbox" 00610 for sb_file in sb_file_list: inputsandbox.append(input_dir + "/" + sb_file) 00611 outputsandbox = ['gbs_output_sandbox.tar.gz'] 00612 outputsandbox.append(gbs_log_file_name) 00613 for obj in [my_manager,self]: 00614 sb_file_list = [] 00615 if obj is self: GBSUtilities.ParseCommaSepList(self.GetLocalOutputSandbox(),sb_file_list) 00616 else: GBSUtilities.ParseCommaSepList(my_manager.GetGlobalOutputSandbox(),sb_file_list) 
00617 for sb_file in sb_file_list: 00618 if sb_file[0] == "$": 00619 inp = os.popen(env_cmds + sb_file[1:]) 00620 sb_file = inp.read() 00621 inp.close() 00622 outputsandbox.append(sb_file) 00623 00624 # Prepare backend 00625 backend = my_manager.GetBackend() 00626 queue = "" 00627 mo = re.search(r"(.*?):(.*)",backend) 00628 if mo: 00629 (backend,queue) = mo.groups() 00630 if backend == "Local": queue = "" 00631 00632 # Prepare the job 00633 gj = Ganga.GPI.Job(backend=backend) 00634 Ganga.GPI.jobtree.add(gj,my_manager.GetGangaTreeDir()) 00635 if queue : 00636 if backend == "PBS": gj.backend.queue = queue 00637 if backend == "LCG": gj.backend.CE = queue 00638 # If GLITE_ENABLE is enabled select the correct middleware 00639 if backend == "LCG" and Ganga.GPI.config['LCG']['GLITE_ENABLE']: 00640 gj.backend.middleware = 'GLITE' 00641 if Perusable: gj.backend.perusable = True 00642 elif Perusable: print "Cannot select perusable; backend not LCG/GLITE" 00643 gj.application = exe 00644 gj.inputsandbox = inputsandbox 00645 gj.outputsandbox = outputsandbox 00646 if GetLoggerThreshold() <= logger.SYNOPSIS: print "Contents of Ganga job:-\n\n" + str(gj) 00647 try: 00648 print "Submitting ",self.GetName() 00649 timeout_call(self,gj.submit,3*60) 00650 Log(self,logger.INFO,self.__tryID + " submitted") 00651 except Exception,inst: 00652 excp_msg = "Caught exception:" + str(inst) +" Failed to complete Ganga job submit" 00653 Log(self,logger.ERROR,excp_msg) 00654 self._SetStatusText(excp_msg) 00655 self._SetTryNumber(self.__tryNumber - 1) 00656 return False 00657 self.__gangaJobId = gj.id self._SetStatus(gj.id,"Ganga status:" + gj.status)
def python::GBSJob::GBSJob::UpdateStatus | ( | self | ) |
Get the latest status of this job. If the job has been submitted, this may involve querying the associated Ganga job and retrieving its output.
Definition at line 658 of file GBSJob.py.
00658 :" + gj.status) 00659 self.GetParent().RefreshJobStats() 00660 self.Write() 00661 return True 00662 00663 00664 def UpdateStatus(self): 00665 00666 """Get latest status. May involve checking Ganga job and retrieving output.""" 00667 00668 if not self.IsSubmitted(): return 00669 try: 00670 gj = Ganga.GPI.jobs(self.__statusCode) 00671 # I need to catch JobAccessError, but it isn't Ganga.GPI.JobAccessError so just 00672 # catch everything and hope it is O.K.! 00673 except: 00674 gj = None 00675 00676 # Deal with lost Ganga job, ought not to happen, but then lots of things in life ought not to happen. 00677 if not gj: 00678 Log(self,logger.ERROR,self.__tryID + " lost Ganga job with ID " + str(self.__statusCode)) 00679 # Treat as if status has reverted to "new" 00680 g_status = 'new' 00681 else: 00682 g_status = gj.status 00683 00684 # Deal with cases when job hasn't finished 00685 if g_status == "completing" \ 00686 or g_status == "ready" \ 00687 or g_status == "running" \ 00688 or g_status == "scheduled" \ 00689 or g_status == "submitted" \ 00690 or g_status == "waiting": 00691 self._SetStatusText("Ganga status:" + g_status) 00692 00693 # If Ganga job does not appear stalled, just record any change in status. 00694 t_stall_hours = (time.time() \ 00695 - time.mktime(time.strptime(self.__statusTime.strip(),"%Y-%m-%d %H:%M:%S")))/3600. 00696 # Set a 3 hour time limit except for scheduled and running where 3 days is allowed. 00697 t_timeout_hours = 3. 00698 if g_status == "scheduled" or g_status == "running": t_timeout_hours = 3.*24. 00699 if t_stall_hours < t_timeout_hours: 00700 self.GetParent().RefreshJobStats() 00701 self.Write() 00702 return 00703 # Ganga job has stalled; kill it and treat as new. 
00704 self._SetStatusText("killed Ganga job %d stalled for %5.1f hours " % (gj.id,t_stall_hours)) 00705 Log(self,logger.ERROR,self.__tryID + " " + self.__statusText) 00706 gj.remove() 00707 g_status = 'new' 00708 00709 # If status has reverted to "new" submit never took place 00710 if g_status == 'new': 00711 self._SetTryNumber(self.__tryNumber - 1) 00712 self.__statusCode = GID_JSC_NEW 00713 self.__gangaJobId = -1 00714 if self.__tryNumber > 1: self.__statusCode = GID_JSC_RETRY 00715 self._SetStatusText("Ready to run") 00716 self.GetParent().RefreshJobStats() 00717 self.Write() 00718 return 00719 00720 00721 # For all other cases move over output data, unpack gbs_output_sandbox.tar.gz (if present) and save Ganga status 00722 output_dir = self._GetTryOutputDir() 00723 Log(self,logger.SYNOPSIS,"Retrieving output from " + str(gj.outputdir) + " into " + str(output_dir)) 00724 if not os.listdir(gj.outputdir): 00725 Log(self,logger.ERROR,self.__tryID + " No output files in " + str(gj.outputdir)) 00726 elif os.system("cp " + str(gj.outputdir) + "* " + str(output_dir)): 00727 Log(self,logger.ERROR,self.__tryID + " Failed to retrieve output from " + str(gj.outputdir) + " into " + str(output_dir)) 00728 try: 00729 if gj.backend.perusable: 00730 print "Job is perusable, attempting to recover stdout, as stdout.perusable" 00731 cmd = "glite-wms-job-perusal --get -f stdout --all --noint " \ 00732 + gj.backend.id + " > " + output_dir + "/stdout.perusable" 00733 if os.system(cmd): 00734 Log(self,logger.ERROR,self.__tryID + " Failed to retrieve persuable output using ",cmd) 00735 except: pass 00736 gbs_output_sandbox = output_dir + "/gbs_output_sandbox.tar.gz" 00737 if os.path.isfile(gbs_output_sandbox): 00738 Log(self,logger.SYNOPSIS,"Unpacking " + gbs_output_sandbox) 00739 if os.system("cd " + output_dir + "; tar zxf gbs_output_sandbox.tar.gz"): 00740 Log(self,logger.ERROR,self.__tryID + " Failed to unpack " + gbs_output_sandbox) 00741 else: 00742 os.remove(gbs_output_sandbox) 
00743 gangaStatusFile = output_dir + "/gbs_ganga.status" 00744 f = open(gangaStatusFile,"w") 00745 f.write(str(gj)) 00746 f.close() 00747 self.__statusCode = GID_JSC_WAITING_ANALYSIS 00748 try: g_status += " [" + gj.backend.reason + "]" 00749 except: pass
def python::GBSJob::GBSJob::WaitForJob ( self, num_tries = 100, time_interval = 30 )
Wait for a running job to end before returning. The job status is checked at most 'num_tries' times, with a sleep of 'time_interval' between successive checks.
Definition at line 750 of file GBSJob.py.
00756 : 00757 00758 """Wait for running job to end before returning. 00759 00760 Limit to 'num_tries' with a sleep of 'time_interval' between.""" 00761 if not self.IsSubmitted(): 00762 print "Cannot wait for job %s; its status is %s" % (self.GetName(),GIDStringForJSC(self.__statusCode)) 00763 return 00764 try_num = 0 00765 current_state = "" 00766 print "Waiting for job %s to end" % self.GetName() 00767 while ( try_num < num_tries ): 00768 try_num += 1 00769 self.UpdateStatus() 00770 new_state = "%s [%s]" % (GIDStringForJSC(self.GetStatusCode()),self.GetStatusText()) 00771 if current_state != new_state: 00772 print timestamp() + " Job %s status now %s" % (self.GetName(),new_state) current_state = new_state
def python::GBSObject::GBSObject::Write | ( | self | ) | [inherited] |
def python::GBSObject::GBSObject::WriteFamily | ( | self | ) | [inherited] |
Reimplemented in python::GBSManager::GBSManager, and python::GBSTask::GBSTask.
Definition at line 67 of file GBSObject.py.
00067 : 00068 Log(self,logger.ERROR,"Failed to create directory for child objects:" + str(child_dir))
python::GBSJob::GBSJob::__earlyFails [private] |
python::GBSJob::GBSJob::__gangaJobId [private] |
python::GBSJob::GBSJob::__retryArgs [private] |
python::GBSJob::GBSJob::__statusCode [private] |
python::GBSJob::GBSJob::__statusText [private] |
python::GBSJob::GBSJob::__statusTime [private] |
python::GBSJob::GBSJob::__tryID [private] |
python::GBSJob::GBSJob::__tryNumber [private] |