Public Member Functions | |
def | __init__ |
def | Clear |
def | GetType |
def | GetRetryArgs |
def | GetJob |
def | __repr__ |
def | Analyse |
def | Apply |
Private Member Functions | |
def | __CollectData |
def | __MakeJudgement |
def | __WriteLoggingInfoFile |
Job termination analysis object. Based on the output returned and the job's past history, it decides what to do next. Unlike other role-playing objects, this does not inherit from GBSObject as it has no persistable state; it is called into existence by a Job to analyse the results of a Ganga job and is then discarded. It has no user-callable methods.
Definition at line 11 of file GBSJobAnalyser.py.
def python::GBSJobAnalyser::GBSJobAnalyser::__CollectData | ( | self | ) | [private] |
Collect information about the client job termination.
Definition at line 104 of file GBSJobAnalyser.py.
00106 :-\n") 00107 f.write(str(self) + "\n") 00108 f.close() 00109 00110 ###### Private Methods (not user callable) ###### 00111 00112 00113 def __CollectData(self): 00114 00115 """Colect information about the client job termination.""" 00116 00117 self.__judgeFailCategory = GID_FCC_NONE 00118 self.__judgeStatusCode = GID_JSC_WAITING_ANALYSIS 00119 self.__judgeStatusText = "" 00120 00121 jobTryDir = self.__clientJob._GetTryOutputDir() 00122 00123 # GANGA Level: Check for valid Ganga exit code 00124 00125 self.__comLevelCode = GID_CLC_GANGA 00126 self.__gangaStatus = "Cannot obtain Ganga exit code" 00127 gangaStatusFile = jobTryDir + "/gbs_ganga.status" 00128 if not os.path.isfile(gangaStatusFile): 00129 self.__comLevelText = "Failed to find Ganga status file" + str(gangaStatusFile) 00130 Log(self.__clientJob,logger.ERROR,self.__comLevelText) 00131 return 00132 Log(self.__clientJob,logger.DEBUG,"Reading Ganga status file" + str(gangaStatusFile)) 00133 self.__gangaStatus = "" 00134 grid_middleware = "" 00135 grid_id = "" 00136 f = open(gangaStatusFile) 00137 for line in f: 00138 # Collect the first status = ... 
line 00139 mo = re.search(r"status = '(.*)'",line) 00140 if mo and not self.__gangaStatus: self.__gangaStatus = mo.group(1) 00141 mo = re.search(r"middleware = '(.*)'",line) 00142 if mo: grid_middleware = mo.group(1) 00143 mo = re.search(r"id = '(http.*)'",line) 00144 if mo: grid_id = mo.group(1) 00145 f.close() 00146 self.__WriteLoggingInfoFile(grid_middleware,grid_id) 00147 if self.__gangaStatus != "aborted" \ 00148 and self.__gangaStatus != "cancelled" \ 00149 and self.__gangaStatus != "completed" \ 00150 and self.__gangaStatus != "done"\ 00151 and self.__gangaStatus != "failed"\ 00152 and self.__gangaStatus != "killed": 00153 self.__gangaStatus = "Unknown Ganga status:'" + self.__gangaStatus + "'" 00154 self.__comLevelText = self.__gangaStatus 00155 Log(self.__clientJob,logger.ERROR,self.__gangaStatus) 00156 return 00157 00158 # GRID Level:Check for GRID failures 00159 00160 self.__comLevelCode = GID_CLC_GRID 00161 if self.__gangaStatus == "aborted": 00162 self.__comLevelText = "GRID Failure: ABORTED" 00163 return 00164 if self.__gangaStatus == "cancelled"\ 00165 or self.__gangaStatus == "killed": 00166 self.__comLevelText = "GRID Failure: CANCELLED" 00167 return 00168 00169 # BATCH, WORKER, APPLICATION and USER level: Attempt to read the GBS Log File and collection information. 
00170 00171 self.__comLevelCode = GID_CLC_BATCH 00172 if self.__gangaStatus == "failed": 00173 self.__comLevelText = "BATCH Failure: FAILED" 00174 return 00175 gbs_log_file_spec = jobTryDir + "/" + self.__clientJob._GetGbsLogFileName() 00176 if not os.path.isfile(gbs_log_file_spec): 00177 self.__comLevelText = "Failed to find " + str(gbs_log_file_spec) 00178 return 00179 Log(self.__clientJob,logger.DEBUG,"Reading GBS Log File " + str(gbs_log_file_spec)) 00180 00181 has_wrapper_start = 0 00182 has_wrapper_term = 0 00183 first_time_string = "" 00184 last_time_string = "" 00185 app_exit_code = 0 00186 ksi2k_rating = "" 00187 f = open(gbs_log_file_spec) 00188 for line in f: 00189 mo = re.search(r"^(\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d)\s+(\S+)\s+(.+)\s$",line) 00190 if not mo: 00191 Log(self.__clientJob,logger.WARNING,gbs_log_file_spec + ":Ignoring malformed line: " + string.rstrip(line)) 00192 continue 00193 (date_time,keyword,data) = mo.groups() 00194 Log(self.__clientJob,logger.DEBUG," Parse GBS Log File line. date_time:" + date_time + ",keyword:" + keyword + ",data:" + data + ".") 00195 mo = re.search(r"^GBS_JOB_WRAPPER Starting",data ) 00196 if mo: 00197 has_wrapper_start = 1 00198 first_time_string = date_time 00199 Log(self.__clientJob,logger.INFO,"%s started %s "% (self.__clientJob.GetTryID(),date_time)) 00200 mo = re.search(r"^GBS_JOB_WRAPPER Terminating.*User script returned (\d+)",data ) 00201 # Collect end date time only until wrapper end. 
00202 if not has_wrapper_term: last_time_string = date_time 00203 if mo: 00204 has_wrapper_term = 1 00205 app_exit_code = int(mo.group(1)) 00206 if (keyword == "SUCCEEDED"): 00207 self.__applStatusCode = GID_JSC_SUCCEEDED 00208 self.__applStatusText = data 00209 if (keyword == "FAILED"): 00210 self.__applStatusCode = GID_JSC_FAILED 00211 self.__applStatusText = data 00212 if (keyword == "HOLD"): 00213 self.__applStatusCode = GID_JSC_HELD 00214 self.__applStatusText = data 00215 if (keyword == "RETRY"): 00216 self.__applStatusCode = GID_JSC_RETRY 00217 self.__applStatusText = data 00218 mo = re.search(r"GBS_JOB_WRAPPER\s+(KSI2K=\S+)",line) 00219 if mo: ksi2k_rating = mo.group(1) 00220 mo = re.search(r"GBS_JOB_WRAPPER Resources used: CPU User: (\S+) sec, System: (\S+) sec.",line) 00221 if mo:self.__usedResources = "CPU: %7.1f min %s" %\ 00222 (((float(mo.group(1)) + float(mo.group(2)))/60.),ksi2k_rating) 00223 f.close() 00224 00225 # If we have collect two times, compute the interval 00226 if first_time_string and last_time_string: 00227 try: 00228 t_start = time.mktime(time.strptime(first_time_string,"%Y-%m-%d %H:%M:%S")) 00229 t_term = time.mktime(time.strptime(last_time_string,"%Y-%m-%d %H:%M:%S")) 00230 self.__glfTimeInterval = (t_term -t_start)/60. 
00231 except ValueError: 00232 Log(self.__clientJob,logger.WARNING,"Bad times found in GLF: " + str(first_time_string) + ";"+ str(last_time_string) + ".") 00233 00234 # We now have all the information to complete data collection 00235 00236 if not has_wrapper_start: 00237 self.__comLevelText = "GBS Log File has no job wrapper start line" 00238 return 00239 00240 self.__comLevelCode = GID_CLC_WORKER 00241 if not has_wrapper_term: 00242 self.__comLevelText = "GBS Log File has no job wrapper end line" 00243 return 00244 00245 self.__comLevelCode = GID_CLC_APPLICATION 00246 if app_exit_code: 00247 self.__comLevelText = "Application returned exit code:" + str(app_exit_code) 00248 return 00249 00250 if self.__applStatusCode == GID_JSC_UNKNOWN:
def python::GBSJobAnalyser::GBSJobAnalyser::__init__ | ( | self, | ||
name, | ||||
parent, | ||||
model, | ||||
model_args | ||||
) |
def python::GBSJobAnalyser::GBSJobAnalyser::__MakeJudgement | ( | self | ) | [private] |
Make a judgement on what to do next with the Job, but leave applying that judgement to the Apply() method.
Definition at line 251 of file GBSJobAnalyser.py.
00260 : 00261 00262 """Make judgement on what next to do with Job but leave application to Apply() method.""" 00263 00264 # Use current RetryArgs by default but replace if get HOLD or RETRY signal from application 00265 self.__judgeRetryArgs = self.__clientJob.GetRetryArgs() 00266 00267 if self.__comLevelCode == GID_CLC_GANGA: 00268 self.__judgeFailCategory = GID_FCC_LATE_UNHANDLED 00269 00270 elif self.__comLevelCode == GID_CLC_GRID: 00271 00272 if self.__gangaStatus == "aborted": 00273 self.__judgeFailCategory = GID_FCC_LATE_UNHANDLED 00274 else: 00275 self.__judgeStatusCode = GID_JSC_HELD 00276 self.__judgeStatusText = "Ganga returned 'cancelled' or 'killed'" 00277 return 00278 00279 elif self.__comLevelCode == GID_CLC_BATCH: 00280 00281 self.__judgeFailCategory = GID_FCC_EARLY 00282 00283 elif self.__comLevelCode == GID_CLC_WORKER \ 00284 or self.__comLevelCode == GID_CLC_APPLICATION: 00285 00286 self.__judgeFailCategory = GID_FCC_EARLY 00287 if self.__glfTimeInterval > float(GetConfigValue("MaxTimeEarlyFails")): 00288 self.__judgeFailCategory = GID_FCC_LATE_UNHANDLED 00289 00290 elif self.__comLevelCode == GID_CLC_USER: 00291 00292 if self.__applStatusCode == GID_JSC_RETRY \ 00293 or self.__applStatusCode == GID_JSC_HELD: 00294 self.__judgeStatusText = self.__applStatusText 00295 self.__judgeRetryArgs = self.__applStatusText 00296 self.__judgeFailCategory = GID_FCC_EARLY 00297 if self.__glfTimeInterval > float(GetConfigValue("MaxTimeEarlyFails")): 00298 self.__judgeFailCategory = GID_FCC_LATE_HANDLED 00299 else: 00300 self.__judgeStatusCode = self.__applStatusCode 00301 self.__judgeStatusText = self.__applStatusText 00302 self.__judgeRetryArgs = "" 00303 return 00304 00305 # If we reach this point then we have dealt with SUCCEEDED, FAILED and held 00306 # due to cancelled or killed. 00307 # All that's left are various types of failures and HOLD/RETRY. 00308 # Make decision between FAILED and HOLD/RETRY based on the error counts. 
00309 00310 # Should never happen, but in case I break the logic sometime! 00311 if self.__judgeFailCategory == GID_FCC_NONE: 00312 Log(self.__clientJob,logger.ERROR,"GBSJobAnalyser: Program error: dealing with failure but category not set!") 00313 self.__judgeFailCategory = GID_FCC_EARLY # Have to pick something 00314 00315 self.__judgeStatusCode = GID_JSC_RETRY 00316 if self.__comLevelCode == GID_CLC_USER: self.__judgeStatusCode = self.__applStatusCode 00317 00318 fc = self.__judgeFailCategory 00319 max_early = int(GetConfigValue("MaxRetryEarlyFails")) 00320 max_late_handled = int(GetConfigValue("MaxRetryLateFailsHandled")) 00321 max_late_unhandled = int(GetConfigValue("MaxRetryLateFailsUnhandled")) 00322 job = self.__clientJob 00323 job_early = job.GetEarlyFailsCount() 00324 job_late_handled = job.GetLateHandledFailsCount() 00325 job_late_unhandled = job.GetLateUnhandledFailsCount() 00326 00327 if fc == GID_FCC_EARLY and job_early >= max_early: 00328 self.__judgeStatusCode = GID_JSC_FAILED 00329 self.__judgeStatusText = "EARLY fails limit of " + str(max_early) + " exceeded"
def python::GBSJobAnalyser::GBSJobAnalyser::__repr__ | ( | self | ) |
Definition at line 48 of file GBSJobAnalyser.py.
00051 : return "GBSJobAnalyser" 00052 00053 def GetRetryArgs(self): return self.__judgeRetryArgs 00054 00055 def GetJob(self): return self.__clientJob 00056 00057 def __repr__(self) : 00058 if not self.__clientJob: return "No associated client Job" 00059 return " Communication Level: " + GIDStringForCLC(self.__comLevelCode) \
def python::GBSJobAnalyser::GBSJobAnalyser::__WriteLoggingInfoFile | ( | self, | ||
grid_middleware, | ||||
grid_id | ||||
) | [private] |
For EDG/GLITE submissions, produce a logging summary into 'gbs_grid_info.log'. Update __judgeStatusText for some classic GRID errors.
Definition at line 330 of file GBSJobAnalyser.py.
00331 : 00332 self.__judgeStatusCode = GID_JSC_FAILED 00333 self.__judgeStatusText = "LATE_HANDLED fails limit of " + str(max_late_handled) + " exceeded" 00334 00335 if fc == GID_FCC_LATE_UNHANDLED and job_late_unhandled >= max_late_unhandled: 00336 self.__judgeStatusCode = GID_JSC_FAILED 00337 self.__judgeStatusText = "LATE_UNHANDLED fails limit of " + str(max_late_unhandled) + " exceeded" 00338 00339 def __WriteLoggingInfoFile(self,grid_middleware,grid_id): 00340 """For EDG/GLITE submissions, produce a logging summary into 'gbs_grid_info.log'. 00341 00342 Update __judgeStatusText for some classic GRID errors.""" 00343 00344 if not grid_id or not grid_middleware: return 00345 log_cmd = "glite-wms-job-logging-info -v 2 " + grid_id 00346 if grid_middleware == "EDG": log_cmd = "edg-job-get-logging-info -v 1 " + grid_id 00347 jobTryDir = self.__clientJob._GetTryOutputDir() 00348 grid_log_file_spec = jobTryDir + "/gbs_grid_info.log" 00349 inp = os.popen(log_cmd,"r") 00350 out = open(grid_log_file_spec,'w') 00351 00352 event = "" 00353 host = "" 00354 reason = "" 00355 result = "" 00356 timestamp = "" 00357 for line in inp: 00358 mo = re.search(r"Event:\s+(\S+)",line,re.IGNORECASE) 00359 if mo: event = mo.group(1) 00360 mo = re.search(r"- host\s+=\s+(.*)",line,re.IGNORECASE) 00361 if mo: host = mo.group(1) 00362 mo = re.search(r"- reason\s+=\s+(.*)",line,re.IGNORECASE) 00363 if mo: 00364 reason = mo.group(1) 00365 # Look for a few classic errors and record the first 00366 if not self.__judgeStatusText and re.search(r"(expire|maradona)",line,re.IGNORECASE): 00367 self.__judgeStatusText = "GRID error: " + reason 00368 mo = re.search(r"- (result|status_code)\s+=\s+(.*)",line,re.IGNORECASE) 00369 if mo: result = mo.group(2) 00370 mo = re.search(r"- timestamp\s+=\s+(.*)",line,re.IGNORECASE) 00371 if mo: 00372 timestamp = mo.group(1) 00373 out.write("%s %-12s %-10s %15s %s\n" % (timestamp,event,result,host,reason)) 00374 event = "" 00375 host = "" 00376 reason = "" 00377 
result = "" 00378 timestamp = "" 00379 inp.close() 00380 out.close()
def python::GBSJobAnalyser::GBSJobAnalyser::Analyse | ( | self, | ||
job | ||||
) |
Analyse client job termination.
Definition at line 60 of file GBSJobAnalyser.py.
00061 : '" + self.__gangaStatus \ 00062 + "' Recorded job interval:" + str(self.__glfTimeInterval) + "mins\n" \ 00063 + " Appl. Job Status Code: " + GIDStringForJSC(self.__applStatusCode) \ 00064 + " [" + self.__applStatusText + "]\n" \ 00065 + " Failure category: " + GIDStringForFCC(self.__judgeFailCategory) + "\n" \ 00066 + " Judgement: Status Code:" + GIDStringForJSC(self.__judgeStatusCode) \ 00067 + " [" + self.__judgeStatusText + "]" + " Retry Args:'" + self.__judgeRetryArgs + "'" 00068 00069 def Analyse(self,job):
def python::GBSJobAnalyser::GBSJobAnalyser::Apply | ( | self | ) |
Apply results of analysis to client job.
Definition at line 70 of file GBSJobAnalyser.py.
00079 : 00080 00081 """Apply results of analysis to client job.""" 00082 00083 job = self.__clientJob 00084 00085 # Increment fail counts as appropriate. 00086 if self.__judgeFailCategory == GID_FCC_EARLY: job._IncrementEarlyFailsCount() 00087 if self.__judgeFailCategory == GID_FCC_LATE_HANDLED : job._IncrementLateHandledFailsCount() 00088 if self.__judgeFailCategory == GID_FCC_LATE_UNHANDLED: job._IncrementLateUnhandledFailsCount() 00089 00090 # Update job status. 00091 job._SetStatusCode(self.__judgeStatusCode) 00092 job._SetStatusText(self.__judgeStatusText) 00093 job._SetRetryArgs(self.__judgeRetryArgs) 00094 00095 Log(self.__clientJob,logger.INFO,"%s returned %s %s %s %s %s "\ 00096 % (job.GetTryID(),GIDStringForJSC(self.__judgeStatusCode),\ 00097 GIDStringForCLC(self.__comLevelCode),GIDStringForFCC(self.__judgeFailCategory),\ 00098 self.__usedResources,self.__judgeRetryArgs) ) 00099 00100 # Record the results to the GLF. jobTryDir = self.__clientJob._GetTryOutputDir()
def python::GBSJobAnalyser::GBSJobAnalyser::Clear | ( | self | ) |
Clear out existing analysis in case we ever want to use the object again.
Definition at line 20 of file GBSJobAnalyser.py.
00027 : self.Clear() 00028 00029 def Clear(self): 00030 00031 """Clear out existing analysis in case we ever want to use the object again.""" 00032 00033 self.__clientJob = None # Client Job to be analysed 00034 00035 # The following are set during data collection phase:- 00036 self.__comLevelCode = 0 # Communication level code of returned information 00037 self.__comLevelText = "" # Communication level text qualifying failure to reach USER level 00038 self.__gangaStatus = "" # Ganga exit status 00039 self.__glfTimeInterval = 0 # Time duration as measured by GBS Log File, if available 00040 self.__applStatusCode = GID_JSC_UNKNOWN # Application requested status code, if available 00041 self.__applStatusText = "" # Application requested status text, if available
def python::GBSJobAnalyser::GBSJobAnalyser::GetJob | ( | self | ) |
def python::GBSJobAnalyser::GBSJobAnalyser::GetRetryArgs | ( | self | ) |
def python::GBSJobAnalyser::GBSJobAnalyser::GetType | ( | self | ) |