LogAnalyser.py

import datetime
import glob
import os
import re
import sys


from GBSTimeStamp      import TimeStampToUnixTime
from JobAttemptSummary import JobAttemptSummary, ParseLogEntry

def LastMonthArgs():
    """Return a tuple (start,end,html_file_name) suitable to feed into
    LogAnalyser to produce last month's statistics."""

    today      = datetime.date.today()
    end_date   = today - datetime.timedelta(today.day)
    start_date = end_date - datetime.timedelta(end_date.day-1)
    return (start_date.isoformat(),end_date.isoformat(),start_date.strftime("%b_%Y").lower()+".html")
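# Example (illustrative): if today were 2008-04-15 then LastMonthArgs() would
# return ("2008-03-01","2008-03-31","mar_2008.html"), i.e. the whole of the
# previous calendar month plus an output file name derived from it.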

class LogAnalyser:

    """An object to analyse a series of directories holding GBS global
    log files within a date window and produce a report on overall
    performance, including the numbers MINOS UK has to supply to GridPP as
    their 'GRID Metrics'.

    The analysis covers all jobs submitted during the date window, including
    those that end after the window closes.  So, to fully analyse all jobs,
    it may be necessary to include log files that postdate the date window.
    A warning will be issued for any job that is not complete unless a log
    file is processed that is at least 7 days later than the job start time.

    Internally the analyser records information about each individual
    job submission in a

        JobAttemptSummary (JAS)

    object. A new object is created in the self.ActiveJAS pool each time a
    "submitted" line in the log file is encountered; it then receives
    further log file lines for the same job attempt until it receives
    one that terminates it, at which point it tells the LogAnalyser to move
    it to the self.CompletedJAS pool."""

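    # Typical use (illustrative sketch; the dates, directories and output file
    # name below are only examples, as in the __init__ docstring):
    #
    #   LogAnalyser("2008-01-01","2008-04-01",
    #               ['/home/west/log_files/set1','/home/west/log_files/set2'],
    #               html_file="q1_2008.html")
    #
    # or, to report on last month using LastMonthArgs():
    #
    #   (start,end,html) = LastMonthArgs()
    #   LogAnalyser(start,end,['/home/west/log_files/set1'],html_file=html)
    #
    # Construction runs the whole analysis (see self.Run() at the end of __init__).
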
    def __init__(self,start_date,end_date,directories,html_file=None,title="MINOS Grid Production"):
        """Initialise analyser:-

        start_date    Start date string e.g. "2008-01-01"
        end_date      End date string   e.g. "2008-04-01"
        directories   List of directory names e.g. ['/home/west/log_files/set1','/home/west/log_files/set2']
        html_file     Name of HTML file (if None, plain text will be written to STDOUT)
        title         Optional title for summary.
        """

        self.debug = 0    # Switch on/off by hand.

        # The maximum number of seconds between submission and job completion.  This
        # number is used to:-
        # o  Flag up possibly missing start records if only the end record is seen
        #    more than this limit after the start record.
        # o  Terminate any JAS that has run that long without seeing an end record.

        self.max_job_duration = 10*24*60*60   # i.e. 10 days

        if start_date > end_date: (start_date,end_date) = (end_date,start_date)
        self.start_ts = start_date
        if not re.search(r'\d\d\d\d-\d\d-\d\d',start_date):
            print "Bad start date " + start_date
            return
        self.start_ut = TimeStampToUnixTime(start_date)
        if not self.start_ut: return

        self.end_ts = end_date
        if not re.search(r'\d\d\d\d-\d\d-\d\d',end_date):
            print "Bad end date " + end_date
            return
        self.end_ut = TimeStampToUnixTime(end_date)
        if not self.end_ut: return

        self.directories = directories
        for dir in self.directories:
            if not os.path.isdir(dir):
                print 'Cannot find directory ' + dir
                return

        self.html_file      = html_file
        self.title          = title
        self.non_jas_errors = {} # non-jas-error-description -> number of errors


        # Prepare 2 pools of JobAttemptSummary (JAS) objects:-
        #
        #  ActiveJAS:    hash JAS-name -> obj
        #  CompletedJAS: list of completed JAS (not a hash; JAS-name may be duplicated)

        self.ActiveJAS    = {}
        self.CompletedJAS = []

        # Perform analysis
        self.Run()
        return
#_______________________________________________________________________________________________

    def AnalyseCollectedJAS(self):
        """Analyse the collection of JobAttemptSummary (JAS) objects."""


        # Prepare to write to file or stdout.
        if self.html_file:
            html = 1
            file_html = open(self.html_file,"w")
            tr = "<tr><td>"
            td = "<td>"
            br = "<br>"
            pg = "<p>"
        else:
            html = 0
            file_html = open("/dev/tty","w")
            tr = ""
            td = "  "
            br = "\n"
            pg = "\n\n"

        pr = file_html.write
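
        # The fragments tr/td/br/pg above let the same pr(...) calls produce either
        # an HTML table row or plain indented text.  For example (illustrative),
        #   pr("%sThroughput %s%5.1f\n" % (tr,td,x))
        # emits "<tr><td>Throughput <td>..." in HTML mode but just spaced text
        # when writing to the terminal.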

        num_days = int( 1.5 + (self.end_ut - self.start_ut)/(24*60*60)) # Time window is inclusive

        title = self.title
        if not title: title = "GBS Global Log File Analysis"
        title += " for period %s to %s (%d days)\n" % (self.start_ts,self.end_ts,num_days)
        if html: pr("<html>\n<title>%s</title>\n<body>\n<h1>%s</h1>\n" % (title,title))
        else:    pr("\n\n\n%s\n" % title)

        pr("%sBased on log files analysed from:-%s" % (pg,br))
        if html: pr("<pre>\n")
        for dir in self.directories: pr("  %s\n" % dir)
        if html: pr("</pre>\n")

        num_tot = len(self.CompletedJAS)
        num_success   = 0
        num_p_success = 0
        num_fail      = 0
        num_new_seed  = 0

        num_latency   = 0
        sum_latency   = 0

        sum_cpu       = 0
        sum_good_cpu  = 0
        sum_bad_cpu   = 0

        sum_fail_lost = 0
        sum_fail_grid = 0
        sum_fail_user = 0

        for jas in self.CompletedJAS:
            if jas.IsSuccess():          num_success   += 1
            elif jas.IsPartialSuccess(): num_p_success += 1
            elif jas.IsFailure():        num_fail      += 1
            else:
                print "debug job not classified"
                self.IncrementNonJasErrorCount("Log analysis bug - investigate!")
            if jas.RequestNewSeed():     num_new_seed += 1
            cpu = jas.GetCpuKSI2K()
            if cpu:
                sum_cpu  += cpu
                if jas.IsFailure(): sum_bad_cpu  += cpu
                else:               sum_good_cpu += cpu
            if jas.IsFailure():
                if   jas.fail == jas.FAIL_LOST: sum_fail_lost += 1
                elif jas.fail == jas.FAIL_GRID: sum_fail_grid += 1
                elif jas.fail == jas.FAIL_USER: sum_fail_user += 1
                else:
                    print "debug %s " % str(jas.fail)
                    self.IncrementNonJasErrorCount("Log analysis bug - investigate!")

            submit_ut = jas.GetSubmitDate()
            start_ut  = jas.GetStartDate()
            if submit_ut and start_ut:
                num_latency += 1
                sum_latency += start_ut - submit_ut

        num_not_success   = num_fail + num_p_success
        num_tot_grid_fail = sum_fail_lost + sum_fail_grid

        pr("\nNumber of job submissions analysed: %d (%5.1f/day)\n\n" % (num_tot,num_tot/float(num_days)))
        if not num_tot: return

        if html: pr("<h2>GRID Metrics</h2>\n")
        else:    pr("\nGRID Metrics\n")
        if html: pr("<p><table border=1><tr><th>Metric<th>Value\n")

        pr("%sThroughput (1) %s%5.1f KSI2K \n"     % (tr,td,sum_good_cpu/(60.*24*num_days)))
        pr("%sRobustness (2) %s%5.1f%%\n"          % (tr,td,(100. - 100.*num_tot_grid_fail/num_tot)))
        latency = "No data"
        if num_latency: latency = "%5.1f mins" % (sum_latency/(num_latency*60.))
        pr("%sLatency (3)    %s%s\n"               % (tr,td,latency))
        pr("%sWastage (4)    %s%5.1f%%\n"          % (tr,td,100.*sum_bad_cpu/(max(sum_cpu,.001))))


        if html:
            pr("</table>\n<p><font size=-1>Where:-\n<ol>\n")
            pr("<li><b>Throughput</b> The total number of useful KSI2K CPU days delivered per day\n")
            pr("<li><b>Robustness</b> The fraction of submitted jobs that were not GRID related failures\n")
            pr("<li><b>Latency</b> The mean time between submission and start of execution\n")
            pr("<li><b>Wastage</b> The fraction of total CPU known to be wasted on jobs that failed.\n")
            pr("</ol></font>\n")

        if num_not_success:
            if html: pr("<h2>Job Failure Analysis</h2>\n")
            else:    pr("\nJob Failure Analysis\n\n")
            pr("Of the %5.1f%% that were not successful (including the %5.1f%% that were GRID failures)%s" %\
               ( 100.*num_not_success/num_tot,100.*num_tot_grid_fail/num_tot,pg ))
            if html: pr("<p><table border=1><tr><th>Error<th>Rate\n")

            pr("%sJob failed to run or no output returned%s%5.1f%%\n" % (tr,td,100.*sum_fail_lost/num_not_success))
            pr("%sJob aborted (failed to exit wrapper normally)%s%5.1f%%\n" % (tr,td,100.*sum_fail_grid/num_not_success))
            pr("%sApplication returned non-zero exit code%s%5.1f%%\n" % (tr,td,100.*sum_fail_user/num_not_success))
            pr("%sApplication only partially successful%s%5.1f%%\n" % (tr,td,100.*num_p_success/num_not_success))
            pr("%sApplication requested new seed%s%5.1f%%\n" % (tr,td,100.*num_new_seed/num_not_success))
            if html: pr("</table><p>\n")

        if html: pr("<h2>Non-Job Failure Analysis</h2>\n<p><table border=1><tr><th>Error<th>Total<th>Rate\n")
        else:    pr("\nNon-Job Failure Analysis\n\n")

        errors = self.non_jas_errors.keys()
        errors.sort()
        for error in errors:
            num_errors = self.non_jas_errors[error]
            pr("%s%s%s%d%s%5.1f%%\n" % (tr,error,td,num_errors,td,(100.*num_errors/num_tot)))
        if html: pr("</table><p>\n")

        if not errors: pr("%s - No errors recorded%s" % (br,br))

        file_html.close()
        if html: print '\nResults written to %s' % self.html_file


#_______________________________________________________________________________________________

    def AnalyseNonJasLine(self,line):
        """Analyse non-JAS line and count errors.

        Examine any error or fatal line that is not part of a job attempt summary and count it."""
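
        # A non-JAS line is expected to look something like (hypothetical example)
        #   "... -E- 1234 Removing orphaned Ganga job ..."
        # i.e. a severity flag such as -I-, -W- or -E-, a number, then the error text.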
        mo = re.search(r' (-\w-)\s+\d+\s+(.+)',line)
        if not mo:
            print "Cannot analyse non-JAS line %s " % line
            self.IncrementNonJasErrorCount("Cannot analyse non-job-attempt line")
            return
        (severity,error) = mo.groups()
        if severity == "-I-": return
        if   re.search("Removing orphaned Ganga job",line):
            self.IncrementNonJasErrorCount("Removing orphaned Ganga job")
        elif re.search("Failed to complete Ganga job submit",line):
            self.IncrementNonJasErrorCount("Failed to complete Ganga job submit")
        elif re.search("Ignoring malformed line:",line):
            self.IncrementNonJasErrorCount("Ignoring malformed line:")
        elif re.search("Timeout reached before asynchronous call ended",line):
            self.IncrementNonJasErrorCount("Timeout reached before asynchronous call ended")
        # The "Timeout reached ..." is always followed by "...Attempting to delete .." so ignore that
        elif re.search(r"\.\.\.Attempting to delete child processes ",line):
            pass
        elif re.search("Unknown Ganga status",line):
            self.IncrementNonJasErrorCount("Unknown Ganga status")
        elif re.search("GBSIOHelper: EOF on <open file",line):
            self.IncrementNonJasErrorCount("GBSIOHelper: EOF on <open file")
        else:
            print "Cannot handle non-JAS error: %s" % error
            self.IncrementNonJasErrorCount("Cannot handle non-JAS error")

#_______________________________________________________________________________________________

    def IncrementNonJasErrorCount(self,description):
        """Increment non-JAS error count."""
        if not self.non_jas_errors.has_key(description): self.non_jas_errors[description] = 0
        self.non_jas_errors[description] += 1
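
        # A more compact equivalent (sketch only, same behaviour):
        #   self.non_jas_errors[description] = self.non_jas_errors.get(description,0) + 1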

#_______________________________________________________________________________________________

    def ProcessDirectory(self,dir):
        """Process all gbs_global_*.log files in 'dir' and record results in the set of JAS objects."""

        print "Processing directory %s ... " % dir

        log_files = glob.glob("%s/gbs_global_*.log" % dir)
        log_files.sort()

        for log_file in log_files:

            log_file_name = os.path.basename(log_file)
            mo = re.search(r'_(\d\d\d\d-\d\d-\d\d).log',log_file_name)
            if not mo:
                print "  skipping %s (does not end YYYY-MM-DD.log)" % log_file_name
                continue
            log_ts = mo.group(1)
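            # e.g. (illustrative) a file named gbs_global_2008-03-31.log gives
            # log_ts "2008-03-31".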

            # Skip log files that predate the start date and quit once
            # the end date has been passed and the pool of active JAS
            # is exhausted.

            if log_ts < self.start_ts:
                print "  skipping %s (predates start date %s)" % (log_file_name,self.start_ts)
                continue
            if log_ts > self.end_ts and not self.ActiveJAS:
                print "  skipping remaining log files as they postdate end date %s and there are no incomplete jobs" %\
                      self.end_ts
                return

            self.ProcessLogFile(log_file,log_ts <= self.end_ts)

            # Terminate any JAS that should have completed by now
            expire_ut = TimeStampToUnixTime(log_ts) - self.max_job_duration
            jas_list = self.ActiveJAS.keys()
            for jas_id in jas_list:
                jas = self.ActiveJAS[jas_id]
                if jas.GetSubmitDate() < expire_ut: jas.Terminate()

        # If there are any active JAS they will have to be terminated.
        if self.ActiveJAS:
            print "  Terminating any remaining active JAS ..."
            jas_list = self.ActiveJAS.keys()
            num_term = 0
            for jas_id in jas_list:

                if self.debug: print "    Terminating " + jas_id
                self.ActiveJAS[jas_id].Terminate()
                self.IncrementNonJasErrorCount("Missing job end line")
                num_term += 1
            print "  ... %d terminated." % num_term


#_______________________________________________________________________________________________

    def ProcessLogFile(self,log_file,start_new_jas):
        """Process log file 'log_file' and record results in the set of JAS objects.

        Create new JAS only if 'start_new_jas' is True."""
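
        # ParseLogEntry appears to return None for lines that are not part of a job
        # attempt, and otherwise a tuple whose first element is the entry's unix time
        # and whose next two are the job attempt id and event (inferred from its use
        # below; see JobAttemptSummary for the definitive format).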

        log_file_name = os.path.basename(log_file)
        print "  processing " + log_file_name
        file_log = open(log_file)
        for log_line in file_log:

            parse_results = ParseLogEntry(log_line)
            if not parse_results:
                self.AnalyseNonJasLine(log_line)
                continue

            (jas_id,event) = parse_results[1:3]

            #HACK: There was a bug (fixed at the end of May 2008) recording the try_id with try number 0
            #HACK: when reporting "No output files".  See if any active JAS matches except for the try
            #HACK: number and fix up the line if so.
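            #HACK: e.g. (hypothetical id format) an entry reported against "job_1.0" is
            #HACK: reattributed to "job_1.3" if that is the attempt currently active.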
            if jas_id[-1] == '0':
                for ji in self.ActiveJAS.keys():
                    if jas_id[:-1] == ji[:-1]:
                        log_line = re.compile(r'\.0 ').sub('.%s ' % ji[-1],log_line)
                        parse_results = ParseLogEntry(log_line)
                        jas_id = ji
                        break

            # "submitted" entries always start a new JAS.  Jobs that get stuck
            # with Ganga state "submitted" will eventually get killed by GBS
            # and we want to record the failed attempt.
            if event == "submitted" and self.ActiveJAS.has_key(jas_id):
                jas = self.ActiveJAS[jas_id]
                jas.Terminate()

            # Ignore non "submitted" entries for non-existent JAS if within maximum
            # job lifetime of the start window - they are probably jobs that were already
            # running before the window opened.
            if         event != "submitted" \
                   and not self.ActiveJAS.has_key(jas_id) \
                   and (parse_results[0] - self.max_job_duration) < self.start_ut: continue

            if self.ActiveJAS.has_key(jas_id): self.ActiveJAS[jas_id].Update(log_line)
            elif start_new_jas: self.ActiveJAS[jas_id] = JobAttemptSummary(self,log_line)

#_______________________________________________________________________________________________

    def Run(self):
        """Perform analysis."""

        for dir in self.directories:
            self.ProcessDirectory(dir)
            print "Collected %d job summaries so far" % len(self.CompletedJAS)

        self.AnalyseCollectedJAS()
