JobAttemptSummary.py

Go to the documentation of this file.
00001 import re
00002 
00003 from GBSTimeStamp      import TimeStampToUnixTime
00004 
00005 #  This file defines:-
00006 #
00007 #  The function:  ParseLogEntry
00008 #  The class:     JobAttemptSummary
00009 #
00010 #  both are components of LogAnalyser (q.v.)
00011 
def ParseLogEntry(log_line):
    """Split one global log file line into its job submission fields.

    The line (after stripping surrounding whitespace) must contain, in order:
    a 'YYYY-MM-DD hh:mm:ss' time stamp, one whitespace-free token, a number,
    the task name, the job try id (a token ending in digits, optionally
    prefixed with 'JTID:') and finally the event text.

    Returns a tuple (log_ut, jas_id, event) where:-

      log_ut  is the time stamp converted by TimeStampToUnixTime
      jas_id  is "<task>.<job-try-id>" with any leading 'JTID:' removed
      event   is the remainder of the line

    Returns None if the line does not have this layout.
    """

    layout = r'(\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d)\s+\S+\s+\d+\s+(\S+)\s+(\S+?\d+)\s+(.+)'
    match = re.search(layout, log_line.strip())
    if match is None:
        return None

    stamp, task, try_id, event = match.groups()
    log_ut = TimeStampToUnixTime(stamp)

    # Drop the optional "JTID:" marker so the id is just the try number part.
    prefix = "JTID:"
    if try_id.startswith(prefix):
        try_id = try_id[len(prefix):]

    return (log_ut, ".".join((task, try_id)), event)
00034 
00035 #_______________________________________________________________________________________________
00036 
00037  
00038 
class JobAttemptSummary:
    """An object to collect all the global log information for a single job submission
    and present a summary of it.

    The parent object must supply:-

      debug                              Debug level (propagated to this object)
      ActiveJAS                          Dictionary of active summaries keyed by JAS Id
      CompletedJAS                       List to which terminated summaries are appended
      IncrementNonJasErrorCount(reason)  Called to record each inconsistency found
    """

    # State.  Always one of:-
    STATE_ACTIVE          = 1
    STATE_SUCCESS         = 2
    STATE_PARTIAL_SUCCESS = 3
    STATE_FAILURE         = 4

    # Failure type.  Only defined if self.state == self.STATE_FAILURE
    FAIL_LOST  = 1
    FAIL_GRID  = 2
    FAIL_USER  = 3

    def __init__(self,parent,log_line):
        """Initialise object using data from log line.

        log_line should be the "submitted" entry for the job attempt.  A line
        that cannot be parsed leaves the object terminated as a lost failure; a
        parsable line with any other event is reported to the parent and then
        processed through Update."""

        self.parent = parent
        self.debug  = parent.debug  #Propagate debug level to child.

        self.state = self.STATE_ACTIVE
        self.fail  = None

        self.cpu              = None   # CPU time if known
        self.submit_ut        = None   # Submit Unix time
        self.start_ut         = None   # Start Unix time
        self.id               = ""     # JAS Id (see ParseLogEntry)
        self.request_new_seed = False  # Set True if job attempt ends by requesting new seed.

        self.ksi2k_rating = 1.23 # Based on 700 RAL T1 node values obtained in Jun 2008.

        parse_results = ParseLogEntry(log_line)
        if not parse_results:
            print("Creating JAS with bad line: " + log_line)
            self.Terminate(self.STATE_FAILURE,self.FAIL_LOST)
            return

        # Process first line - which should have event == "submitted"
        (self.submit_ut,self.id,event) = parse_results
        if self.debug: print("    creating JAS %s with %s" % (self.id,event))
        if event == "submitted": return
        print("Initialising JAS %s with wrong line: %s" % (self.id,log_line))
        self.parent.IncrementNonJasErrorCount("Initialising job summary with wong line")
        self.Update(log_line)

#_______________________________________________________________________________________________

    def GetCpu(self):
        """Return CPU time, or None if not known."""
        return self.cpu

    def GetCpuKSI2K(self):
        """Return CPU time scaled by the KSI2K rating, or None if CPU time is not known."""
        # Test against None so that a known CPU time of zero is not reported as unknown.
        if self.cpu is not None: return self.cpu*self.ksi2k_rating
        return None

    def GetFailType(self):
        """Return failure type (one of FAIL_*), or None if none recorded."""
        return self.fail

    def GetId(self):
        """Return JAS Id (see ParseLogEntry)."""
        return self.id

    def GetStartDate(self):
        """Return start Unix time, or None if not known."""
        return self.start_ut

    def GetSubmitDate(self):
        """Return submit Unix time, or None if not known."""
        return self.submit_ut

#_______________________________________________________________________________________________

    def IsActive(self):          return self.state == self.STATE_ACTIVE
    def IsFailure(self):         return self.state == self.STATE_FAILURE
    def IsPartialSuccess(self):  return self.state == self.STATE_PARTIAL_SUCCESS
    def IsSuccess(self):         return self.state == self.STATE_SUCCESS

#_______________________________________________________________________________________________

    def RequestNewSeed(self):
        """Return True if the job attempt ended by requesting a new seed."""
        return self.request_new_seed

#_______________________________________________________________________________________________

    def Terminate(self,state=None,fail=None):
        """Terminate object and, if still in parent's active list, move to complete.

        state  Final state (one of STATE_*).
        fail   Failure type (one of FAIL_*) or None.
        """
        # If no arg supplied treat as forced termination due to end of log files or
        # start of replacement with same ID.
        if not state:
            state = self.STATE_FAILURE
            fail  = self.FAIL_LOST
        self.state = state
        self.fail  = fail
        if self.id in self.parent.ActiveJAS:
            del self.parent.ActiveJAS[self.id]
            self.parent.CompletedJAS.append(self)
            if self.debug: print("      moved JAS %s to completed" % self.id)

#_______________________________________________________________________________________________

    def Update(self,log_line):
        """Add log information.

        The line is rejected (reported to the parent and otherwise ignored) if it
        cannot be parsed, if this object is no longer active, or if it refers to a
        different JAS Id.  Events that match none of the cases below are ignored.
        """

        ##  Make sure that line refers to this object.

        parse_results = ParseLogEntry(log_line)
        if not parse_results:
            print("Updating JobAttemptSummary with bad line: " + log_line)
            self.parent.IncrementNonJasErrorCount("Cannot parse job summary line")
            return

        if not self.IsActive():
            print("Updating JobAttemptSummary %s, which is no longer active, with line: %s" % (self.id,log_line))
            self.parent.IncrementNonJasErrorCount("Job summary line for inactive job")
            return

        (submit_ut,jas_id,event) = parse_results
        if jas_id != self.id:
            print("Updating JobAttemptSummary %s with wrong line: %s" % (self.id,log_line))
            self.parent.IncrementNonJasErrorCount("Job summary line for wrong job")
            return

        if self.debug: print("      updating %s with %s" % (self.id,event))

        ## Deal with all non terminating events.

        if event == "submitted":
            print("Unexpectedly updating existing JobAttemptSummary %s with line: %s" % (self.id,log_line))
            self.parent.IncrementNonJasErrorCount("Missing job start line")
            # Record in the attribute that GetSubmitDate reports.
            self.submit_ut = submit_ut
            return

        mo = re.search(r"started\s+(\S.*)",event)
        if mo:
            self.start_ut = TimeStampToUnixTime(mo.group(1))
            return

        if    re.search(r"^No output files",event) \
           or re.search(r"^Failed to retrieve output",event) \
           or re.search(r"^Failed to unpack",event):
            # Remember the failure type; a later terminating event will use it.
            self.fail = self.FAIL_LOST
            return

        if event == "killed by user": return

        ## Deal with all terminating events.

        if    re.search(r"^lost Ganga job with ID",event) \
           or re.search(r"^killed Ganga job",event):
            self.Terminate(self.STATE_FAILURE,self.FAIL_LOST)
            return

        if re.search(r"^returned\s+\S+",event):
            mo = re.search(r"CPU:\s+(\S+)\s+min",event)
            if mo: self.cpu = float(mo.group(1))
            mo = re.search(r"KSI2K=(\S+)",event)
            if mo:
                self.ksi2k_rating = float(mo.group(1))

            # Expect: returned <status> <communication level> ...
            mo = re.search(r"\S+\s+(\S+)\s+(\S+)",event)
            if not mo:
                # Communication level missing: cannot classify, so report and leave active.
                print("Unhandled JAS update : %s" % event)
                return
            (status,com_level) = mo.groups()

            if status == "SUCCEEDED":
                self.Terminate(self.STATE_SUCCESS)
                return

            # Treat RETRY at USER level communication which did not signal RESTART as a
            # partial success.
            if status == "RETRY" and  com_level == "USER" and not re.search("RESTART",event):
                self.Terminate(self.STATE_PARTIAL_SUCCESS)
                if re.search("NEW_SEED",event): self.request_new_seed = True
                return

            # Treat all other HELD, RETRY and FAILED as failures with type based on
            # communication level, unless already set
            if status in ("HELD","RETRY","FAILED"):
                if not self.fail:
                    self.fail = self.FAIL_GRID
                    if com_level == "USER" or com_level == "APPLICATION": self.fail = self.FAIL_USER
                self.Terminate(self.STATE_FAILURE,self.fail)
                return

            print("Unhandled JAS update : %s" % event)
00210             
00211                 

Generated on Fri Mar 5 09:25:41 2010 for gbs by  doxygen 1.4.7