GBSJobAnalyser.py

Go to the documentation of this file.
00001 import os
00002 import re
00003 import string
00004 import time
00005 
00006 from GBSConfig  import GetConfigValue
00007 from GBSIdCodes import *
00008 from GBSLogger import Log, logger 
00009 from GBSTimeStamp import GBSTimeStamp as timestamp
00010 
class GBSJobAnalyser:

    """Job termination analysis object. Based on output returned and past history decides what to do next.

    Unlike other role-playing objects this does not inherit from
    GBSObject as it has no persistable state; it is called into
    existence by a Job to analyse the results of a Ganga job
    and then discarded.

    It has no user callable methods.

    Usage, as implemented below: Analyse(job) collects data about the
    job's termination and forms a judgement; Apply() then pushes that
    judgement onto the job and appends it to the GBS Log File (GLF).
    """

    ######  Methods called by the client Job (not user callable)  ######


    def __init__(self,name,parent,model,model_args):

        """Create an empty analyser.

        The four arguments are accepted but not used by this class
        (NOTE(review): presumably kept for signature compatibility with
        other GBS role-playing objects - confirm).
        """

        self.Clear()

    def Clear(self):

        """Clear out existing analysis in case we ever want to use the object again."""

        self.__clientJob       = None  # Client Job to be analysed

        #  The following are set during data collection phase:-
        self.__comLevelCode    = 0                 # Communication level code of returned information
        self.__comLevelText    = ""                # Communication level text qualifying failure to reach USER level
        self.__gangaStatus     = ""                # Ganga exit status
        self.__glfTimeInterval = 0                 # Time duration (minutes) as measured by GBS Log File, if available
        self.__applStatusCode  = GID_JSC_UNKNOWN   # Application requested status code, if available
        self.__applStatusText  = ""                # Application requested status text, if available
        self.__usedResources   = ""                # CPU used.

        # The following are set during judgement phase.
        self.__judgeStatusCode   = 0               # New value of Job statusCode (JSC)
        self.__judgeStatusText   = ""              # New value of Job statusText
        self.__judgeRetryArgs    = ""              # New value of Job retryArgs
        self.__judgeFailCategory = 0               # Failure category code (FCC)

    def GetType(self):
        """Return the type name of this object."""
        return "GBSJobAnalyser"

    def GetRetryArgs(self):
        """Return the retry arguments decided by the last judgement."""
        return self.__judgeRetryArgs

    def GetJob(self):
        """Return the client Job being analysed, or None if there is none."""
        return self.__clientJob

    def __repr__(self) :
        """Return a multi-line, human readable summary of the collected data and the judgement."""
        if not self.__clientJob: return "No associated client Job"
        return ( "    Communication Level: " + GIDStringForCLC(self.__comLevelCode)
               + " [" + str(self.__comLevelText) + "]\n"
               + "    Ganga Exit Status: '" + self.__gangaStatus
               + "' Recorded job interval:" + str(self.__glfTimeInterval) + "mins\n"
               + "    Appl. Job Status Code: " + GIDStringForJSC(self.__applStatusCode)
               + " [" + self.__applStatusText + "]\n"
               + "    Failure category: " +  GIDStringForFCC(self.__judgeFailCategory) + "\n"
               + "    Judgement: Status Code:" + GIDStringForJSC(self.__judgeStatusCode)
               + "  [" + self.__judgeStatusText + "]" + " Retry Args:'" + self.__judgeRetryArgs + "'" )

    def Analyse(self,job):

        """Analyse client job termination.

        Discards any previous analysis, records job as the client job,
        then collects data and makes a judgement.  Nothing is changed
        on the job itself until Apply() is called.
        """

        self.Clear()
        self.__clientJob = job

        self.__CollectData()
        self.__MakeJudgement()

    def Apply(self):

        """Apply results of analysis to client job.

        Increments the job's fail counter matching the failure category,
        updates its status code, status text and retry args, logs a one
        line summary and appends the full analysis to the GBS Log File.
        """

        job = self.__clientJob

        # Increment fail counts as appropriate.
        if self.__judgeFailCategory == GID_FCC_EARLY:          job._IncrementEarlyFailsCount()
        if self.__judgeFailCategory == GID_FCC_LATE_HANDLED :  job._IncrementLateHandledFailsCount()
        if self.__judgeFailCategory == GID_FCC_LATE_UNHANDLED: job._IncrementLateUnhandledFailsCount()

        # Update job status.
        job._SetStatusCode(self.__judgeStatusCode)
        job._SetStatusText(self.__judgeStatusText)
        job._SetRetryArgs(self.__judgeRetryArgs)

        Log(job,logger.INFO,"%s returned %s %s %s %s %s "
            % (job.GetTryID(),GIDStringForJSC(self.__judgeStatusCode),
               GIDStringForCLC(self.__comLevelCode),GIDStringForFCC(self.__judgeFailCategory),
               self.__usedResources,self.__judgeRetryArgs) )

        # Record the results to the GLF.
        jobTryDir = job._GetTryOutputDir()
        gbs_log_file_spec = jobTryDir + "/" + job._GetGbsLogFileName()
        recreating = not os.path.isfile(gbs_log_file_spec)
        # Append mode creates the file when it is missing, so no shell
        # command is needed to recreate it.
        f = open(gbs_log_file_spec,'a')
        try:
            if recreating:
                f.write(timestamp() + " INFO GBS_JOB_ANALYSIS Unable to find file, recreating it.\n")
            # NOTE(review): no space between the time stamp and "INFO" here,
            # unlike the line above - left unchanged; confirm against the
            # format returned by GBSTimeStamp.
            f.write(timestamp() + "INFO GBS_JOB_ANALYSIS:-\n")
            f.write(str(self) + "\n")
        finally:
            f.close()

    ######  Private Methods (not user callable)  ######


    def __CollectData(self):

        """Collect information about the client job termination.

        Works upwards through the communication levels GANGA, GRID,
        BATCH, WORKER, APPLICATION and USER.  __comLevelCode is set to
        each level before it is checked, so on return it holds the level
        at which communication stopped (USER if everything succeeded)
        and __comLevelText says why.
        """

        self.__judgeFailCategory = GID_FCC_NONE
        self.__judgeStatusCode   = GID_JSC_WAITING_ANALYSIS
        self.__judgeStatusText   = ""

        job       = self.__clientJob
        jobTryDir = job._GetTryOutputDir()

        #  GANGA Level: Check for valid Ganga exit code

        self.__comLevelCode = GID_CLC_GANGA
        self.__gangaStatus = "Cannot obtain Ganga exit code"
        gangaStatusFile = jobTryDir + "/gbs_ganga.status"
        if not os.path.isfile(gangaStatusFile):
            self.__comLevelText = "Failed to find Ganga status file" + str(gangaStatusFile)
            Log(job,logger.ERROR,self.__comLevelText)
            return
        Log(job,logger.DEBUG,"Reading Ganga status file" + str(gangaStatusFile))
        self.__gangaStatus = ""
        f = open(gangaStatusFile)
        try:
            (self.__gangaStatus,grid_middleware,grid_id) = self.__ParseGangaStatusLines(f)
        finally:
            f.close()
        self.__WriteLoggingInfoFile(grid_middleware,grid_id)
        if self.__gangaStatus not in ("aborted","cancelled","completed","done","failed","killed"):
            self.__gangaStatus  = "Unknown Ganga status:'" + self.__gangaStatus + "'"
            self.__comLevelText = self.__gangaStatus
            Log(job,logger.ERROR,self.__gangaStatus)
            return

        #  GRID Level:Check for GRID failures

        self.__comLevelCode = GID_CLC_GRID
        if self.__gangaStatus == "aborted":
            self.__comLevelText = "GRID Failure: ABORTED"
            return
        if self.__gangaStatus in ("cancelled","killed"):
            self.__comLevelText = "GRID Failure: CANCELLED"
            return

        #  BATCH, WORKER, APPLICATION and USER level: Attempt to read the GBS Log File and collect information.

        self.__comLevelCode = GID_CLC_BATCH
        if  self.__gangaStatus == "failed":
            self.__comLevelText = "BATCH Failure: FAILED"
            return
        gbs_log_file_spec = jobTryDir + "/" + job._GetGbsLogFileName()
        if not os.path.isfile(gbs_log_file_spec):
            self.__comLevelText = "Failed to find " + str(gbs_log_file_spec)
            return
        Log(job,logger.DEBUG,"Reading GBS Log File " + str(gbs_log_file_spec))

        (has_wrapper_start,has_wrapper_term,first_time_string,last_time_string,app_exit_code) \
            = self.__ParseGbsLogFile(gbs_log_file_spec)

        # If we have collected two times, compute the interval
        if first_time_string and last_time_string:
            try:
                self.__glfTimeInterval = self.__MinutesBetween(first_time_string,last_time_string)
            except ValueError:
                Log(job,logger.WARNING,"Bad times found in GLF: " + str(first_time_string) + ";"+ str(last_time_string) + ".")

        # We now have all the information to complete data collection

        if not has_wrapper_start:
            self.__comLevelText = "GBS Log File has no job wrapper start line"
            return

        self.__comLevelCode = GID_CLC_WORKER
        if not has_wrapper_term:
            self.__comLevelText = "GBS Log File has no job wrapper end line"
            return

        self.__comLevelCode = GID_CLC_APPLICATION
        if app_exit_code:
            self.__comLevelText = "Application returned exit code:" + str(app_exit_code)
            return

        if self.__applStatusCode == GID_JSC_UNKNOWN:
            self.__comLevelText = "Application failed to record SUCCEEDED, FAILED, HOLD or RETRY"
            return

        # Hurrah! we have actually managed to communicate with the application.

        self.__comLevelCode = GID_CLC_USER
        self.__comLevelText = "Achieved communication with application"
        return

    def __ParseGangaStatusLines(self,lines):

        """Extract (status, middleware, grid id) from the lines of a Ganga status file.

        lines is any iterable of text lines.  The first non-empty
        "status = '...'" value is kept; for "middleware = '...'" and
        "id = 'http...'" the last occurrence wins.  Items that are not
        found are returned as empty strings.
        """

        status_re     = re.compile(r"status = '(.*)'")
        middleware_re = re.compile(r"middleware = '(.*)'")
        id_re         = re.compile(r"id = '(http.*)'")

        ganga_status    = ""
        grid_middleware = ""
        grid_id         = ""
        for line in lines:
            # Collect the first status = ... line
            mo = status_re.search(line)
            if mo and not ganga_status: ganga_status = mo.group(1)
            mo = middleware_re.search(line)
            if mo: grid_middleware = mo.group(1)
            mo = id_re.search(line)
            if mo: grid_id = mo.group(1)
        return (ganga_status,grid_middleware,grid_id)

    def __ParseGbsLogFile(self,gbs_log_file_spec):

        """Scan the GBS Log File and return what the job wrapper recorded.

        Returns the tuple (has_wrapper_start, has_wrapper_term,
        first_time_string, last_time_string, app_exit_code).  As side
        effects sets __applStatusCode / __applStatusText from any
        SUCCEEDED, FAILED, HOLD or RETRY line (last one wins) and
        __usedResources from the wrapper's resource summary line.
        Lines that do not look like "<date> <time> <keyword> <data>" are
        logged and skipped.
        """

        glf_line_re  = re.compile(r"^(\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d)\s+(\S+)\s+(.+)\s$")
        start_re     = re.compile(r"^GBS_JOB_WRAPPER Starting")
        term_re      = re.compile(r"^GBS_JOB_WRAPPER Terminating.*User script returned (\d+)")
        ksi2k_re     = re.compile(r"GBS_JOB_WRAPPER\s+(KSI2K=\S+)")
        resources_re = re.compile(r"GBS_JOB_WRAPPER Resources used: CPU User: (\S+) sec, System: (\S+) sec.")

        # Status the application asks for, by GLF keyword.
        status_for_keyword = { "SUCCEEDED" : GID_JSC_SUCCEEDED,
                               "FAILED"    : GID_JSC_FAILED,
                               "HOLD"      : GID_JSC_HELD,
                               "RETRY"     : GID_JSC_RETRY }

        job               = self.__clientJob
        has_wrapper_start = False
        has_wrapper_term  = False
        first_time_string = ""
        last_time_string  = ""
        app_exit_code     = 0
        ksi2k_rating      = ""

        f = open(gbs_log_file_spec)
        try:
            for line in f:
                mo = glf_line_re.search(line)
                if not mo:
                    Log(job,logger.WARNING,gbs_log_file_spec + ":Ignoring malformed line: " + line.rstrip())
                    continue
                (date_time,keyword,data) = mo.groups()
                Log(job,logger.DEBUG,"  Parse GBS Log File line. date_time:" + date_time + ",keyword:" + keyword + ",data:" + data + ".")

                if start_re.search(data):
                    has_wrapper_start = True
                    first_time_string = date_time
                    Log(job,logger.INFO,"%s started %s "% (job.GetTryID(),date_time))

                # Collect end date time only until wrapper end.  This is done
                # before the flag is raised so that the terminating line's
                # own time is the last one recorded.
                if not has_wrapper_term: last_time_string = date_time
                mo = term_re.search(data)
                if mo:
                    has_wrapper_term = True
                    app_exit_code    = int(mo.group(1))

                if keyword in status_for_keyword:
                    self.__applStatusCode = status_for_keyword[keyword]
                    self.__applStatusText = data

                mo = ksi2k_re.search(line)
                if mo: ksi2k_rating = mo.group(1)

                mo = resources_re.search(line)
                if mo:
                    try:
                        cpu_mins = (float(mo.group(1)) + float(mo.group(2)))/60.
                    except ValueError:
                        # Resource usage is informational only; do not let a
                        # garbled number abort the whole analysis.
                        Log(job,logger.WARNING,gbs_log_file_spec + ":Ignoring bad CPU times: " + line.rstrip())
                    else:
                        self.__usedResources = "CPU: %7.1f min %s" % (cpu_mins,ksi2k_rating)
        finally:
            f.close()

        return (has_wrapper_start,has_wrapper_term,first_time_string,last_time_string,app_exit_code)

    def __MinutesBetween(self,first_time_string,last_time_string):

        """Return the interval in minutes between two "YYYY-MM-DD HH:MM:SS" strings.

        Both are interpreted as local time.  Raises ValueError if either
        string does not match the format.
        """

        time_format = "%Y-%m-%d %H:%M:%S"
        t_start = time.mktime(time.strptime(first_time_string,time_format))
        t_term  = time.mktime(time.strptime(last_time_string,time_format))
        return (t_term - t_start)/60.

    def __MakeJudgement(self):

        """Make judgement on what next to do with Job but leave application to Apply() method.

        Sets __judgeStatusCode, __judgeStatusText, __judgeRetryArgs and
        __judgeFailCategory from the data gathered by __CollectData()
        and from the job's existing fail counts.
        """

        job = self.__clientJob

        # Use current RetryArgs by default but replace if get HOLD or RETRY signal from application
        self.__judgeRetryArgs = job.GetRetryArgs()

        if self.__comLevelCode == GID_CLC_GANGA:
            self.__judgeFailCategory = GID_FCC_LATE_UNHANDLED

        elif self.__comLevelCode == GID_CLC_GRID:

            if self.__gangaStatus == "aborted":
                self.__judgeFailCategory = GID_FCC_LATE_UNHANDLED
            else:
                # Deliberately stopped: hold the job and do not count a failure.
                self.__judgeStatusCode = GID_JSC_HELD
                self.__judgeStatusText = "Ganga returned 'cancelled' or 'killed'"
                return

        elif self.__comLevelCode == GID_CLC_BATCH:

            self.__judgeFailCategory = GID_FCC_EARLY

        elif self.__comLevelCode in (GID_CLC_WORKER,GID_CLC_APPLICATION):

            # A failure only counts as early if the job ran for no longer
            # than the configured limit.
            self.__judgeFailCategory = GID_FCC_EARLY
            if self.__glfTimeInterval > float(GetConfigValue("MaxTimeEarlyFails")):
                self.__judgeFailCategory = GID_FCC_LATE_UNHANDLED

        elif self.__comLevelCode == GID_CLC_USER:

            if self.__applStatusCode in (GID_JSC_RETRY,GID_JSC_HELD):
                # The application's text doubles as the new retry args.
                self.__judgeStatusText = self.__applStatusText
                self.__judgeRetryArgs  = self.__applStatusText
                self.__judgeFailCategory = GID_FCC_EARLY
                if self.__glfTimeInterval > float(GetConfigValue("MaxTimeEarlyFails")):
                    self.__judgeFailCategory = GID_FCC_LATE_HANDLED
            else:
                self.__judgeStatusCode = self.__applStatusCode
                self.__judgeStatusText = self.__applStatusText
                self.__judgeRetryArgs  = ""
                return

        # If we reach this point then we have dealt with SUCCEEDED, FAILED and held
        # due to cancelled or killed.
        # All that's left are various types of failures and HOLD/RETRY.
        # Make decision between FAILED and HOLD/RETRY based on the error counts.

        #  Should never happen, but in case I break the logic sometime!
        if self.__judgeFailCategory == GID_FCC_NONE:
            Log(job,logger.ERROR,"GBSJobAnalyser: Program error: dealing with failure but category not set!")
            self.__judgeFailCategory = GID_FCC_EARLY  # Have to pick something

        self.__judgeStatusCode = GID_JSC_RETRY
        if self.__comLevelCode == GID_CLC_USER: self.__judgeStatusCode = self.__applStatusCode

        fc                 = self.__judgeFailCategory
        max_early          = int(GetConfigValue("MaxRetryEarlyFails"))
        max_late_handled   = int(GetConfigValue("MaxRetryLateFailsHandled"))
        max_late_unhandled = int(GetConfigValue("MaxRetryLateFailsUnhandled"))
        job_early          = job.GetEarlyFailsCount()
        job_late_handled   = job.GetLateHandledFailsCount()
        job_late_unhandled = job.GetLateUnhandledFailsCount()

        # The counts compared here are those from before this failure;
        # Apply() is what increments them.
        if  fc == GID_FCC_EARLY and job_early >= max_early:
            self.__judgeStatusCode = GID_JSC_FAILED
            self.__judgeStatusText = "EARLY fails limit of " + str(max_early) + " exceeded"

        if  fc == GID_FCC_LATE_HANDLED and  job_late_handled >= max_late_handled:
            self.__judgeStatusCode = GID_JSC_FAILED
            self.__judgeStatusText = "LATE_HANDLED fails limit of " + str(max_late_handled) + " exceeded"

        if  fc == GID_FCC_LATE_UNHANDLED and  job_late_unhandled >= max_late_unhandled:
            self.__judgeStatusCode = GID_JSC_FAILED
            self.__judgeStatusText = "LATE_UNHANDLED fails limit of " + str(max_late_unhandled) + " exceeded"

    def __WriteLoggingInfoFile(self,grid_middleware,grid_id):

        """Produce a GRID logging summary into 'gbs_grid_info.log' in the job's try output directory.

        Does nothing unless both grid_middleware and grid_id are set.
        Runs 'edg-job-get-logging-info' when grid_middleware is "EDG"
        and 'glite-wms-job-logging-info' otherwise, and writes one
        summary line per timestamp entry in the command's output.

        Updates __judgeStatusText for some classic GRID errors (the
        first reason line mentioning "expire" or "maradona").
        """

        if not grid_id or not grid_middleware: return

        import subprocess

        # Pass the arguments as a list: grid_id comes from a file and must
        # not be interpreted by a shell.
        if grid_middleware == "EDG":
            log_cmd = ["edg-job-get-logging-info","-v","1",grid_id]
        else:
            log_cmd = ["glite-wms-job-logging-info","-v","2",grid_id]

        event_re     = re.compile(r"Event:\s+(\S+)",re.IGNORECASE)
        host_re      = re.compile(r"- host\s+=\s+(.*)",re.IGNORECASE)
        reason_re    = re.compile(r"- reason\s+=\s+(.*)",re.IGNORECASE)
        classic_re   = re.compile(r"(expire|maradona)",re.IGNORECASE)
        result_re    = re.compile(r"- (result|status_code)\s+=\s+(.*)",re.IGNORECASE)
        timestamp_re = re.compile(r"- timestamp\s+=\s+(.*)",re.IGNORECASE)

        jobTryDir = self.__clientJob._GetTryOutputDir()
        grid_log_file_spec = jobTryDir + "/gbs_grid_info.log"
        out = open(grid_log_file_spec,'w')
        try:
            try:
                proc = subprocess.Popen(log_cmd,stdout=subprocess.PIPE,universal_newlines=True)
            except OSError:
                # The summary is best effort: leave an empty file behind.
                Log(self.__clientJob,logger.WARNING,"Unable to run " + log_cmd[0])
                return
            try:
                event  = ""
                host   = ""
                reason = ""
                result = ""
                for line in proc.stdout:
                    mo = event_re.search(line)
                    if mo: event  = mo.group(1)
                    mo = host_re.search(line)
                    if mo: host  = mo.group(1)
                    mo = reason_re.search(line)
                    if mo:
                        reason  = mo.group(1)
                        # Look for a few classic errors and record the first
                        if not self.__judgeStatusText and classic_re.search(line):
                            self.__judgeStatusText = "GRID error: " + reason
                    mo = result_re.search(line)
                    if mo: result  = mo.group(2)
                    mo = timestamp_re.search(line)
                    if mo:
                        # A timestamp line triggers the summary line for the
                        # fields gathered so far, which are then reset.
                        event_time = mo.group(1)
                        out.write("%s %-12s %-10s %15s %s\n" % (event_time,event,result,host,reason))
                        event  = ""
                        host   = ""
                        reason = ""
                        result = ""
            finally:
                proc.stdout.close()
                proc.wait()
        finally:
            out.close()
00381     
00382             
00383    
00384 
00385 
00386         
00387     

Generated on Fri Mar 5 09:25:41 2010 for gbs by  doxygen 1.4.7