00001 import os
00002 import re
00003 import string
00004 import time
00005
00006 from GBSConfig import GetConfigValue
00007 from GBSIdCodes import *
00008 from GBSLogger import Log, logger
00009 from GBSTimeStamp import GBSTimeStamp as timestamp
00010
class GBSJobAnalyser :

    """Job termination analysis object.  Based on output returned and past history decides what to do next.

    Unlike other role-playing objects this does not inherit from
    GBSObject as it has no persistable state; it is called into
    existence by a Job to analyse the results of a Ganga job
    and then discarded.

    It has no user callable methods

    Analyse(job) collects data about the job's termination and forms a
    judgement; Apply() then pushes that judgement onto the job.
    """

    def __init__(self,name,parent,model,model_args):
        """Create an analyser holding no analysis.

        The arguments are accepted but not used by this class.
        """
        self.Clear()

    def Clear(self):

        """Clear out existing analysis in case we ever want to use the object again."""

        # The job currently being analysed (None until Analyse() is called).
        self.__clientJob         = None

        # Data collected by __CollectData().
        self.__comLevelCode      = 0    # how far communication with the job got (GID_CLC_*)
        self.__comLevelText      = ""
        self.__gangaStatus       = ""
        self.__glfTimeInterval   = 0    # minutes between first and last GBS log file times
        self.__applStatusCode    = GID_JSC_UNKNOWN
        self.__applStatusText    = ""
        self.__usedResources     = ""

        # Judgement formed by __MakeJudgement().
        self.__judgeStatusCode   = 0
        self.__judgeStatusText   = ""
        self.__judgeRetryArgs    = ""
        self.__judgeFailCategory = 0

    def GetType(self):
        """Return the type name of this object."""
        return "GBSJobAnalyser"

    def GetRetryArgs(self):
        """Return the retry args decided by the last judgement."""
        return self.__judgeRetryArgs

    def GetJob(self):
        """Return the client job last passed to Analyse() (None if there is none)."""
        return self.__clientJob

    def __repr__(self) :
        """Return a multi-line summary of the collected data and the judgement."""
        if not self.__clientJob: return "No associated client Job"
        return (" Communication Level: " + GIDStringForCLC(self.__comLevelCode)
                + " [" + str(self.__comLevelText) + "]\n"
                + " Ganga Exit Status: '" + self.__gangaStatus
                + "' Recorded job interval:" + str(self.__glfTimeInterval) + "mins\n"
                + " Appl. Job Status Code: " + GIDStringForJSC(self.__applStatusCode)
                + " [" + self.__applStatusText + "]\n"
                + " Failure category: " + GIDStringForFCC(self.__judgeFailCategory) + "\n"
                + " Judgement: Status Code:" + GIDStringForJSC(self.__judgeStatusCode)
                + " [" + self.__judgeStatusText + "]" + " Retry Args:'" + self.__judgeRetryArgs + "'")

    def Analyse(self,job):

        """Analyse client job termination.

        Discards any previous analysis, records job as the client job,
        then collects data and forms a judgement.  Nothing is changed on
        the job itself until Apply() is called.
        """

        self.Clear()
        self.__clientJob = job

        self.__CollectData()
        self.__MakeJudgement()

    def Apply(self):

        """Apply results of analysis to client job.

        Increments the job's fail counter that matches the judged failure
        category, sets the job's status code, status text and retry args,
        logs a summary, and appends the analysis to the job's GBS log
        file (recreating that file, with a note, if it is missing).
        """

        job = self.__clientJob

        # Update the fail counter matching the judged category, if any.
        if self.__judgeFailCategory == GID_FCC_EARLY:          job._IncrementEarlyFailsCount()
        if self.__judgeFailCategory == GID_FCC_LATE_HANDLED :  job._IncrementLateHandledFailsCount()
        if self.__judgeFailCategory == GID_FCC_LATE_UNHANDLED: job._IncrementLateUnhandledFailsCount()

        # Push the judgement onto the job.
        job._SetStatusCode(self.__judgeStatusCode)
        job._SetStatusText(self.__judgeStatusText)
        job._SetRetryArgs(self.__judgeRetryArgs)

        Log(job,logger.INFO,"%s returned %s %s %s %s %s "
            % (job.GetTryID(),GIDStringForJSC(self.__judgeStatusCode),
               GIDStringForCLC(self.__comLevelCode),GIDStringForFCC(self.__judgeFailCategory),
               self.__usedResources,self.__judgeRetryArgs) )

        # Record the analysis in the job's GBS log file.
        jobTryDir = job._GetTryOutputDir()
        gbs_log_file_spec = jobTryDir + "/" + job._GetGbsLogFileName()
        if not os.path.isfile(gbs_log_file_spec):
            # Written directly rather than via a shell "echo ... > file" so that
            # paths containing spaces or shell metacharacters work.  The
            # split/join reproduces echo's collapsing of runs of whitespace.
            note = " ".join((timestamp() + " INFO GBS_JOB_ANALYSIS Unable to find file, recreating it.").split())
            with open(gbs_log_file_spec,'w') as f:
                f.write(note + "\n")
        with open(gbs_log_file_spec,'a') as f:
            # NOTE(review): no separator is added between the time stamp and
            # "INFO" - presumably timestamp() ends with one; confirm.
            f.write(timestamp() + "INFO GBS_JOB_ANALYSIS:-\n")
            f.write(str(self) + "\n")

    def __CollectData(self):

        """Collect information about the client job termination.

        Works outwards-in through the communication levels (GANGA, GRID,
        BATCH, WORKER, APPLICATION, USER), setting __comLevelCode to the
        level being examined and returning, with __comLevelText explaining
        why, as soon as one of them cannot be passed.  Along the way fills
        in the Ganga status, application status, used resources and the
        time interval recorded in the GBS log file.
        """

        job = self.__clientJob

        self.__judgeFailCategory = GID_FCC_NONE
        self.__judgeStatusCode   = GID_JSC_WAITING_ANALYSIS
        self.__judgeStatusText   = ""

        jobTryDir = job._GetTryOutputDir()

        # --- GANGA level: need a readable status file holding a known status.

        self.__comLevelCode = GID_CLC_GANGA
        self.__gangaStatus  = "Cannot obtain Ganga exit code"
        gangaStatusFile = jobTryDir + "/gbs_ganga.status"
        if not os.path.isfile(gangaStatusFile):
            self.__comLevelText = "Failed to find Ganga status file" + str(gangaStatusFile)
            Log(job,logger.ERROR,self.__comLevelText)
            return
        Log(job,logger.DEBUG,"Reading Ganga status file" + str(gangaStatusFile))
        self.__gangaStatus = ""
        grid_middleware    = ""
        grid_id            = ""
        with open(gangaStatusFile) as f:
            for line in f:
                # Only the first status line is kept; for middleware and id
                # the last matching line wins.
                mo = re.search(r"status = '(.*)'",line)
                if mo and not self.__gangaStatus: self.__gangaStatus = mo.group(1)
                mo = re.search(r"middleware = '(.*)'",line)
                if mo: grid_middleware = mo.group(1)
                mo = re.search(r"id = '(http.*)'",line)
                if mo: grid_id = mo.group(1)
        self.__WriteLoggingInfoFile(grid_middleware,grid_id)
        if self.__gangaStatus not in ("aborted","cancelled","completed","done","failed","killed"):
            self.__gangaStatus  = "Unknown Ganga status:'" + self.__gangaStatus + "'"
            self.__comLevelText = self.__gangaStatus
            Log(job,logger.ERROR,self.__gangaStatus)
            return

        # --- GRID level: aborted, cancelled and killed jobs got no further.

        self.__comLevelCode = GID_CLC_GRID
        if self.__gangaStatus == "aborted":
            self.__comLevelText = "GRID Failure: ABORTED"
            return
        if self.__gangaStatus in ("cancelled","killed"):
            self.__comLevelText = "GRID Failure: CANCELLED"
            return

        # --- BATCH level: need a GBS log file containing a wrapper start line.

        self.__comLevelCode = GID_CLC_BATCH
        if self.__gangaStatus == "failed":
            self.__comLevelText = "BATCH Failure: FAILED"
            return
        gbs_log_file_spec = jobTryDir + "/" + job._GetGbsLogFileName()
        if not os.path.isfile(gbs_log_file_spec):
            self.__comLevelText = "Failed to find " + str(gbs_log_file_spec)
            return
        Log(job,logger.DEBUG,"Reading GBS Log File " + str(gbs_log_file_spec))

        has_wrapper_start = 0
        has_wrapper_term  = 0
        first_time_string = ""
        last_time_string  = ""
        app_exit_code     = 0
        ksi2k_rating      = ""
        # Log line keywords by which the application reports its own status.
        keyword_status = {
            "SUCCEEDED": GID_JSC_SUCCEEDED,
            "FAILED":    GID_JSC_FAILED,
            "HOLD":      GID_JSC_HELD,
            "RETRY":     GID_JSC_RETRY,
        }
        with open(gbs_log_file_spec) as f:
            for line in f:
                # Expected layout: "YYYY-MM-DD HH:MM:SS <keyword> <data>".
                mo = re.search(r"^(\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d)\s+(\S+)\s+(.+)\s$",line)
                if not mo:
                    Log(job,logger.WARNING,gbs_log_file_spec + ":Ignoring malformed line: " + line.rstrip())
                    continue
                (date_time,keyword,data) = mo.groups()
                Log(job,logger.DEBUG," Parse GBS Log File line. date_time:" + date_time + ",keyword:" + keyword + ",data:" + data + ".")
                if re.search(r"^GBS_JOB_WRAPPER Starting",data ):
                    has_wrapper_start = 1
                    first_time_string = date_time
                    Log(job,logger.INFO,"%s started %s "% (job.GetTryID(),date_time))
                mo = re.search(r"^GBS_JOB_WRAPPER Terminating.*User script returned (\d+)",data )
                # Track the latest time seen up to and including the wrapper's
                # terminating line; anything logged after that is ignored.
                if not has_wrapper_term: last_time_string = date_time
                if mo:
                    has_wrapper_term = 1
                    app_exit_code    = int(mo.group(1))
                if keyword in keyword_status:
                    self.__applStatusCode = keyword_status[keyword]
                    self.__applStatusText = data
                mo = re.search(r"GBS_JOB_WRAPPER\s+(KSI2K=\S+)",line)
                if mo: ksi2k_rating = mo.group(1)
                mo = re.search(r"GBS_JOB_WRAPPER Resources used: CPU User: (\S+) sec, System: (\S+) sec.",line)
                if mo:
                    # A garbled figure must not abort the whole analysis.
                    try:
                        cpu_minutes = (float(mo.group(1)) + float(mo.group(2)))/60.
                    except ValueError:
                        Log(job,logger.WARNING,gbs_log_file_spec + ":Ignoring malformed resources line: " + line.rstrip())
                    else:
                        self.__usedResources = "CPU: %7.1f min %s" % (cpu_minutes,ksi2k_rating)

        # Time interval (in minutes) covered by the GBS log file.
        if first_time_string and last_time_string:
            try:
                t_start = time.mktime(time.strptime(first_time_string,"%Y-%m-%d %H:%M:%S"))
                t_term  = time.mktime(time.strptime(last_time_string,"%Y-%m-%d %H:%M:%S"))
                self.__glfTimeInterval = (t_term - t_start)/60.
            except ValueError:
                Log(job,logger.WARNING,"Bad times found in GLF: " + str(first_time_string) + ";"+ str(last_time_string) + ".")

        if not has_wrapper_start:
            self.__comLevelText = "GBS Log File has no job wrapper start line"
            return

        # --- WORKER level: the wrapper started; did it terminate?

        self.__comLevelCode = GID_CLC_WORKER
        if not has_wrapper_term:
            self.__comLevelText = "GBS Log File has no job wrapper end line"
            return

        # --- APPLICATION level: need a zero exit code and a reported status.

        self.__comLevelCode = GID_CLC_APPLICATION
        if app_exit_code:
            self.__comLevelText = "Application returned exit code:" + str(app_exit_code)
            return

        if self.__applStatusCode == GID_JSC_UNKNOWN:
            self.__comLevelText = "Application failed to record SUCCEEDED, FAILED, HOLD or RETRY"
            return

        # --- USER level: the application told us what it wants.

        self.__comLevelCode = GID_CLC_USER
        self.__comLevelText = "Achieved communication with application"
        return

    def __MakeJudgement(self):

        """Make judgement on what next to do with Job but leave application to Apply() method.

        Uses the communication level reached by __CollectData() to pick a
        failure category, then (unless a final status was decided
        directly) judges the job as RETRY, downgraded to FAILED once the
        job's count for that failure category has reached its configured
        limit.
        """

        job = self.__clientJob

        # By default keep the job's current retry args.
        self.__judgeRetryArgs = job.GetRetryArgs()

        level = self.__comLevelCode

        if level == GID_CLC_GANGA:
            self.__judgeFailCategory = GID_FCC_LATE_UNHANDLED

        elif level == GID_CLC_GRID:
            if self.__gangaStatus == "aborted":
                self.__judgeFailCategory = GID_FCC_LATE_UNHANDLED
            else:
                # Only the cancelled/killed path returns here; an aborted job
                # falls through to the retry-limit checks below.
                self.__judgeStatusCode = GID_JSC_HELD
                self.__judgeStatusText = "Ganga returned 'cancelled' or 'killed'"
                return

        elif level == GID_CLC_BATCH:
            self.__judgeFailCategory = GID_FCC_EARLY

        elif level == GID_CLC_WORKER or level == GID_CLC_APPLICATION:
            # A failure is EARLY unless the job ran for longer than the configured limit.
            self.__judgeFailCategory = GID_FCC_EARLY
            if self.__glfTimeInterval > float(GetConfigValue("MaxTimeEarlyFails")):
                self.__judgeFailCategory = GID_FCC_LATE_UNHANDLED

        elif level == GID_CLC_USER:
            if self.__applStatusCode == GID_JSC_RETRY \
               or self.__applStatusCode == GID_JSC_HELD:
                # The application asked for a retry or hold: its text becomes
                # both the status text and the new retry args.
                self.__judgeStatusText   = self.__applStatusText
                self.__judgeRetryArgs    = self.__applStatusText
                self.__judgeFailCategory = GID_FCC_EARLY
                if self.__glfTimeInterval > float(GetConfigValue("MaxTimeEarlyFails")):
                    self.__judgeFailCategory = GID_FCC_LATE_HANDLED
            else:
                # The application reported a final status: accept it as is.
                self.__judgeStatusCode = self.__applStatusCode
                self.__judgeStatusText = self.__applStatusText
                self.__judgeRetryArgs  = ""
                return

        # From here on we are dealing with a failure that may be retried.

        if self.__judgeFailCategory == GID_FCC_NONE:
            Log(job,logger.ERROR,"GBSJobAnalyser: Program error: dealing with failure but category not set!")
            self.__judgeFailCategory = GID_FCC_EARLY

        self.__judgeStatusCode = GID_JSC_RETRY
        if level == GID_CLC_USER: self.__judgeStatusCode = self.__applStatusCode

        fc                 = self.__judgeFailCategory
        max_early          = int(GetConfigValue("MaxRetryEarlyFails"))
        max_late_handled   = int(GetConfigValue("MaxRetryLateFailsHandled"))
        max_late_unhandled = int(GetConfigValue("MaxRetryLateFailsUnhandled"))
        job_early          = job.GetEarlyFailsCount()
        job_late_handled   = job.GetLateHandledFailsCount()
        job_late_unhandled = job.GetLateUnhandledFailsCount()

        # Give up once the counter for this category has reached its limit.
        if fc == GID_FCC_EARLY and job_early >= max_early:
            self.__judgeStatusCode = GID_JSC_FAILED
            self.__judgeStatusText = "EARLY fails limit of " + str(max_early) + " exceeded"

        if fc == GID_FCC_LATE_HANDLED and job_late_handled >= max_late_handled:
            self.__judgeStatusCode = GID_JSC_FAILED
            self.__judgeStatusText = "LATE_HANDLED fails limit of " + str(max_late_handled) + " exceeded"

        if fc == GID_FCC_LATE_UNHANDLED and job_late_unhandled >= max_late_unhandled:
            self.__judgeStatusCode = GID_JSC_FAILED
            self.__judgeStatusText = "LATE_UNHANDLED fails limit of " + str(max_late_unhandled) + " exceeded"

    def __WriteLoggingInfoFile(self,grid_middleware,grid_id):
        """For EDG/GLITE submissions, produce a logging summary into 'gbs_grid_info.log'.

        Update __judgeStatusText for some classic GRID errors.

        Does nothing if either grid_middleware or grid_id is empty.
        Otherwise runs the middleware's logging-info command on grid_id
        and writes one summary line per event (each event being closed by
        its timestamp line) to the file in the job's try output directory.
        """

        if not grid_id or not grid_middleware: return
        if grid_middleware == "EDG":
            log_cmd = "edg-job-get-logging-info -v 1 " + grid_id
        else:
            log_cmd = "glite-wms-job-logging-info -v 2 " + grid_id
        jobTryDir = self.__clientJob._GetTryOutputDir()
        grid_log_file_spec = jobTryDir + "/gbs_grid_info.log"

        # NOTE(review): log_cmd is run through the shell with grid_id taken
        # from the Ganga status file; consider subprocess with an argument
        # list if that file cannot be fully trusted.
        with os.popen(log_cmd,"r") as inp:
            with open(grid_log_file_spec,'w') as out:
                event  = ""
                host   = ""
                reason = ""
                result = ""
                for line in inp:
                    mo = re.search(r"Event:\s+(\S+)",line,re.IGNORECASE)
                    if mo: event = mo.group(1)
                    mo = re.search(r"- host\s+=\s+(.*)",line,re.IGNORECASE)
                    if mo: host = mo.group(1)
                    mo = re.search(r"- reason\s+=\s+(.*)",line,re.IGNORECASE)
                    if mo:
                        reason = mo.group(1)
                        # Remember the first classic GRID error seen.
                        if not self.__judgeStatusText and re.search(r"(expire|maradona)",line,re.IGNORECASE):
                            self.__judgeStatusText = "GRID error: " + reason
                    mo = re.search(r"- (result|status_code)\s+=\s+(.*)",line,re.IGNORECASE)
                    if mo: result = mo.group(2)
                    mo = re.search(r"- timestamp\s+=\s+(.*)",line,re.IGNORECASE)
                    if mo:
                        # The timestamp line completes an event: write it out
                        # and start collecting the next one from scratch.
                        event_time = mo.group(1)
                        out.write("%s %-12s %-10s %15s %s\n" % (event_time,event,result,host,reason))
                        event  = ""
                        host   = ""
                        reason = ""
                        result = ""
00381
00382
00383
00384
00385
00386
00387