import datetime
import glob
import os
import re
import sys


from GBSTimeStamp import TimeStampToUnixTime
from JobAttemptSummary import JobAttemptSummary, ParseLogEntry

def LastMonthArgs():
    """Return a tuple (start,end,html_file_name) suitable for feeding into
    LogAnalyser to produce last month's statistics."""

    today = datetime.date.today()
    end_date = today - datetime.timedelta(today.day)
    start_date = end_date - datetime.timedelta(end_date.day-1)
    return (start_date.isoformat(), end_date.isoformat(),
            start_date.strftime("%b_%Y").lower() + ".html")

class LogAnalyser:

    """An object to analyse a series of directories holding GBS global
    log files within a date window and produce a report on overall
    performance, including numbers MINOS UK has to supply to GridPP as
    their 'GRID Metrics'.

    The analysis covers all jobs submitted during the date window, including
    those that end after the window closes.  So, to fully analyse all jobs,
    it may be necessary to include log files that postdate the date window.
    A warning will be issued for any job that is not complete unless a log
    file is processed that is at least 7 days later than the job start time.

    Internally the analyser records information about each individual
    job submission in a

        JobAttemptSummary (JAS)

    object.  A new object is created in the self.ActiveJAS pool each time a
    "submitted" line in the log file is encountered; it then receives
    further log file lines for the same job attempt until it receives
    one that terminates it, at which point it tells the LogAnalyser to move
    it to the self.CompletedJAS pool."""

    def __init__(self,start_date,end_date,directories,html_file=None,title="MINOS Grid Production"):
        """Initialise analyser:-

        start_date   Start date string e.g. "2008-01-01"
        end_date     End date string e.g. "2008-04-01"
        directories  List of directory names e.g. ['/home/west/log_files/set1','/home/west/log_files/set2']
        html_file    Name of HTML file (if None will write plain text to the terminal)
        title        Optional title for summary.
        """

        self.debug = 0

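        # Maximum time (in seconds) a job attempt is allowed to remain
        # active; any attempt older than this is forcibly terminated.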
        self.max_job_duration = 10*24*60*60

        if start_date > end_date: (start_date,end_date) = (end_date,start_date)
        self.start_ts = start_date
        if not re.search(r'\d\d\d\d-\d\d-\d\d',start_date):
            print "Bad start date " + start_date
            return
        self.start_ut = TimeStampToUnixTime(start_date)
        if not self.start_ut: return

        self.end_ts = end_date
        if not re.search(r'\d\d\d\d-\d\d-\d\d',end_date):
            print "Bad end date " + end_date
            return
        self.end_ut = TimeStampToUnixTime(end_date)
        if not self.end_ut: return

        self.directories = directories
        for dir in self.directories:
            if not os.path.isdir(dir):
                print 'Cannot find directory ' + dir
                return

        self.html_file = html_file
        self.title = title
        self.non_jas_errors = {}

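        # JAS pools: ActiveJAS holds attempts still receiving log lines,
        # keyed by job attempt id; CompletedJAS holds terminated attempts.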
        self.ActiveJAS = {}
        self.CompletedJAS = []

        self.Run()
        return


    def AnalyseCollectedJAS(self):
        """Analyse the collection of JobAttemptSummary (JAS) objects."""

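        # Markup fragments: HTML tags when writing an HTML report, plain
        # whitespace when writing text to the terminal.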
        if self.html_file:
            html = 1
            file_html = open(self.html_file,"w")
            tr = "<tr><td>"
            td = "<td>"
            br = "<br>"
            pg = "<p>"
        else:
            html = 0
            file_html = open("/dev/tty","w")
            tr = ""
            td = " "
            br = "\n"
            pg = "\n\n"

        pr = file_html.write

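        # Inclusive number of days covered by the analysis window.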
        num_days = int( 1.5 + (self.end_ut - self.start_ut)/(24.*60*60))

        title = self.title
        if not title: title = "GBS Global Log File Analysis"
        title += " for period %s to %s (%d days)\n" % (self.start_ts,self.end_ts,num_days)
        if html: pr("<html>\n<title>%s</title>\n<body>\n<h1>%s</h1>\n" % (title,title))
        else: pr("\n\n\n%s\n" % title)

        pr("%sBased on log files analysed from:-%s" % (pg,br))
        if html: pr("<pre>\n")
        for dir in self.directories: pr(" %s\n" % dir)
        if html: pr("</pre>\n")

        num_tot = len(self.CompletedJAS)
        num_success = 0
        num_p_success = 0
        num_fail = 0
        num_new_seed = 0

        num_latency = 0
        sum_latency = 0

        sum_cpu = 0
        sum_good_cpu = 0
        sum_bad_cpu = 0

        sum_fail_lost = 0
        sum_fail_grid = 0
        sum_fail_user = 0

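        # Classify every completed job attempt and accumulate the CPU,
        # failure and latency totals used below.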
        for jas in self.CompletedJAS:
            if jas.IsSuccess(): num_success += 1
            elif jas.IsPartialSuccess(): num_p_success += 1
            elif jas.IsFailure(): num_fail += 1
            else:
                print "debug job not classified"
                self.IncrementNonJasErrorCount("Log analysis bug - investigate!")
            if jas.RequestNewSeed(): num_new_seed += 1
            cpu = jas.GetCpuKSI2K()
            if cpu:
                sum_cpu += cpu
                if jas.IsFailure(): sum_bad_cpu += cpu
                else: sum_good_cpu += cpu
            if jas.IsFailure():
                if jas.fail == jas.FAIL_LOST: sum_fail_lost += 1
                elif jas.fail == jas.FAIL_GRID: sum_fail_grid += 1
                elif jas.fail == jas.FAIL_USER: sum_fail_user += 1
                else:
                    print "debug %s " % str(jas.fail)
                    self.IncrementNonJasErrorCount("Log analysis bug - investigate!")

            submit_ut = jas.GetSubmitDate()
            start_ut = jas.GetStartDate()
            if submit_ut and start_ut:
                num_latency += 1
                sum_latency += start_ut - submit_ut

        num_not_success = num_fail + num_p_success
        num_tot_grid_fail = sum_fail_lost + sum_fail_grid

00185 pr("\nNumber of job submissions analysed: %d (%5.1f/day)\n\n" % (num_tot,num_tot/num_days))
00186 if not num_tot: return
00187
00188 if html: pr("<h2>GRID Metrics</h2>\n")
00189 else: pr("\nGRID Metrics\n")
00190 if html: pr("<p><table border=1><tr><th>Metric<th>Value\n")
00191
00192 pr("%sThroughput (1) %s%5.1f KSI2K \n" % (tr,td,sum_good_cpu/(60.*24*num_days)))
00193 pr("%sRobustness (2) %s%5.1f%%\n" % (tr,td,(100. - 100.*num_tot_grid_fail/num_tot)))
00194 latency = "No data"
00195 if num_latency: latency = "%5.1f mins" % (sum_latency/(num_latency*60))
00196 pr("%sLatency (3) %s%s\n" % (tr,td,latency))
00197 pr("%sWastage (4) %s%5.1f%%\n" % (tr,td,100.*sum_bad_cpu/(max(sum_cpu,.001))))
00198
00199
        if html:
            pr("</table>\n<p><font size=-1>Where:-\n<ol>\n")
            pr("<li><b>Throughput</b> The total number of useful KSI2K CPU days delivered per day\n")
            pr("<li><b>Robustness</b> The fraction of submitted jobs that were not GRID related failures\n")
            pr("<li><b>Latency</b> The mean time between submission and start of execution\n")
            pr("<li><b>Wastage</b> The fraction of total CPU known to be wasted on jobs that failed.\n")
            pr("</ol></font>\n")

        if num_not_success:
            if html: pr("<h2>Job Failure Analysis</h2>\n")
            else: pr("\nJob Failure Analysis\n\n")
            pr("Of the %5.1f%% that were not successful (including the %5.1f%% that were GRID failures)%s" %\
               ( 100.*num_not_success/num_tot,100.*num_tot_grid_fail/num_tot,pg ))
            if html: pr("<p><table border=1><tr><th>Error<th>Rate\n")

            pr("%sJob failed to run or no output returned%s%5.1f%%\n" % (tr,td,100.*sum_fail_lost/num_not_success))
            pr("%sJob aborted (failed to exit wrapper normally)%s%5.1f%%\n" % (tr,td,100.*sum_fail_grid/num_not_success))
            pr("%sApplication returned non-zero exit code%s%5.1f%%\n" % (tr,td,100.*sum_fail_user/num_not_success))
            pr("%sApplication only partially successful%s%5.1f%%\n" % (tr,td,100.*num_p_success/num_not_success))
            pr("%sApplication requested new seed%s%5.1f%%\n" % (tr,td,100.*num_new_seed/num_not_success))
            if html: pr("</table><p>\n")

        if html: pr("<h2>Non-Job Failure Analysis</h2>\n<p><table border=1><tr><th>Error<th>Total<th>Rate\n")
        else: pr("\nNon-Job Failure Analysis\n\n")

        errors = self.non_jas_errors.keys()
        errors.sort()
        for error in errors:
            num_errors = self.non_jas_errors[error]
            pr("%s%s%s%d%s%5.1f%%\n" % (tr,error,td,num_errors,td,(100.*num_errors/num_tot)))
        if html: pr("</table><p>\n")

        if not errors: pr("%s - No errors recorded%s" % (br,br))

        file_html.close()
        if html: print '\nResults written to %s' % self.html_file


    def AnalyseNonJasLine(self,line):
        """Analyse a non-JAS line and count errors.

        Look at any line that is not part of a job attempt summary and count
        it if it reports an error or fatal condition."""
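        # Lines are expected to look like " -X-  <number>  <message>" where
        # -X- is a one-letter severity code; informational (-I-) lines are ignored.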
        mo = re.search(r' (-\w-)\s+\d+\s+(.+)',line)
        if not mo:
            print "Cannot analyse non-JAS line %s " % line
            self.IncrementNonJasErrorCount("Cannot analyse non-job-attempt line")
            return
        (severity,error) = mo.groups()
        if severity == "-I-": return
        if re.search("Removing orphaned Ganga job",line):
            self.IncrementNonJasErrorCount("Removing orphaned Ganga job")
        elif re.search("Failed to complete Ganga job submit",line):
            self.IncrementNonJasErrorCount("Failed to complete Ganga job submit")
        elif re.search("Ignoring malformed line:",line):
            self.IncrementNonJasErrorCount("Ignoring malformed line:")
        elif re.search("Timeout reached before asynchronous call ended",line):
            self.IncrementNonJasErrorCount("Timeout reached before asynchronous call ended")
        elif re.search(r"\.\.\.Attempting to delete child processes ",line):
            pass
        elif re.search("Unknown Ganga status",line):
            self.IncrementNonJasErrorCount("Unknown Ganga status")
        elif re.search("GBSIOHelper: EOF on <open file",line):
            self.IncrementNonJasErrorCount("GBSIOHelper: EOF on <open file")
        else:
            print "Cannot handle non-JAS error: %s" % error
            self.IncrementNonJasErrorCount("Cannot handle non-JAS error")

    def IncrementNonJasErrorCount(self,description):
        """Increment non-JAS error count."""
        if not self.non_jas_errors.has_key(description): self.non_jas_errors[description] = 0
        self.non_jas_errors[description] += 1

    def ProcessDirectory(self,dir):
        """Process all gbs_global_*.log files in 'dir' and record results in the set of JAS objects."""

        print "Processing directory %s ... " % dir

        log_files = glob.glob("%s/gbs_global_*.log" % dir)
        log_files.sort()

        for log_file in log_files:

            log_file_name = os.path.basename(log_file)
            mo = re.search(r'_(\d\d\d\d-\d\d-\d\d)\.log',log_file_name)
            if not mo:
                print " skipping %s (does not end YYYY-MM-DD.log)" % log_file_name
                continue
            log_ts = mo.group(1)

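            # Only files within the date window are analysed.  Files after
            # the end date are still read (without starting new JAS) until
            # every active job attempt has been resolved.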
            if log_ts < self.start_ts:
                print " skipping %s (predates start date %s)" % (log_file_name,self.start_ts)
                continue
            if log_ts > self.end_ts and not self.ActiveJAS:
                print " skipping remaining log files as they postdate end date %s and there are no incomplete jobs" %\
                      self.end_ts
                return

            self.ProcessLogFile(log_file,log_ts <= self.end_ts)

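            # Terminate any active attempt whose submission predates this
            # log file by more than max_job_duration.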
            expire_ut = TimeStampToUnixTime(log_ts) - self.max_job_duration
            jas_list = self.ActiveJAS.keys()
            for jas_id in jas_list:
                jas = self.ActiveJAS[jas_id]
                if jas.GetSubmitDate() < expire_ut: jas.Terminate()

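        # Any attempts still active after every file has been read never
        # produced a job end line; terminate them and record the error.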
        if self.ActiveJAS:
            print " Terminating any remaining active JAS ..."
            jas_list = self.ActiveJAS.keys()
            num_term = 0
            for jas_id in jas_list:
                if self.debug: print " Terminating " + jas_id
                self.ActiveJAS[jas_id].Terminate()
                self.IncrementNonJasErrorCount("Missing job end line")
                num_term += 1
            print " ... %d terminated." % num_term

    def ProcessLogFile(self,log_file,start_new_jas):
        """Process log file 'log_file' and record results in the set of JAS objects.

        Create a new JAS only if 'start_new_jas' is True."""

        log_file_name = os.path.basename(log_file)
        print " processing " + log_file_name
        file_log = open(log_file)
        for log_line in file_log:

            parse_results = ParseLogEntry(log_line)
            if not parse_results:
                self.AnalyseNonJasLine(log_line)
                continue

            (jas_id,event) = parse_results[1:3]

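            # A line quoting attempt number 0 may really belong to a later
            # attempt of the same job that is currently active; if so,
            # rewrite the line to refer to that attempt and re-parse it.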
            if jas_id[-1] == '0':
                for ji in self.ActiveJAS.keys():
                    if jas_id[:-1] == ji[:-1]:
                        log_line = re.compile(r'\.0 ').sub('.%s ' % ji[-1],log_line)
                        parse_results = ParseLogEntry(log_line)
                        jas_id = ji
                        break

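            # A fresh "submitted" line for an attempt that is already active
            # means the previous record never received an end line; close it
            # out so a new JAS can be created below.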
            if event == "submitted" and self.ActiveJAS.has_key(jas_id):
                jas = self.ActiveJAS[jas_id]
                jas.Terminate()

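            # Ignore lines for attempts with no active JAS when they fall
            # within max_job_duration of the window start: they presumably
            # belong to jobs submitted before the window.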
            if event != "submitted" \
               and not self.ActiveJAS.has_key(jas_id) \
               and (parse_results[0] - self.max_job_duration) < self.start_ut: continue

            if self.ActiveJAS.has_key(jas_id): self.ActiveJAS[jas_id].Update(log_line)
            elif start_new_jas: self.ActiveJAS[jas_id] = JobAttemptSummary(self,log_line)

        file_log.close()

    def Run(self):
        """Perform analysis."""

        for dir in self.directories:
            self.ProcessDirectory(dir)
            print "Collected %d job summaries so far" % len(self.CompletedJAS)

        self.AnalyseCollectedJAS()

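
# Example driver: a minimal sketch of how this module might be used to
# produce last month's report.  The directory list below is illustrative
# (taken from the __init__ docstring) and would need to point at real
# collections of gbs_global_*.log files.
if __name__ == "__main__":
    (start, end, html_name) = LastMonthArgs()
    # Constructing a LogAnalyser runs the analysis immediately; __init__
    # calls self.Run() once its arguments have been validated.
    LogAnalyser(start, end,
                ['/home/west/log_files/set1', '/home/west/log_files/set2'],
                html_file=html_name)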