00001 import re
00002 import os
00003 import shutil
00004 import time
00005
00006 import Ganga.GPI
00007
00008 from AsyncCallThread import timeout_call
00009
00010 from GBSIdCodes import *
00011 from GBSLogger import GetLoggerThreshold, Log, logger
00012 from GBSObject import GBSObject
00013 from GBSModelRegistry import GetModelRegistry
00014 from GBSTimeStamp import GBSTimeStamp as timestamp
00015 import GBSUtilities
00016
00017 class GBSJob(GBSObject) :
00018
00019 """Object to submit, and if necessary resubmit a job until it succeeds or needs user intervention.
00020
00021 This class is responsible for submitting and, if necessary,
00022 resubmitting a job until it is successful or user intervention is
00023 unavoidable.
00024
00025 """
00026
00027
00028
00029
def __init__(self,name,parent,model,model_args):

    """Create a job in the NEW state, then initialise the GBSObject base.

    name, parent and model are passed straight on to GBSObject.__init__.
    model_args is accepted but is not used in this method.
    """

    # Current state: a new job, stamped with the time the state was set.
    self.__statusCode = GID_JSC_NEW
    self.__statusText = "Ready to run"
    self.__statusTime = timestamp()

    # Retry bookkeeping: nothing tried and nothing failed yet.
    self.__tryNumber = 0
    self.__tryID = name + ".0"
    self.__retryArgs = ""
    self.__earlyFails = 0
    self.__lateFailsHandled = 0
    self.__lateFailsUnhandled = 0

    # Parts of the job definition that are local to this job.
    self.__scriptLocalArgs = ""
    self.__localEnvironment = ""
    self.__localInputSandbox = ""
    self.__localOutputSandbox = ""

    # -1 means there is no associated Ganga job.
    self.__gangaJobId = -1

    # All members are set before the base initialiser runs (presumably
    # because it may call back into _DoMemberIO -- confirm in GBSObject).
    GBSObject.__init__(self,name,parent,model)
00058
def _DoMemberIO(self,ioh):

    """Pass every persistent data member through the I/O handler ioh.

    For each member ioh is called with a label, a type code ("i" or "s")
    and the current value, and the member is set to whatever ioh returns.
    The order of the calls is fixed; the base class members come last.
    """

    # Retry state.  The try number goes through _SetTryNumber so that the
    # try ID is rebuilt to match.
    self._SetTryNumber(ioh("Try Number","i",self.__tryNumber))
    self.__retryArgs = ioh("Current Retry Args","s",self.__retryArgs)
    self.__earlyFails = ioh("Early Fail Count","i",self.__earlyFails)
    self.__lateFailsHandled = ioh("Late Handled Fails Count","i",self.__lateFailsHandled)
    self.__lateFailsUnhandled = ioh("Late Unhandled Fails Count","i",self.__lateFailsUnhandled)

    # Job definition.
    self.__scriptLocalArgs = ioh("Script Local Args","s",self.__scriptLocalArgs)
    self.__localEnvironment = ioh("+Local environment","s",self.__localEnvironment)
    self.__localInputSandbox = ioh("+Local Input Sandbox List", "s",self.__localInputSandbox)
    self.__localOutputSandbox = ioh("+Local Output Sandbox List", "s",self.__localOutputSandbox)

    # Status.
    self.__statusCode = ioh("Status Code","i",self.__statusCode)
    self.__statusText = ioh("Status Text","s",self.__statusText)
    self.__statusTime = ioh("+Status Time Stamp","s",self.__statusTime)
    self.__gangaJobId = ioh("+Ganga Job Id","i",self.__gangaJobId)

    # While a job is submitted its status code is the Ganga job ID (Submit
    # stores gj.id as the status), so the ID is taken from the status code.
    if self.__statusCode >= GID_JSC_SUBMITTED:
        self.__gangaJobId = self.__statusCode

    GBSObject._DoMemberIO(self,ioh)
00078
def GetType(self):
    """Return the type name of this object, "GBSJob"."""
    type_name = "GBSJob"
    return type_name
00080
def __repr__(self):
    """Return the description produced by AsString() at its default level."""
    description = self.AsString()
    return description
00082
00083
00084
00085
00086
00087
00088
00089
def AsString(self,level = "Full"):

    """Return string description.

    Return string description at the following levels:-
    "Brief" one line summary suitable for rows in tables
    "Heading" one line summary suitable as heading for "Brief"
    "Full" full description including value of every data member

    At "Full" level, when there is a try whose output can be shown, the
    description also carries an 'ls -l' listing of that try's output
    directory and, if it exists, the contents of its GLF (GBS Log File).
    """

    if level == "Heading":
        return "Name".ljust(20) + "Status".ljust(23) + "Input".ljust(40) + "Status Details"

    if level == "Brief":
        return self.GetName().ljust(20) \
               + GIDStringForJSC(self.__statusCode).ljust(23) \
               + (self.__scriptLocalArgs + ";" + self.__localEnvironment).ljust(40) \
               + "[" + self.__statusText + "]"

    # The pieces are collected in a list and joined once at the end.
    parts = [
        GBSObject.__repr__(self) + "\n\n",
        "Status: " + GIDStringForJSC(self.__statusCode) + " [" + self.__statusText + "] at " + self.__statusTime + "\n",
        " Associated Ganga Job ID: " + str(self.__gangaJobId) + "\n\n",
        "Job Definition\n",
        " Script local args: '" + self.__scriptLocalArgs + "'\n",
        " Local environment: '" + self.__localEnvironment + "'\n",
        " Input Sandbox: '" + self.__localInputSandbox + "'\n",
        " Output Sandbox: '" + self.__localOutputSandbox + "'\n\n",
        "Retry Status\n",
        " Try: " + str(self.__tryNumber) + "\n",
        " Retry Args: '" + str(self.__retryArgs) + "'\n",
        " Early Fails: " + str(self.__earlyFails) + "\n",
        " Late Handled Fails: " + str(self.__lateFailsHandled) + "\n",
        " Late Unhandled Fails: " + str(self.__lateFailsUnhandled) + "\n",
    ]

    # While the job is submitted the current try has no output yet, so
    # the previous try is the one to display.
    display_try = self.__tryNumber
    if self.__statusCode >= GID_JSC_SUBMITTED: display_try -= 1
    if display_try < 1: return "".join(parts)

    output_dir = self._GetTryOutputDir(display_try)
    parts.append("\nThe output for try %d can be found in\n\n %s\n\n and consists of:-\n\n" % (display_try,output_dir))

    # Run 'ls -l' in the output directory and read its output directly.
    # No temporary listing file is written, and if the directory cannot
    # be entered the listing is empty rather than that of another
    # directory.
    import subprocess
    try:
        lister = subprocess.Popen(["ls","-l"],cwd=output_dir,
                                  stdout=subprocess.PIPE,universal_newlines=True)
        listing = lister.communicate()[0]
    except OSError:
        listing = ""
    for line in listing.splitlines(True): parts.append(" " + line)

    gbs_log_file_name = self._GetGbsLogFileName(display_try)
    gbs_log_file_spec = str(output_dir) + "/" + str(gbs_log_file_name)
    if not os.path.isfile(gbs_log_file_spec): return "".join(parts)

    parts.append("\nThe GLF (GBS Log File) %s contains:-\n\n" % gbs_log_file_name)
    glf = open(gbs_log_file_spec)
    try:
        for line in glf: parts.append(" " + line)
    finally:
        glf.close()
    return "".join(parts)
00149
def CanClear(self):
    """Return true if can ClearErrorCounts, ClearHistory and Remove.

    That is the case whenever the job is not submitted.
    """
    submitted = self.IsSubmitted()
    return not submitted
00153
def CanKill(self):
    """Return true if can Kill, i.e. whatever IsSubmitted() reports."""
    submitted = self.IsSubmitted()
    return submitted
00157
def CanSubmit(self):

    """Return true if can submit Ganga job.

    The status code must lie strictly between GID_JSC_CANNOT_SUBMIT and
    GID_JSC_SUBMITTED and the parent must have a script file spec.
    """

    code = self.__statusCode
    if not code < GID_JSC_SUBMITTED: return False
    if not code > GID_JSC_CANNOT_SUBMIT: return False
    # The parent is only consulted once the status allows submission.
    if self.GetParent().GetScriptFileSpec(): return True
    return False
00166
def GetEarlyFailsCount(self):
    """Return the current Early Fails count."""
    count = self.__earlyFails
    return count
00170
def GetGangaJobId(self):
    """Return the ID of the associated Ganga job, or -1 if there is none."""
    ganga_id = self.__gangaJobId
    return ganga_id
00174
def GetGangaJob(self):
    """Return associated Ganga job (if any), otherwise None.

    None is returned both when no Ganga job ID is recorded (ID < 0) and
    when looking the ID up in Ganga.GPI.jobs fails.
    """
    if self.__gangaJobId < 0: return None
    try:
        return Ganga.GPI.jobs(self.__gangaJobId)
    except Exception:
        # A failed lookup means "no job".  Only Exception is caught so
        # that KeyboardInterrupt and SystemExit still propagate.
        return None
00184
def GetLateHandledFailsCount(self):
    """Return the current Late Handled Fails count."""
    count = self.__lateFailsHandled
    return count
00188
def GetLateUnhandledFailsCount(self):
    """Return the current Late Unhandled Fails count."""
    count = self.__lateFailsUnhandled
    return count
00192
def GetLocalEnvironment(self,prettyPrint = False):
    """Return, as a comma separated list string, the environment that is local to this job.

    If prettyPrint is True nothing is returned; instead a heading is
    printed and the list is handed to GBSUtilities.ListCommaSepList for
    display.
    """
    if prettyPrint:
        print("Local Environment:-")
        GBSUtilities.ListCommaSepList(self.__localEnvironment," ")
        return
    return self.__localEnvironment
00200
def GetLocalInputSandbox(self):
    """Return, as a comma separated list string, the input sandbox file list local to this job."""
    sandbox = self.__localInputSandbox
    return sandbox
00204
def GetLocalOutputSandbox(self):
    """Return, as a comma separated list string, the output sandbox file list local to this job."""
    sandbox = self.__localOutputSandbox
    return sandbox
00208
def GetPhaseCode(self):
    """Return phase code. These are broad categories of status code used by task for job statistics.

    The job is classified as complete, ready, submitted or (otherwise)
    not ready, in that order, and each class is split in two.
    """
    if self.IsComplete():
        # Done: split by success.
        if self.IsSuccessful(): return GID_JPC_DONE_NFAIL
        return GID_JPC_DONE_FAIL

    if self.IsReady():
        # Ready: split by whether there has been an earlier try.
        if self.GetTryNumber() > 0: return GID_JPC_READY_RETRY
        return GID_JPC_READY_NRETRY

    if self.IsSubmitted():
        # Submitted: split by whether Ganga reports it running.
        if self.IsRunning(): return GID_JPC_SUBMIT_RUN
        return GID_JPC_SUBMIT_NRUN

    # Not ready: split by whether the job is held.
    if self.IsHeld(): return GID_JPC_NREADY_HOLD
    return GID_JPC_NREADY_NHOLD
00225
00226
def GetRetryArgs(self):
    """Return as a string the current retry args i.e. as determined from previous try (or empty for first try)"""
    retry_args = self.__retryArgs
    return retry_args
00230
def GetScriptLocalArgs(self):
    """Return (as a string) the comma list of application script args that are local to this job"""
    local_args = self.__scriptLocalArgs
    return local_args
00234
def GetStatusCode(self):
    """Return the current status code"""
    code = self.__statusCode
    return code
00238
def GetStatusText(self):
    """Return the status text, which qualifies the Status Code"""
    text = self.__statusText
    return text
00242
def GetStatusTime(self):
    """Return the date time at which the current status code and text were set"""
    when = self.__statusTime
    return when
00246
def GetTryID(self):
    """Return the Try ID (built from the job name and try number). Used to log job attempts."""
    try_id = self.__tryID
    return try_id
00250
def GetTryNumber(self):
    """Return the Try Number (0 before first try)"""
    try_number = self.__tryNumber
    return try_number
00254
def IsComplete(self):
    """Return true if job is complete (Successful or Failed).

    Complete means a status code no greater than GID_JSC_COMPLETE.
    """
    code = self.__statusCode
    return code <= GID_JSC_COMPLETE
00258
def IsFailure(self):
    """Return true if the job status is GID_JSC_FAILED"""
    code = self.__statusCode
    return code == GID_JSC_FAILED
00262
def IsHeld(self):
    """Return true if the job status is GID_JSC_HELD"""
    code = self.__statusCode
    return code == GID_JSC_HELD
00266
def IsNotReady(self):
    """Return true if job is not ready to submit (the negation of IsReady)"""
    ready = self.IsReady()
    return not ready
00270
def IsReady(self):
    """Return true if job is ready to submit.

    Ready means a status code strictly between GID_JSC_CANNOT_SUBMIT and
    GID_JSC_SUBMITTED.
    """
    code = self.__statusCode
    return code > GID_JSC_CANNOT_SUBMIT and code < GID_JSC_SUBMITTED
00274
def IsRunning(self):
    """Return true if job is submitted and associated Ganga Job status is running.

    The Ganga status is read from the status text, which must contain
    "Ganga status:running".
    """
    if self.__statusCode < GID_JSC_SUBMITTED: return False
    if re.search(r"Ganga status:running",self.__statusText): return True
    return False
00279
def IsSubmitted(self):
    """Return true if job is submitted, i.e. status code is at least GID_JSC_SUBMITTED"""
    code = self.__statusCode
    return code >= GID_JSC_SUBMITTED
00283
def IsSuccessful(self):
    """Return true if the job status is GID_JSC_SUCCEEDED"""
    code = self.__statusCode
    return code == GID_JSC_SUCCEEDED
00287
00288
00289
00290
00291
00292
00293
00294
def Analyse(self,update = True):

    """Perform job termination analysis and optionally apply the results.

    Nothing is done unless the status is GID_JSC_WAITING_ANALYSIS.  A
    "JobAnalyser" object is created through the model registry and asked
    to analyse this job.  If update is true its results are applied, the
    associated Ganga job (if any) is removed, the parent's job statistics
    are refreshed and the job is written out.
    """

    if self.__statusCode != GID_JSC_WAITING_ANALYSIS: return

    registry = GetModelRegistry()
    analyser = registry.CreateObject(self.GetModel(),"JobAnalyser","Solomon",self)
    analyser.Analyse(self)

    # NOTE(review): the listing this was recovered from had lost its
    # indentation; everything below is taken to belong to the original
    # 'if update:' -- confirm against the repository copy.
    if not update: return

    analyser.Apply()
    ganga_job = self.GetGangaJob()
    if ganga_job: ganga_job.remove()
    self.__gangaJobId = -1
    self.GetParent().RefreshJobStats()
    self.Write()
00311
def ClearErrorCounts(self,warn=True):

    """If allowed, clear error counts, but leave retry history intact

    Can only be applied to jobs that are ready to be submitted or that have
    failed, and in the latter case has the side effect of setting the
    status back to RETRY.  If the job cannot be cleared and warn is true
    a message is printed.
    """

    if not self.CanClear():
        if warn: print("Cannot ClearErrorCounts on job %s; its status is %s" % (self.GetName(),GIDStringForJSC(self.__statusCode)))
        return

    self.__earlyFails = 0
    self.__lateFailsHandled = 0
    self.__lateFailsUnhandled = 0

    # A failed job gets another chance now that its counts are clean.
    if self.__statusCode == GID_JSC_FAILED:
        self._SetStatus(GID_JSC_RETRY,"Retrying after user cleared errors")

    self.GetParent().RefreshJobStats()
    self.Write()
00329
def ClearHistory(self,confirm=True,warn=True):

    """Completely clear all processing history so that processing begins again from scratch.

    The only processing state that is retained is that if job was held it will still be.

    confirm  if true, ask the user (via raw_input) before clearing
    warn     if true, print a message when the job cannot be cleared

    The job's child directory is removed; a failure to remove it is
    logged and the rest of the reset still takes place.
    """

    if not self.CanClear():
        if warn: print("Cannot ClearHistory on job %s; its status is %s" % (self.GetName(),GIDStringForJSC(self.__statusCode)))
        return

    if confirm:
        ans = raw_input("Are you sure you want to clear the history of job " + self.GetName() + " ?y[n]")
        if not re.search(r"^y",ans,re.I):
            print("History not cleared.")
            return

    child_dir = self.GetStoreLocation("child_dir")
    if os.path.isdir(child_dir):
        # shutil.rmtree returns None and reports failure by raising, so
        # the failure has to be caught rather than tested for.
        try:
            shutil.rmtree(child_dir)
        except (OSError,IOError):
            Log(self,logger.ERROR,"Failed to remove obsolete job history directory:" + child_dir)

    self._SetTryNumber(0)
    self.__retryArgs = ""
    self.__gangaJobId = -1
    if not self.IsHeld():
        self._SetStatus(GID_JSC_NEW,"Ready to run")
    # ClearErrorCounts also refreshes the parent's statistics and writes
    # the job out.
    self.ClearErrorCounts()
    return
00355
def Hold(self,warn=True):

    """Hold job, so that it won't be submitted. Warn, if requested, if job not suitable for holding

    Only a job that is ready to submit can be held.  On success the
    status becomes GID_JSC_HELD with text "Held by user", the parent's
    job statistics are refreshed and the job is written out.
    """

    if not self.IsReady():
        if warn: print("Cannot HOLD job %s; its status is %s" % (self.GetName(),GIDStringForJSC(self.__statusCode)))
        return

    # _SetStatusCode takes the code only, so both code and text have to
    # go through _SetStatus.
    self._SetStatus(GID_JSC_HELD,"Held by user")
    self.GetParent().RefreshJobStats()
    self.Write()
00366
def Kill(self,warn=True):

    """Kill job that has been submitted to Ganga. Warn, if requested, if job not suitable for killing

    If the Ganga job cannot be found an error is logged and nothing else
    is done; otherwise it is killed and UpdateStatus() is called.
    """

    if not self.CanKill():
        if warn: print("Cannot KILL job %s; its status is %s" % (self.GetName(),GIDStringForJSC(self.__statusCode)))
        return

    # While a job is submitted its status code is the Ganga job ID.
    ganga_id = self.__statusCode
    try:
        gj = Ganga.GPI.jobs(ganga_id)
    except Exception:
        # Only Exception is caught so that KeyboardInterrupt and
        # SystemExit are not reported as a lost job.
        Log(self,logger.ERROR,self.__tryID + " lost Ganga job with ID " + str(ganga_id))
        return

    gj.kill()
    Log(self,logger.ERROR,self.__tryID + " killed by user")
    self.UpdateStatus()
00385
def Release(self,warn=True):

    """Release job, so that it can be submitted. Warn, if requested, if job not suitable for releasing

    Only a held job can be released.  Its status code becomes
    GID_JSC_RETRY if the try number is non-zero and GID_JSC_NEW
    otherwise, with text "Ready to run".
    """

    if not self.IsHeld():
        if warn: print("Cannot RELEASE job %s; its status is %s" % (self.GetName(),GIDStringForJSC(self.__statusCode)))
        return

    if self.__tryNumber:
        self.__statusCode = GID_JSC_RETRY
    else:
        self.__statusCode = GID_JSC_NEW
    self._SetStatusText("Ready to run")
    self.GetParent().RefreshJobStats()
    self.Write()
00398
def Remove(self,confirm=True,warn=True):

    """Completely remove job.

    confirm  if true, ask the user (via raw_input) before removing
    warn     if true, print a message when the job cannot be removed

    The job's child directory and state file are deleted and the job is
    dropped from the parent's _jobManagers.  A failure to delete either
    is logged and the removal carries on.
    """

    if not self.CanClear():
        if warn: print("Cannot Remove job %s; its status is %s" % (self.GetName(),GIDStringForJSC(self.__statusCode)))
        return

    if confirm:
        ans = raw_input("Are you sure you want to remove job " + self.GetName() + " ?y[n]")
        if not re.search(r"^y",ans,re.I):
            print("Not removed.")
            return

    # shutil.rmtree and os.remove return None and report failure by
    # raising, so failures have to be caught rather than tested for.
    child_dir = self.GetStoreLocation("child_dir")
    if os.path.isdir(child_dir):
        try:
            shutil.rmtree(child_dir)
        except (OSError,IOError):
            Log(self,logger.ERROR,"Failed to remove obsolete job history directory:" + child_dir)

    state_file = self.GetStoreLocation("self")
    try:
        os.remove(state_file)
    except OSError:
        Log(self,logger.ERROR,"Failed to remove state file:" + state_file)

    self._SetTryNumber(0)

    del self.GetParent()._jobManagers[self.GetName()]
    self.GetParent().RefreshJobStats()
    return
00423
def SetLocalEnvironment(self,env_str):

    """Set, as a comma separated list string, the environment that is local to this job.

    e.g. job.SetLocalEnvironment('var1=123,var2=a string with spaces,var3=456')

    If the first character is + (e.g. +var3=567,var4=b) append to the
    existing environment rather than replace it.

    A non-empty string that cannot be parsed is reported and leaves the
    environment unchanged.
    """

    if re.search(r"^\+",env_str):
        # Append: join the existing environment and the new entries and
        # set the result through GBSJob's own implementation.
        combined = self.__localEnvironment
        if combined: combined += ','
        combined += env_str[1:]
        GBSJob.SetLocalEnvironment(self,combined)
        return

    env_dict = GBSUtilities.ParseEnvStr(env_str)
    if env_str and not env_dict:
        print("Cannot parse environment string: '" + env_str + "'")
        return

    self.__localEnvironment = GBSUtilities.BuildEnvStr(env_dict)
    self.Write()
00445
def SetLocalInputSandbox(self,in_sbox_str):

    """Set, as a comma separated list string, the input sandbox file list that is local to this job.

    e.g. task.SetLocalInputSandbox('/home/users/west/my_input_data.dat,../my_script.sh')

    The string is handed to GBSUtilities.ProcessSandboxSetup; the list is
    stored and the job written out only if that reports success.
    """

    (ok,sandbox_list) = GBSUtilities.ProcessSandboxSetup(self,in_sbox_str,"input")
    if not ok: return
    self.__localInputSandbox = sandbox_list
    self.Write()
00456
def SetLocalOutputSandbox(self,out_sbox_str):

    """Set, as a comma separated list string, the output sandbox file list that is local to this job.

    e.g. task.SetLocalOutputSandbox('my_output_data.dat,my_output.log')

    The string is handed to GBSUtilities.ProcessSandboxSetup; the list is
    stored and the job written out only if that reports success.
    """

    (ok,sandbox_list) = GBSUtilities.ProcessSandboxSetup(self,out_sbox_str,"output")
    if not ok: return
    self.__localOutputSandbox = sandbox_list
    self.Write()
00467
def SetScriptLocalArgs(self,arg_str):

    """Set (as a string) the comma separated list of application script args that are local to this job.

    e.g. job.SetScriptLocalArgs('123,a string with spaces,456')

    The string is stored as given and the job is written out.
    """

    self.__scriptLocalArgs = arg_str
    self.Write()
00476
def Submit(self,Perusable=False,\
           MonitorFrequency=0,\
           MonitorCommand="ps -o pid,ppid,rss,vsize,pcpu,pmem,cmd -u $USER"):

    """Submit job if permitted and return True if successful.

    Perusable         if true, ask for a perusable job (only honoured on
                      the LCG backend with GLITE enabled)
    MonitorFrequency  if non-zero, the job wrapper starts a background
                      monitor script that runs MonitorCommand and then
                      sleeps for this many seconds, repeatedly
    MonitorCommand    the command run by the monitor script

    Returns False, without submitting, if the task has no application
    script, the job is not in a submittable state, the parent is not
    authorised to submit, the try output directory cannot be created or
    the Ganga submit raises.  In the last two cases the try number is
    put back to its previous value.
    """

    import sys

    my_manager = self.GetParent()

    # A task with no application script has nothing to submit.
    script_spec = my_manager.GetScriptFileSpec()
    if not script_spec:
        print("Cannot submit job, no user application script assigned to Task '" + my_manager.GetName() + "'")
        return False
    script_name = my_manager.GetScriptFileName()

    if not self.CanSubmit():
        print("Cannot submit job " + self.GetName() + ":"
              " Status: " + GIDStringForJSC(self.__statusCode)
              + " [" + self.GetStatusText() + "]")
        return False

    if not my_manager.IsAuthorisedToSubmit(): return False

    # Start a new try and give it a fresh output directory.
    self._SetTryNumber(self.__tryNumber + 1)
    self.MakeChildDirectory()
    output_dir = self._GetTryOutputDir()

    # shutil.rmtree and os.mkdir return None and report failure by
    # raising, so failures have to be caught rather than tested for.
    if os.path.isdir(output_dir):
        Log(self,logger.SYNOPSIS,"Removing obsolete directory for job output:" + str(output_dir))
        try:
            shutil.rmtree(output_dir)
        except (OSError,IOError):
            Log(self,logger.ERROR,"Failed to remove obsolete directory for job output:" + str(output_dir))
    Log(self,logger.SYNOPSIS,"Creating directory for job output:" + str(output_dir))
    dir_mode = int("755",8)   # rwxr-xr-x
    try:
        os.mkdir(output_dir,dir_mode)
    except OSError:
        # Without the directory nothing below can work, so give up on
        # this try in the same way as for a failed Ganga submit.
        Log(self,logger.ERROR,"Failed to create directory for job output:" + str(output_dir))
        self._SetTryNumber(self.__tryNumber - 1)
        return False

    # Start the GBS Log File for this try.
    gbs_log_file_name = self._GetGbsLogFileName()
    gbs_log_file_spec = str(output_dir) + "/" + str(gbs_log_file_name)
    os.system("echo " + timestamp() + " INFO GBS_JOB_SUBMIT submitting job > " + gbs_log_file_spec)

    # Arguments for the wrapper itself: the log file and the user script.
    args = [gbs_log_file_name,script_name]

    # Arguments for the user script: global ones first, then local ones.
    script_args = []
    for arglist in [my_manager.GetScriptGlobalArgs(),self.GetScriptLocalArgs()]:
        if arglist: GBSUtilities.ParseCommaSepList(arglist,script_args)

    # Build the wrapper script: part 1 + generated section + part 2.
    wrapper_file_spec = str(output_dir) + "/" + "gbs_job_wrapper.sh"
    os.system("cp $GBS_HOME/python/gbs_job_wrapper_part_1.sh " + wrapper_file_spec)

    wrapper = open(wrapper_file_spec,'a')
    try:
        # Optional monitoring: the wrapper writes a small looping script
        # and starts it in the background.
        if MonitorFrequency:
            wrapper.write('monitor_script=$GBS_WORK_DIR/gbs_monitor.sh\n')
            wrapper.write('if [ -f $monitor_script ] ; then rm -f $monitor_script; fi\n')
            wrapper.write('echo "call_num=0" >> $monitor_script\n')
            wrapper.write('echo "while [ 1 = 1 ]" >> $monitor_script\n')
            wrapper.write('echo "do" >> $monitor_script\n')
            wrapper.write('echo " echo \`date \'+%F %H:%M:%S \'\` Monitoring call number \$(( ++call_num ))" >> $monitor_script\n')
            wrapper.write('echo " %s" >> $monitor_script\n' %\
                          MonitorCommand)
            wrapper.write('echo " sleep %d" >> $monitor_script\n' % \
                          MonitorFrequency)
            wrapper.write('echo "done" >> $monitor_script\n')
            wrapper.write('chmod +x $monitor_script\n')
            wrapper.write('$GBS_LOG INFO Starting monitoring script at frequency %d secs to execute %s\n' %\
                          (MonitorFrequency,MonitorCommand))
            wrapper.write('$monitor_script &\n')

        # Each script argument is assigned to a shell variable argN and
        # passed to the user script as "$argN".
        num_arg = 0
        arglist = ""
        for arg in script_args:
            num_arg += 1
            wrapper.write("arg" + str(num_arg) + "=\"" + arg + "\"\n")
            arglist += "\"$arg" + str(num_arg) +"\" "
        wrapper.write("/usr/bin/time --format 'CPU User: %U sec, System: %S sec. Elapse: %E. ' --output=$GBS_WORK_DIR/time_output_$$ "\
                      + "./$user_script " + arglist + "\n")
    finally:
        wrapper.close()

    os.system("cat $GBS_HOME/python/gbs_job_wrapper_part_2.sh >>" + wrapper_file_spec)

    # Environment: GBS variables first, then the global and local user
    # environments (local entries override global ones).
    env = {}
    env['GBS_MODE'] = my_manager.GetMode()
    env['GBS_RETRY_COUNT'] = str(self.__tryNumber -1)
    retry_arg_no = 0
    for retry_arg in self.__retryArgs.split():
        retry_arg_no += 1
        env['GBS_RETRY_ARG_' + str(retry_arg_no)] = retry_arg
    env['GBS_NUM_RETRY_ARGS'] = str(retry_arg_no)

    for env_str in [my_manager.GetGlobalEnvironment(),self.GetLocalEnvironment()]:
        if not env_str: continue
        for var,val in GBSUtilities.ParseEnvStr(env_str).iteritems():
            env[var] = val

    exe = Ganga.GPI.Executable(exe = Ganga.GPI.File(wrapper_file_spec), env = env,args = args)

    # Input sandbox: the user script, the log file and then the global
    # and local sandbox files from the respective InputSandbox dirs.
    inputsandbox = [script_spec,gbs_log_file_spec]
    for obj in [my_manager,self]:
        sb_file_list = []
        if obj is self: GBSUtilities.ParseCommaSepList(self.GetLocalInputSandbox(),sb_file_list)
        else: GBSUtilities.ParseCommaSepList(my_manager.GetGlobalInputSandbox(),sb_file_list)
        input_dir = obj.GetStoreLocation("child_dir") + "/InputSandbox"
        for sb_file in sb_file_list: inputsandbox.append(input_dir + "/" + sb_file)

    # Shell assignments of the job environment, used as a prefix when an
    # output sandbox entry has to be evaluated by the shell.
    env_cmds = ""
    for e_name,e_value in env.iteritems():
        env_cmds += e_name + "=" + str(e_value) + ";"

    # Output sandbox: the GBS tarball, the log file and then the global
    # and local entries.  An entry starting with "$" is run as a shell
    # command and replaced by that command's output.
    outputsandbox = ['gbs_output_sandbox.tar.gz',gbs_log_file_name]
    for obj in [my_manager,self]:
        sb_file_list = []
        if obj is self: GBSUtilities.ParseCommaSepList(self.GetLocalOutputSandbox(),sb_file_list)
        else: GBSUtilities.ParseCommaSepList(my_manager.GetGlobalOutputSandbox(),sb_file_list)
        for sb_file in sb_file_list:
            if sb_file[0] == "$":
                inp = os.popen(env_cmds + sb_file[1:])
                sb_file = inp.read()
                inp.close()
            outputsandbox.append(sb_file)

    # The backend may be given as "name:queue"; Local has no queue.
    backend = my_manager.GetBackend()
    queue = ""
    mo = re.search(r"(.*?):(.*)",backend)
    if mo:
        (backend,queue) = mo.groups()
    if backend == "Local": queue = ""

    # Build the Ganga job.
    gj = Ganga.GPI.Job(backend=backend)
    Ganga.GPI.jobtree.add(gj,my_manager.GetGangaTreeDir())
    if queue :
        if backend == "PBS": gj.backend.queue = queue
        if backend == "LCG": gj.backend.CE = queue

    if backend == "LCG" and Ganga.GPI.config['LCG']['GLITE_ENABLE']:
        gj.backend.middleware = 'GLITE'
        if Perusable: gj.backend.perusable = True
    elif Perusable: print("Cannot select perusable; backend not LCG/GLITE")
    gj.application = exe
    gj.inputsandbox = inputsandbox
    gj.outputsandbox = outputsandbox
    if GetLoggerThreshold() <= logger.SYNOPSIS: print("Contents of Ganga job:-\n\n" + str(gj))

    try:
        print("Submitting " + " " + str(self.GetName()))
        timeout_call(self,gj.submit,3*60)
        Log(self,logger.INFO,self.__tryID + " submitted")
    except Exception:
        # sys.exc_info() is used to get at the exception so that no
        # version-specific 'except' binding syntax is needed.
        inst = sys.exc_info()[1]
        excp_msg = "Caught exception:" + str(inst) +" Failed to complete Ganga job submit"
        Log(self,logger.ERROR,excp_msg)
        self._SetStatusText(excp_msg)
        self._SetTryNumber(self.__tryNumber - 1)
        return False

    # While submitted, the status code is the Ganga job ID.
    self.__gangaJobId = gj.id
    self._SetStatus(gj.id,"Ganga status:" + gj.status)
    self.GetParent().RefreshJobStats()
    self.Write()
    return True
00662
00663
def UpdateStatus(self):

    """Get latest status. May involve checking Ganga job and retrieving output.

    Nothing is done unless the job is submitted.  Otherwise:-
    - if Ganga still reports the job as in progress, the status text is
      updated, unless the job has sat in that state beyond the timeout,
      in which case the Ganga job is removed and the try is abandoned
    - if the Ganga job is lost, abandoned or 'new', the try number is
      wound back and the job is made ready to run again
    - otherwise the job has ended: its output is copied into the try
      output directory, the status becomes GID_JSC_WAITING_ANALYSIS and
      Analyse() is called.
    """

    if not self.IsSubmitted(): return

    # While submitted, the status code is the Ganga job ID.
    try:
        gj = Ganga.GPI.jobs(self.__statusCode)
    except Exception:
        gj = None

    if not gj:
        Log(self,logger.ERROR,self.__tryID + " lost Ganga job with ID " + str(self.__statusCode))
        # Handled below exactly like a Ganga job that is still 'new'.
        g_status = 'new'
    else:
        g_status = gj.status

    # Job still in progress as far as Ganga is concerned.
    if g_status in ("completing","ready","running","scheduled","submitted","waiting"):
        self._SetStatusText("Ganga status:" + g_status)

        # Hours since the status last changed.
        t_stall_hours = (time.time() \
                         - time.mktime(time.strptime(self.__statusTime.strip(),"%Y-%m-%d %H:%M:%S")))/3600.

        # 3 hours are allowed in most states, 3 days once scheduled or
        # running.
        t_timeout_hours = 3.
        if g_status == "scheduled" or g_status == "running": t_timeout_hours = 3.*24.
        if t_stall_hours < t_timeout_hours:
            self.GetParent().RefreshJobStats()
            self.Write()
            return

        self._SetStatusText("killed Ganga job %d stalled for %5.1f hours " % (gj.id,t_stall_hours))
        Log(self,logger.ERROR,self.__tryID + " " + self.__statusText)
        gj.remove()
        g_status = 'new'

    # Try never really happened: wind it back and make ready again.
    if g_status == 'new':
        self._SetTryNumber(self.__tryNumber - 1)
        self.__statusCode = GID_JSC_NEW
        self.__gangaJobId = -1
        # NOTE(review): Release() and GetPhaseCode() treat any non-zero
        # try number as a retry; '> 1' here differs -- confirm intended.
        if self.__tryNumber > 1: self.__statusCode = GID_JSC_RETRY
        self._SetStatusText("Ready to run")
        self.GetParent().RefreshJobStats()
        self.Write()
        return

    # Job has ended: bring its output into the try output directory.
    output_dir = self._GetTryOutputDir()
    Log(self,logger.SYNOPSIS,"Retrieving output from " + str(gj.outputdir) + " into " + str(output_dir))
    if not os.listdir(gj.outputdir):
        Log(self,logger.ERROR,self.__tryID + " No output files in " + str(gj.outputdir))
    elif os.system("cp " + str(gj.outputdir) + "* " + str(output_dir)):
        Log(self,logger.ERROR,self.__tryID + " Failed to retrieve output from " + str(gj.outputdir) + " into " + str(output_dir))

    # Best effort only: not every backend has a 'perusable' attribute.
    try:
        if gj.backend.perusable:
            print("Job is perusable, attempting to recover stdout, as stdout.perusable")
            cmd = "glite-wms-job-perusal --get -f stdout --all --noint " \
                  + gj.backend.id + " > " + output_dir + "/stdout.perusable"
            if os.system(cmd):
                # The command is made part of the message; every other
                # call in this class passes Log a single message string.
                Log(self,logger.ERROR,self.__tryID + " Failed to retrieve persuable output using " + cmd)
    except Exception: pass

    # Unpack the GBS output tarball, deleting it if that works.
    gbs_output_sandbox = output_dir + "/gbs_output_sandbox.tar.gz"
    if os.path.isfile(gbs_output_sandbox):
        Log(self,logger.SYNOPSIS,"Unpacking " + gbs_output_sandbox)
        if os.system("cd " + output_dir + "; tar zxf gbs_output_sandbox.tar.gz"):
            Log(self,logger.ERROR,self.__tryID + " Failed to unpack " + gbs_output_sandbox)
        else:
            os.remove(gbs_output_sandbox)

    # Keep a record of the Ganga job alongside the output.
    gangaStatusFile = output_dir + "/gbs_ganga.status"
    f = open(gangaStatusFile,"w")
    try:
        f.write(str(gj))
    finally:
        f.close()

    self.__statusCode = GID_JSC_WAITING_ANALYSIS
    # Best effort only: not every backend has a 'reason' attribute.
    try: g_status += " [" + gj.backend.reason + "]"
    except Exception: pass
    self._SetStatusText("Waiting to analyse. Ganga exit status was " + g_status)

    self.Analyse()
00754
00755
def WaitForJob(self,num_tries=100,time_interval=30):

    """Wait for running job to end before returning.

    Limit to 'num_tries' with a sleep of 'time_interval' between.

    The status is updated on each pass and printed whenever it changes.
    Returns as soon as the job is no longer submitted; a message is
    printed if the limit is reached first.
    """

    if not self.IsSubmitted():
        print("Cannot wait for job %s; its status is %s" % (self.GetName(),GIDStringForJSC(self.__statusCode)))
        return

    print("Waiting for job %s to end" % self.GetName())
    last_state = ""
    passes = 0
    while passes < num_tries:
        passes += 1
        self.UpdateStatus()
        state_now = "%s [%s]" % (GIDStringForJSC(self.GetStatusCode()),self.GetStatusText())
        if state_now != last_state:
            print(timestamp() + " Job %s status now %s" % (self.GetName(),state_now))
            last_state = state_now
        if not self.IsSubmitted(): return
        time.sleep(time_interval)

    print("Giving up waiting for job to end")
00777
00778
00779
00780
def _GetTryOutputDir(self, try_req = 0):

    """Return the file spec for the current try (or supplied try) output directory

    The directory is "try_NNN" (try number zero filled to 3 digits)
    below the "child_dir" store location.  A try_req of 0 or less
    selects the current try.
    """

    if try_req > 0:
        try_use = try_req
    else:
        try_use = self.__tryNumber
    dir_name = "try_" + str(try_use).zfill(3)
    return self.GetStoreLocation("child_dir") + "/" + dir_name
00788
00789
def _GetGbsLogFileName(self, try_req = 0):

    """Return the name of the GBS Log File for the current try (or supplied try).

    The name is gbs_<parent name>_<job name>_<try number>.log.  A try_req
    of 0 or less selects the current try.
    """

    if try_req > 0:
        try_use = try_req
    else:
        try_use = self.__tryNumber
    stem = "gbs_" + self.GetParent().GetName() + "_" + self.GetName()
    return stem + "_" + str(try_use) + ".log"
00796
def _SetTryNumber(self, try_no):

    """Update try number and associated tryID.

    The try ID is "JTID:<job name>.<try number>".
    """

    self.__tryNumber = try_no
    self.__tryID = "JTID:%s.%d" % (self.GetName(),try_no)
00802
00803
def _IncrementEarlyFailsCount(self):
    """Add one to the Early Fails count."""
    self.__earlyFails = self.__earlyFails + 1
def _IncrementLateHandledFailsCount(self):
    """Add one to the Late Handled Fails count."""
    self.__lateFailsHandled = self.__lateFailsHandled + 1
def _IncrementLateUnhandledFailsCount(self):
    """Add one to the Late Unhandled Fails count."""
    self.__lateFailsUnhandled = self.__lateFailsUnhandled + 1
def _SetRetryArgs(self,value):
    """Replace the current retry args with value."""
    self.__retryArgs = value
00808
00809
00810
def _SetStatus(self,code,text):
    """Set status code and text, time stamp the change and record it in the GLF.

    Nothing is done if neither code nor text differs from the current
    values.  Otherwise both are stored, the status time is set to now
    and, if the GBS Log File of the current try exists, a "State change"
    line is appended to it.
    """
    if self.__statusCode == code and self.__statusText == text: return

    self.__statusCode = code
    self.__statusText = text
    self.__statusTime = timestamp()

    gbs_log_file_spec = self._GetTryOutputDir() + "/" + self._GetGbsLogFileName()
    if not os.path.isfile(gbs_log_file_spec): return

    # The line is appended directly rather than through a shell 'echo',
    # so quotes or shell metacharacters in the status text (which can be
    # an exception message) cannot break or alter the command.
    entry = timestamp() + "INFO State change " + GIDStringForJSC(self.__statusCode) \
            + " [" + self.__statusText + "]\n"
    try:
        glf = open(gbs_log_file_spec,"a")
        try:
            glf.write(entry)
        finally:
            glf.close()
    except (IOError,OSError):
        # Failing to record the change must not stop the change itself.
        Log(self,logger.ERROR,"Failed to record state change in " + gbs_log_file_spec)
00820
def _SetStatusCode(self,code):
    """Set the status code, keeping the current status text."""
    current_text = self.__statusText
    self._SetStatus(code,current_text)
def _SetStatusText(self,text):
    """Set the status text, keeping the current status code."""
    current_code = self.__statusCode
    self._SetStatus(current_code,text)