00001 import re
00002
00003 from GBSTimeStamp import TimeStampToUnixTime
00004
00005
00006
00007
00008
00009
00010
00011
00012 def ParseLogEntry(log_line):
00013 """Parse global log file line entry holding job submission information.
00014
00015 For valid entries returns a tuple containing:-
00016
00017 log_ut Log file line Unix time
00018 jas_id Job submission entry: <task>.<job-try-id>
00019 event Data recorded on line
00020
00021 For invalid entries: return None.
00022 """
00023
00024 mo = re.search(r'(\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d)\s+\S+\s+\d+\s+(\S+)\s+(\S+?\d+)\s+(.+)',log_line.strip())
00025 if not mo: return None
00026 log_ut = TimeStampToUnixTime(mo.group(1))
00027 task = mo.group(2)
00028 try_id = mo.group(3)
00029 if try_id[0:5] == "JTID:": try_id = try_id[5:]
00030
00031 jas_id = "%s.%s" % (task,try_id)
00032 event = mo.group(4)
00033 return (log_ut,jas_id,event)
00034
00035
00036
00037
00038
00039 class JobAttemptSummary:
00040 """An object to collect all the global log information for a single job submission
00041 and present a summary of it."""
00042
00043 def __init__(self,parent,log_line):
00044 """Initialise object using data from log line."""
00045
00046 self.parent = parent
00047 self.debug = parent.debug
00048
00049
00050 self.STATE_ACTIVE = 1
00051 self.STATE_SUCCESS = 2
00052 self.STATE_PARTIAL_SUCCESS = 3
00053 self.STATE_FAILURE = 4
00054
00055 self.state = self.STATE_ACTIVE
00056
00057
00058 self.FAIL_LOST = 1
00059 self.FAIL_GRID = 2
00060 self.FAIL_USER = 3
00061
00062 self.fail = None
00063
00064 self.cpu = None
00065 self.submit_ut = None
00066 self.start_ut = None
00067 self.id = ""
00068 self.request_new_seed = False
00069
00070 self.ksi2k_rating = 1.23
00071
00072 parse_results = ParseLogEntry(log_line)
00073 if not parse_results:
00074 print "Creating JAS with bad line: " + log_line
00075 self.Terminate(self.STATE_FAILURE,self.LOST)
00076 return
00077
00078
00079 (self.submit_ut,self.id,event) = parse_results
00080 if self.debug: print " creating JAS %s with %s" % (self.id,event)
00081 if event == "submitted": return
00082 print "Initialising JAS %s with wrong line: %s" % (self.id,log_line)
00083 self.parent.IncrementNonJasErrorCount("Initialising job summary with wong line")
00084 self.Update(log_line)
00085
00086
00087
00088
00089 def GetCpu(self): return self.cpu
00090 def GetCpuKSI2K(self):
00091 if self.cpu: return self.cpu*self.ksi2k_rating
00092 return None
00093 def GetFailType(self): return self.fail
00094 def GetId(self): return self.id
00095 def GetStartDate(self): return self.start_ut
00096 def GetSubmitDate(self): return self.submit_ut
00097
00098
00099
00100
00101 def IsActive(self): return self.state == self.STATE_ACTIVE
00102 def IsFailure(self): return self.state == self.STATE_FAILURE
00103 def IsPartialSuccess(self): return self.state == self.STATE_PARTIAL_SUCCESS
00104 def IsSuccess(self): return self.state == self.STATE_SUCCESS
00105
00106
00107
00108 def RequestNewSeed(self): return self.request_new_seed
00109
00110
00111 def Terminate(self,state=None,fail=None):
00112 """Terminate object and, if still in parent's active list, move to complete."""
00113
00114
00115 if not state:
00116 state = self.STATE_FAILURE
00117 fail = self.FAIL_LOST
00118 self.state = state
00119 self.fail = fail
00120 if self.parent.ActiveJAS.has_key(self.id):
00121 del self.parent.ActiveJAS[self.id]
00122 self.parent.CompletedJAS.append(self)
00123 if self.debug: print " moved JAS %s to completed" % self.id
00124
00125
00126
00127 def Update(self,log_line):
00128 """Add log information."""
00129
00130
00131
00132 parse_results = ParseLogEntry(log_line)
00133 if not parse_results:
00134 print "Updating JobAttemptSummary with bad line: " + log_line
00135 self.parent.IncrementNonJasErrorCount("Cannot parse job summary line")
00136 return
00137
00138 if not self.IsActive():
00139 print "Updating JobAttemptSummary %s, which is no longer active, with line: %s" % (self.id,log_line)
00140 self.parent.IncrementNonJasErrorCount("Job summary line for inactive job")
00141 return
00142
00143 (submit_time,id,event) = parse_results
00144 if id != self.id:
00145 print "Updating JobAttemptSummary %s with wrong line: %s" % (self.id,log_line)
00146 self.parent.IncrementNonJasErrorCount("Job summary line for wrong job")
00147 return
00148
00149 if self.debug: print " updating %s with %s" % (self.id,event)
00150
00151
00152
00153 if event == "submitted":
00154 print "Unexpectedly updating existing JobAttemptSummary %s with line: %s" % (self.id,log_line)
00155 self.parent.IncrementNonJasErrorCount("Missing job start line")
00156 self.submit_time = submit_time
00157 return
00158
00159 if re.search("started\s+\S+",event):
00160 mo = re.search("started\s+(.*)",event)
00161 self.start_ut = TimeStampToUnixTime(mo.group(1))
00162 return
00163
00164 if re.search("^No output files",event) \
00165 or re.search("^Failed to retrieve output",event) \
00166 or re.search("^Failed to unpack",event) \
00167 :
00168 self.fail = self.FAIL_LOST
00169 return
00170
00171 if event == "killed by user": return
00172
00173
00174
00175 if re.search("^lost Ganga job with ID",event) \
00176 or re.search("^killed Ganga job",event):
00177 self.Terminate(self.STATE_FAILURE,self.FAIL_LOST)
00178
00179 if re.search("^returned\s+\S+",event):
00180 mo = re.search("CPU:\s+(\S+)\s+min",event)
00181 if mo: self.cpu = float(mo.group(1))
00182 mo = re.search("KSI2K=(\S+)",event)
00183 if mo:
00184 self.ksi2k_rating = float(mo.group(1))
00185
00186 mo = re.search("\S+\s+(\S+)\s+(\S+)",event)
00187 (status,com_level) = mo.groups()
00188
00189 if status == "SUCCEEDED":
00190 self.Terminate(self.STATE_SUCCESS)
00191 return
00192
00193
00194
00195 if status == "RETRY" and com_level == "USER" and not re.search("RESTART",event):
00196 self.Terminate(self.STATE_PARTIAL_SUCCESS)
00197 if re.search("NEW_SEED",event): self.request_new_seed = True
00198 return
00199
00200
00201
00202 if status == "HELD" or status == "RETRY" or status == "FAILED":
00203 if not self.fail:
00204 self.fail = self.FAIL_GRID
00205 if com_level == "USER" or com_level == "APPLICATION": self.fail = self.FAIL_USER
00206 self.Terminate(self.STATE_FAILURE,self.fail)
00207 return
00208
00209 print "Unhandled JAS update : %s" % event
00210
00211