GBSJob.py

Go to the documentation of this file.
00001 import re
00002 import os
00003 import shutil
00004 import time
00005 
00006 import Ganga.GPI
00007 
00008 from  AsyncCallThread import timeout_call
00009 
00010 from   GBSIdCodes import *
00011 from   GBSLogger  import GetLoggerThreshold, Log, logger 
00012 from   GBSObject  import GBSObject
00013 from   GBSModelRegistry import GetModelRegistry
00014 from   GBSTimeStamp import GBSTimeStamp as timestamp
00015 import GBSUtilities
00016 
00017 class GBSJob(GBSObject) :
00018 
00019     """Object to submit, and if necessary resubmit a job until it succeeds or needs user intervention.
00020 
00021     This class is responsible for submitting and if necessary
00022     resubmitting a job until it is successful or user intervention is
00023     unavoidable.
00024       
00025     """
00026 
00027     ######  GBSObject inherited responsibilities   ###### 
00028       
00029 
00030     def __init__(self,name,parent,model,model_args):
00031         self.__tryNumber          = 0  # 0 before first try
00032         self.__earlyFails         = 0  # Symptomatic of a system failure
00033         self.__lateFailsHandled   = 0  # Job signalling retry
00034         self.__lateFailsUnhandled = 0  # Job aborting without signal
00035         self.__scriptLocalArgs    = "" # Application script local args
00036         self.__localEnvironment   = "" # The environment that are local to this job.
00037         self.__localInputSandbox  = "" # The list of input sandbox files that are local to this job.
00038         self.__localOutputSandbox = "" # The list of output sandbox files that are local to this job.
00039         
00040         # statusCode:  See GBSIdCodes
00041         self.__statusCode         = GID_JSC_NEW
00042         # Associated text to qualify statusCode. Has specific meaning when
00043         # __statusCode has the following values which correspond to the values
00044         # passed back from the application script via the GBS Log File:-
00045         #
00046         #  GID_JSC_SUCCEEDED  The list of output files
00047         #  GID_JSC_FAILED     Diagnostic message about the failure
00048         #  GID_JSC_RETRY      The retry args for next attempt.
00049         self.__statusText         = "Ready to run"
00050         self.__statusTime         = timestamp()
00051         # The current retry args i.e. as determined from previous try (or empty for first try)
00052         self.__retryArgs   = ""
00053         self.__gangaJobId  = -1
00054         # The remaining members are volatile i.e. not persisted but derived from other state.
00055         self.__tryID       = name + ".0"
00056         
00057         GBSObject.__init__(self,name,parent,model)
00058 
    def _DoMemberIO(self,ioh):
        """Pass every persistent data member of this job through the I/O handler ioh.

        ioh is called once per member, in a fixed order.  Each call gets a label,
        a type code ("i" or "s") and the member's current value, and the value
        returned is stored back into the member.  NOTE(review): presumably the
        same routine serves both reading and writing the job state, with ioh
        returning the value read (or the unchanged value when writing), and the
        order of the calls defining the stored layout.  Do not reorder them
        without confirming against GBSObject.  The meaning of the leading "+" on
        some labels is defined by the handler (TODO confirm)."""
        # The try number goes through _SetTryNumber rather than direct assignment,
        # presumably so the volatile __tryID derived from it stays in step.
        self._SetTryNumber(ioh("Try Number","i",self.__tryNumber))
        self.__retryArgs           = ioh("Current Retry Args","s",self.__retryArgs)
        self.__earlyFails          = ioh("Early Fail Count","i",self.__earlyFails)
        self.__lateFailsHandled    = ioh("Late Handled Fails Count","i",self.__lateFailsHandled)
        self.__lateFailsUnhandled  = ioh("Late Unhandled Fails Count","i",self.__lateFailsUnhandled)
        self.__scriptLocalArgs     = ioh("Script Local Args","s",self.__scriptLocalArgs)
        self.__localEnvironment    = ioh("+Local environment","s",self.__localEnvironment)
        self.__localInputSandbox   = ioh("+Local Input Sandbox List",    "s",self.__localInputSandbox)
        self.__localOutputSandbox  = ioh("+Local Output Sandbox List",   "s",self.__localOutputSandbox)
        self.__statusCode          = ioh("Status Code","i",self.__statusCode)
        self.__statusText          = ioh("Status Text","s",self.__statusText)
        self.__statusTime          = ioh("+Status Time Stamp","s",self.__statusTime)
        self.__gangaJobId          = ioh("+Ganga Job Id","i",self.__gangaJobId)

        # During transition may need to infer __gangaJobId from __statusCode.
        # Submit passes the Ganga job id to _SetStatus as the status code, so a
        # submitted job's status code doubles as its Ganga job id.
        if self.__statusCode >= GID_JSC_SUBMITTED: self.__gangaJobId = self.__statusCode

        # Finally let the base class transfer its own members.
        GBSObject._DoMemberIO(self,ioh)
00078 
00079     def GetType(self): return "GBSJob"
00080 
00081     def __repr__(self): return self.AsString()
00082 
00083 
00084 #-----------------------------------------------------------------------------------------------
00085 #
00086 #  User Callable Methods Getters
00087 #
00088 #-----------------------------------------------------------------------------------------------
00089 
00090     def AsString(self,level = "Full"):
00091 
00092         """Return string description.
00093 
00094          Return string description at the following levels:-
00095         "Brief"    one line summary suitable for rows in tables
00096         "Heading"  one line summary suitable as heading for "Brief"
00097         "Full"     full desciption including value of every data member"""
00098 
00099         if ( level == "Heading"):
00100             s  = "Name".ljust(20)
00101             s += "Status".ljust(23)
00102             s += "Input".ljust(40)
00103             s += "Status Details"
00104             return s
00105         if ( level == "Brief"):
00106             s  = self.GetName().ljust(20)
00107             s += GIDStringForJSC(self.__statusCode).ljust(23)
00108             s += (self.__scriptLocalArgs + ";" + self.__localEnvironment).ljust(40)
00109             s += "[" + self.__statusText + "]"
00110             return s
00111 
00112         s  = GBSObject.__repr__(self) + "\n\n"
00113         s += "Status: " + GIDStringForJSC(self.__statusCode) +  " [" + self.__statusText + "] at " + self.__statusTime + "\n"
00114         s += "  Associated Ganga Job ID: " + str(self.__gangaJobId)  + "\n\n"
00115         s += "Job Definition\n"
00116         s += "  Script local args: '" + self.__scriptLocalArgs + "'\n"
00117         s += "  Local environment: '" + self.__localEnvironment + "'\n"
00118         s += "  Input Sandbox:     '"  + self.__localInputSandbox + "'\n" 
00119         s += "  Output Sandbox:    '"  + self.__localOutputSandbox + "'\n\n" 
00120         s += "Retry Status\n"
00121         s += "  Try:                  "  + str(self.__tryNumber) + "\n"
00122         s += "  Retry Args:           '" + str(self.__retryArgs) + "'\n"
00123         s += "  Early Fails:          "  + str(self.__earlyFails) + "\n"
00124         s += "  Late Handled Fails:   "  + str(self.__lateFailsHandled) + "\n"
00125         s += "  Late Unhandled Fails: "  + str(self.__lateFailsUnhandled) + "\n"
00126 
00127         # If there exist any output, give a summary, either for the current try, if
00128         # complete, or the previous one if currently submitted.
00129 
00130         display_try = self.__tryNumber
00131         if self.__statusCode >= GID_JSC_SUBMITTED: display_try -= 1
00132         if display_try < 1: return s
00133         output_dir = self._GetTryOutputDir(display_try)
00134         s += "\nThe output for try %d can be found in\n\n   %s\n\n and consists of:-\n\n" % (display_try,output_dir)
00135         list_dir = output_dir + '/../listing.tmp'
00136         os.system("cd " + output_dir + ";ls -l > " + list_dir)
00137         f = open(list_dir)
00138         for line in f: s += "  " + line
00139         f.close()
00140         os.remove(list_dir)
00141         gbs_log_file_name = self._GetGbsLogFileName(display_try)
00142         gbs_log_file_spec =  str(output_dir) + "/" + str(gbs_log_file_name)
00143         if not os.path.isfile(gbs_log_file_spec): return s
00144         s += "\nThe GLF (GBS Log File) %s contains:-\n\n" % gbs_log_file_name
00145         f = open(gbs_log_file_spec)
00146         for line in f: s += "  " + line
00147         f.close()
00148         return s
00149 
00150     def CanClear(self):
00151         """Return true if can ClearErrorCounts, ClearHistory and Remove"""
00152         return not self.IsSubmitted()
00153 
00154     def CanKill(self):
00155         """Return true if can Kill"""
00156         return self.IsSubmitted()
00157    
00158     def CanSubmit(self):
00159 
00160         """Return true if can submit Ganga job"""
00161 
00162         if      self.__statusCode < GID_JSC_SUBMITTED \
00163             and self.__statusCode > GID_JSC_CANNOT_SUBMIT \
00164             and self.GetParent().GetScriptFileSpec(): return True
00165         return False
00166 
00167     def GetEarlyFailsCount(self):
00168         """Return Early Fails Count"""
00169         return self.__earlyFails
00170 
00171     def GetGangaJobId(self):
00172         """Return associated Ganga job Id (if any) or -1 """
00173         return self.__gangaJobId
00174 
00175     def GetGangaJob(self):
00176         """Return associated Ganga job (if any)"""
00177         if self.__gangaJobId < 0: return None
00178         try:
00179             gj = Ganga.GPI.jobs(self.__gangaJobId)
00180         # I need to catch JobAccessError, but it isn't Ganga.GPI.JobAccessError so just
00181         # catch everything and hope it is O.K.!
00182         except: gj = None
00183         return gj
00184         
00185     def GetLateHandledFailsCount(self):
00186         """Return Late Handled Fails Count"""
00187         return self.__lateFailsHandled
00188     
00189     def GetLateUnhandledFailsCount(self):
00190         """Return Late Unhandled Fails Count"""
00191         return self.__lateFailsUnhandled
00192     
00193     def GetLocalEnvironment(self,prettyPrint = False):
00194         """Return, as a comma separated list string, the environment that is local to this job.
00195 
00196         If prettyPrint is True print out list one item per line"""
00197         if not prettyPrint: return self.__localEnvironment
00198         print "Local Environment:-"
00199         GBSUtilities.ListCommaSepList(self.__localEnvironment,"  ")
00200    
00201     def GetLocalInputSandbox(self):
00202         """Return, as a comma separated list string, the input sandbox file list that is local to this job."""
00203         return self.__localInputSandbox
00204     
00205     def GetLocalOutputSandbox(self):
00206         """Return, as a comma separated list string, the ouput sandbox file list that is local to this job."""
00207         return self.__localOutputSandbox
00208 
00209     def GetPhaseCode(self):
00210         """Return phase code. These are broad categories of status code used by task for job statistics."""
00211         pc = 0
00212         if self.IsComplete():
00213             pc = GID_JPC_DONE_NFAIL
00214             if not self.IsSuccessful(): pc = GID_JPC_DONE_FAIL
00215         elif self.IsReady():
00216             pc = GID_JPC_READY_NRETRY 
00217             if self.GetTryNumber() > 0: pc = GID_JPC_READY_RETRY
00218         elif self.IsSubmitted():
00219             pc = GID_JPC_SUBMIT_RUN
00220             if not self.IsRunning(): pc = GID_JPC_SUBMIT_NRUN
00221         else:
00222             pc = GID_JPC_NREADY_HOLD
00223             if not self.IsHeld(): pc = GID_JPC_NREADY_NHOLD
00224         return pc
00225 
00226         
00227     def GetRetryArgs(self):
00228         """Returm as a string current retry args i.e. as determined from previous try (or empty for first try)"""
00229         return self.__retryArgs
00230     
00231     def GetScriptLocalArgs(self):
00232         """Return (as a string) the comma list of application script args that are local to this job"""
00233         return self.__scriptLocalArgs
00234     
00235     def GetStatusCode(self):
00236         """Return status code"""
00237         return self.__statusCode
00238     
00239     def GetStatusText(self):
00240         """Return status text which qualifies the Status Code"""
00241         return self.__statusText
00242     
00243     def GetStatusTime(self):
00244         """Return date time when current status code and text were achieved"""
00245         return self.__statusTime
00246     
00247     def GetTryID(self):
00248         """Return Try ID = Job Name.try number.  Used to log job attempts."""
00249         return self.__tryID
00250     
00251     def GetTryNumber(self):
00252         """Return Try Number (0 before first try)"""
00253         return self.__tryNumber
00254     
00255     def IsComplete(self):
00256         """Return true if job is complete (Successful or Failed)"""
00257         return self.__statusCode <= GID_JSC_COMPLETE
00258     
00259     def IsFailure(self):
00260         """Return true if job is failure"""
00261         return self.__statusCode == GID_JSC_FAILED
00262 
00263     def IsHeld(self):
00264         """Return true if job is Held"""
00265         return self.__statusCode == GID_JSC_HELD
00266      
00267     def IsNotReady(self):
00268         """Return true if job is not ready to submit"""
00269         return not self.IsReady() 
00270      
00271     def IsReady(self):
00272         """Return true if job is ready to submit"""
00273         return self.__statusCode > GID_JSC_CANNOT_SUBMIT and self.__statusCode < GID_JSC_SUBMITTED
00274     
00275     def IsRunning(self):
00276         """Return true if job is submitted and associated Ganga Job status is running"""
00277         if self.__statusCode >= GID_JSC_SUBMITTED and re.search(r"Ganga status:running",self.__statusText): return True
00278         return False
00279   
00280     def IsSubmitted(self):
00281         """Return true if job is submitted"""
00282         return self.__statusCode >= GID_JSC_SUBMITTED
00283    
00284     def IsSuccessful(self):
00285         """Return true if job is successful"""
00286         return self.__statusCode == GID_JSC_SUCCEEDED
00287 
00288 #-----------------------------------------------------------------------------------------------
00289 #
00290 #  User Callable Methods Setters
00291 #
00292 #-----------------------------------------------------------------------------------------------
00293 
00294 
00295     def Analyse(self,update = True):
00296 
00297         """Perform job termination analysis and optionally apply the results."""
00298 
00299         # Quit if nothing to analyse.
00300         if self.__statusCode != GID_JSC_WAITING_ANALYSIS: return
00301         
00302         analyser = GetModelRegistry().CreateObject(self.GetModel(),"JobAnalyser","Solomon",self)
00303         analyser.Analyse(self)
00304         if update:
00305             analyser.Apply()
00306             gj = self.GetGangaJob()
00307             if gj: gj.remove()
00308             self.__gangaJobId  = -1
00309             self.GetParent().RefreshJobStats()
00310             self.Write()
00311 
00312     def ClearErrorCounts(self,warn=True):
00313 
00314         """If allowed, clear error counts, but leave retry history intact 
00315 
00316         Can only applied to jobs that are ready to be submitted or that have failed and
00317         in this case has the side effect of setting the status back to RETRY"""
00318 
00319         if self.CanClear():
00320             self.__earlyFails         = 0
00321             self.__lateFailsHandled   = 0
00322             self.__lateFailsUnhandled = 0
00323             if self.__statusCode == GID_JSC_FAILED:
00324                 self._SetStatus(GID_JSC_RETRY,"Retrying after user cleared errors")
00325             self.GetParent().RefreshJobStats()
00326             self.Write()
00327             return
00328         if warn: print "Cannot ClearErrorCounts on job %s; its status is %s" % (self.GetName(),GIDStringForJSC(self.__statusCode))
00329         
00330     def ClearHistory(self,confirm=True,warn=True):
00331 
00332         """Completely clear all processing history so that processing begins again from scratch.
00333 
00334         The only processing state that is retained is that if job was held it will still be."""
00335         
00336         if not self.CanClear():
00337             if warn: print "Cannot ClearHistory on job %s; its status is %s" % (self.GetName(),GIDStringForJSC(self.__statusCode))
00338             return
00339         if confirm:
00340             ans = raw_input("Are you sure you want to clear the history of job " + self.GetName() + "  ?y[n]")
00341             if not re.search(r"^y",ans,re.I):
00342                 print "History not cleared."
00343                 return
00344         child_dir = self.GetStoreLocation("child_dir")
00345         if os.path.isdir(child_dir):
00346             if shutil.rmtree(child_dir):
00347                 Log(self,logger.ERROR,"Failed to remove obsolete job history directory:" + child_dir)
00348         self._SetTryNumber(0)
00349         self.__retryArgs     = ""
00350         self.__gangaJobId    = -1
00351         if not self.IsHeld():
00352             self._SetStatus(GID_JSC_NEW,"Ready to run")
00353         self.ClearErrorCounts()
00354         return
00355         
00356     def Hold(self,warn=True):
00357 
00358         """Hold job, so that it won't be submitted.  Warn, if requested, if job not suitable for holding"""
00359 
00360         if self.IsReady():
00361              self._SetStatusCode(GID_JSC_HELD,"Held by user")
00362              self.GetParent().RefreshJobStats()
00363              self.Write()
00364              return
00365         if warn: print "Cannot HOLD job %s; its status is %s" % (self.GetName(),GIDStringForJSC(self.__statusCode))
00366 
00367     def Kill(self,warn=True):
00368 
00369        """Kill job that has been submitted to Ganga.  Warn, if requested, if job not suitable for killing"""
00370 
00371        if not self.CanKill():
00372            if warn: print "Cannot KILL job %s; its status is %s" % (self.GetName(),GIDStringForJSC(self.__statusCode))
00373            return
00374        try:
00375             gj = Ganga.GPI.jobs(self.__statusCode)
00376         # I need to catch JobAccessError, but it isn't Ganga.GPI.JobAccessError so just
00377         # catch everything and hope it is O.K.!
00378        except:
00379            Log(self,logger.ERROR,self.__tryID + " lost Ganga job with ID " + str(self.__statusCode))
00380            return         
00381        # Kill job
00382        gj.kill()
00383        Log(self,logger.ERROR,self.__tryID + " killed by user")
00384        self.UpdateStatus()
00385 
00386     def Release(self,warn=True):
00387 
00388         """Release job, so that it can be submitted.  Warn, if requested, if job not suitable for releasing"""
00389 
00390         if self.IsHeld():
00391              self.__statusCode = GID_JSC_NEW
00392              if  self.__tryNumber: self.__statusCode = GID_JSC_RETRY
00393              self._SetStatusText("Ready to run")
00394              self.GetParent().RefreshJobStats()
00395              self.Write()
00396              return
00397         if warn: print "Cannot RELEASE job %s; its status is %s" % (self.GetName(),GIDStringForJSC(self.__statusCode))
00398 
00399     def Remove(self,confirm=True,warn=True):
00400 
00401         """Completely remove job."""
00402         
00403         if not self.CanClear():
00404             if warn: print "Cannot Remove job %s; its status is %s" % (self.GetName(),GIDStringForJSC(self.__statusCode))
00405             return
00406         if confirm:
00407             ans = raw_input("Are you sure you want to remove job " + self.GetName() + "  ?y[n]")
00408             if not re.search(r"^y",ans,re.I):
00409                 print "Not removed."
00410                 return
00411         child_dir = self.GetStoreLocation("child_dir")
00412         if os.path.isdir(child_dir):
00413             if shutil.rmtree(child_dir):
00414                 Log(self,logger.ERROR,"Failed to remove obsolete job history directory:" + child_dir)
00415         state_file = self.GetStoreLocation("self")
00416         if os.remove(state_file):
00417             Log(self,logger.ERROR,"Failed to remove state file:" + state_file)
00418         self._SetTryNumber(0)
00419         # Sneak into parent task's job list and remove entry.
00420         del self.GetParent()._jobManagers[self.GetName()]
00421         self.GetParent().RefreshJobStats()
00422         return
00423         
00424     def SetLocalEnvironment(self,env_str):
00425 
00426         """Set, as a comma separated list string, the environment that local to this job..
00427 
00428       e.g.  job.SetLocalEnvironment('var1=123,var2=a string with spaces,var3=456'
00429 
00430       If the first character is + e.g. +var3=567,var4=b, append to existing environment rather than replace it)"""
00431 
00432         if re.search(r"^\+",env_str):
00433             s = self.__localEnvironment
00434             if s: s += ','
00435             s += env_str[1:]
00436             # Force local method to be recalled rather than any inherited one
00437             # MinosRSMJob internally uses append and if that got recalled get a recursive loop.
00438             GBSJob.SetLocalEnvironment(self,s)
00439             return
00440         d =  GBSUtilities.ParseEnvStr(env_str)
00441         if env_str and not d: print "Cannot parse environment string: '" + env_str + "'"
00442         else:
00443             self.__localEnvironment = GBSUtilities.BuildEnvStr(d)
00444             self.Write()
00445 
00446     def SetLocalInputSandbox(self,in_sbox_str):
00447 
00448         """Set, as a comma separated list string, the input sandbox file list that is local to this job.
00449 
00450       e.g.  task.SetLocalInputSandbox('/home/users/west/my_input_data.dat,../my_script.sh')"""
00451 
00452         (ok,str) = GBSUtilities.ProcessSandboxSetup(self,in_sbox_str,"input")
00453         if ok:
00454             self.__localInputSandbox = str
00455             self.Write()
00456 
00457     def SetLocalOutputSandbox(self,out_sbox_str):
00458 
00459         """Set, as a comma separated list string, the output sandbox file list that is local to this job.
00460 
00461       e.g.  task.SetLocalOutputSandbox('my_output_data.dat,my_output.log')"""
00462 
00463         (ok,str) = GBSUtilities.ProcessSandboxSetup(self,out_sbox_str,"output")
00464         if ok:
00465             self.__localOutputSandbox = str
00466             self.Write()
00467 
00468     def SetScriptLocalArgs(self,arg_str):
00469 
00470         """Set (as a string) the comma separated list of application script args that are local to this job.
00471 
00472       e.g.  job.SetScriptLocalArgs('123,a string with spaces,456')"""
00473 
00474         self.__scriptLocalArgs = arg_str
00475         self.Write()
00476 
00477     def Submit(self,Perusable=False,\
00478                MonitorFrequency=0,\
00479                MonitorCommand="ps -o pid,ppid,rss,vsize,pcpu,pmem,cmd -u $USER"):
00480 
00481         """Submit job if permitted and return True if successful."""
00482 
00483         my_manager = self.GetParent()
00484 
00485         #  Get the user application script
00486         script_spec = my_manager.GetScriptFileSpec()
00487         if not script_spec:
00488             print "Cannot submit job, no user application script assigned to Task '" + my_manager.GetName() + "'"
00489             return False
00490         script_name = my_manager.GetScriptFileName()
00491 
00492         if not self.CanSubmit():
00493             print "Cannot submit job " + self.GetName() + ":" \
00494                   " Status: " + GIDStringForJSC(self.__statusCode) \
00495                   + " [" + self.GetStatusText() + "]"
00496             return False
00497 
00498         if not my_manager.IsAuthorisedToSubmit(): return False
00499 
00500         # Increment try number and prepare output directory to receive results
00501         
00502         self._SetTryNumber(self.__tryNumber + 1)
00503         self.MakeChildDirectory()
00504         output_dir = self._GetTryOutputDir()
00505         # It ought not to exist already, but in case it is, remove it first.
00506         if os.path.isdir(output_dir):
00507             Log(self,logger.SYNOPSIS,"Removing obsolete directory for job output:" + str(output_dir))
00508             if shutil.rmtree(output_dir):
00509                 Log(self,logger.ERROR,"Failed to remove obsolete directory for job output:" + str(output_dir))
00510         Log(self,logger.SYNOPSIS,"Creating directory for job output:" + str(output_dir))
00511         if os.mkdir(output_dir,0755):
00512             Log(self,logger.ERROR,"Failed to create directory for job output:" + str(output_dir))
00513 
00514         # Prepare GBS Log File in output directory
00515         gbs_log_file_name = self._GetGbsLogFileName()
00516         gbs_log_file_spec =  str(output_dir) + "/" + str(gbs_log_file_name)
00517         os.system("echo " + timestamp() + " INFO GBS_JOB_SUBMIT submitting job > " + gbs_log_file_spec)
00518 
00519         # Prepare arg list: GLF file, appl. script, global args, local args.
00520         args = []
00521         args.append(gbs_log_file_name)
00522         args.append(script_name)
00523 
00524         # Collect up the application script args, taking account of ecaped commas. 
00525         script_args = []
00526         for arglist in [my_manager.GetScriptGlobalArgs(),self.GetScriptLocalArgs()]:
00527             if arglist: GBSUtilities.ParseCommaSepList(arglist,script_args)
00528 
00529         # Passing args with anything but alphanumeric data into the
00530         # application script is a pain.  The logical approach, placing
00531         # each arg as a separate element in 'args' works for the local
00532         # back-end but is broken for LCG where all the args are first
00533         # interpolated and then re-parsed to define the argument list
00534         # that gets passed into the script.  Any string containing
00535         # white space gets broken apart and anything containing "(" causes
00536         # a syntax error! So, instead we manufacture the wrapper
00537         # script on the fly that will call the application script 
00538         # which gives us total control over the way arguments get passed in.
00539 
00540         wrapper_file_spec =  str(output_dir) + "/" + "gbs_job_wrapper.sh"
00541 
00542         #Copy the leading part of the wrapper
00543         os.system("cp $GBS_HOME/python/gbs_job_wrapper_part_1.sh " + wrapper_file_spec)
00544 
00545         wrapper  = file(wrapper_file_spec,'a')
00546 
00547         #If monitoring is required, set up and start running the monitor
00548         if MonitorFrequency:
00549             wrapper.write('monitor_script=$GBS_WORK_DIR/gbs_monitor.sh\n')
00550             wrapper.write('if [ -f $monitor_script ] ; then rm -f $monitor_script; fi\n')
00551             wrapper.write('echo "call_num=0"                                                         >> $monitor_script\n')
00552             wrapper.write('echo "while [ 1 = 1 ]"                                                    >> $monitor_script\n')
00553             wrapper.write('echo "do"                                                                 >> $monitor_script\n')
00554             wrapper.write('echo "  echo \`date \'+%F %H:%M:%S \'\` Monitoring call number \$(( ++call_num ))" >> $monitor_script\n')
00555             wrapper.write('echo "  %s"                                                               >> $monitor_script\n' %\
00556                           MonitorCommand)
00557             wrapper.write('echo "  sleep %d"                                                         >> $monitor_script\n' % \
00558                           MonitorFrequency)
00559             wrapper.write('echo "done"                                                               >> $monitor_script\n')
00560             wrapper.write('chmod +x $monitor_script\n')
00561             wrapper.write('$GBS_LOG INFO Starting monitoring script at frequency %d secs to execute %s\n' %\
00562                           (MonitorFrequency,MonitorCommand))
00563             wrapper.write('$monitor_script &\n')
00564 
00565         # Manufacture the bit where the arguments get passed in
00566         num_arg  = 0
00567         arglist  = ""
00568         for arg in script_args:
00569             num_arg += 1
00570             wrapper.write("arg" + str(num_arg) + "=\"" + arg + "\"\n")
00571             arglist += "\"$arg" + str(num_arg) +"\" "
00572         wrapper.write("/usr/bin/time --format 'CPU User: %U sec, System: %S sec.  Elapse: %E. ' --output=$GBS_WORK_DIR/time_output_$$ "\
00573                       + "./$user_script " + arglist + "\n")
00574         wrapper.close()
00575 
00576         #Copy the trailing part of the wrapper
00577         os.system("cat $GBS_HOME/python/gbs_job_wrapper_part_2.sh >>" + wrapper_file_spec)
00578 
00579         # Prepare the GBS environment
00580         env = {}
00581         env['GBS_MODE'] = my_manager.GetMode()
00582         env['GBS_RETRY_COUNT'] = str(self.__tryNumber -1)
00583         retry_arg_no = 0
00584         for retry_arg in self.__retryArgs.split():
00585             retry_arg_no += 1
00586             env['GBS_RETRY_ARG_' + str(retry_arg_no)] = retry_arg
00587         env['GBS_NUM_RETRY_ARGS'] = str(retry_arg_no)
00588 
00589         # Prepare the user environment
00590         for env_str in [my_manager.GetGlobalEnvironment(),self.GetLocalEnvironment()]:
00591             if not env_str: continue
00592             for var,val in GBSUtilities.ParseEnvStr(env_str).iteritems():
00593                 env[var] = val
00594         
00595         # Prepare the executable
00596         exe = Ganga.GPI.Executable(exe = Ganga.GPI.File(wrapper_file_spec), env = env,args = args)
00597 
00598         # Prepare sandboxes
00599         inputsandbox = []
00600         inputsandbox.append(script_spec)
00601         inputsandbox.append(gbs_log_file_spec)
00602         env_cmds = ""
00603         for e_name,e_value in env.iteritems():
00604            env_cmds += e_name + "=" + str(e_value) + ";" 
00605         for obj in [my_manager,self]:
00606             sb_file_list = []
00607             if obj is self: GBSUtilities.ParseCommaSepList(self.GetLocalInputSandbox(),sb_file_list)
00608             else:           GBSUtilities.ParseCommaSepList(my_manager.GetGlobalInputSandbox(),sb_file_list)
00609             input_dir = obj.GetStoreLocation("child_dir") + "/InputSandbox"
00610             for sb_file in sb_file_list: inputsandbox.append(input_dir + "/" + sb_file)
00611         outputsandbox = ['gbs_output_sandbox.tar.gz']
00612         outputsandbox.append(gbs_log_file_name)
00613         for obj in [my_manager,self]:
00614             sb_file_list = []
00615             if obj is self: GBSUtilities.ParseCommaSepList(self.GetLocalOutputSandbox(),sb_file_list)
00616             else:           GBSUtilities.ParseCommaSepList(my_manager.GetGlobalOutputSandbox(),sb_file_list)
00617             for sb_file in sb_file_list:
00618                 if sb_file[0] == "$":
00619                     inp = os.popen(env_cmds + sb_file[1:])
00620                     sb_file =  inp.read()
00621                     inp.close()
00622                 outputsandbox.append(sb_file)
00623 
00624         # Prepare backend
00625         backend = my_manager.GetBackend()
00626         queue   = ""
00627         mo = re.search(r"(.*?):(.*)",backend)
00628         if mo:
00629             (backend,queue) = mo.groups()
00630             if backend == "Local": queue   = ""
00631         
00632         # Prepare the job
00633         gj               = Ganga.GPI.Job(backend=backend)
00634         Ganga.GPI.jobtree.add(gj,my_manager.GetGangaTreeDir())
00635         if queue :
00636             if backend == "PBS": gj.backend.queue = queue
00637             if backend == "LCG": gj.backend.CE    = queue
00638         # If GLITE_ENABLE is enabled select the correct middleware
00639         if backend == "LCG" and Ganga.GPI.config['LCG']['GLITE_ENABLE']:
00640             gj.backend.middleware = 'GLITE'
00641             if Perusable: gj.backend.perusable = True
00642         elif Perusable: print "Cannot select perusable; backend not LCG/GLITE"
00643         gj.application   = exe
00644         gj.inputsandbox  = inputsandbox
00645         gj.outputsandbox = outputsandbox
00646         if GetLoggerThreshold() <= logger.SYNOPSIS: print "Contents of Ganga job:-\n\n" + str(gj)
00647         try:
00648             print "Submitting ",self.GetName()
00649             timeout_call(self,gj.submit,3*60)
00650             Log(self,logger.INFO,self.__tryID + " submitted")
00651         except Exception,inst:
00652              excp_msg = "Caught exception:" + str(inst) +" Failed to complete Ganga job submit"
00653              Log(self,logger.ERROR,excp_msg)
00654              self._SetStatusText(excp_msg)
00655              self._SetTryNumber(self.__tryNumber - 1)
00656              return False
00657         self.__gangaJobId = gj.id
00658         self._SetStatus(gj.id,"Ganga status:" + gj.status)
00659         self.GetParent().RefreshJobStats()
00660         self.Write()
00661         return True
00662    
00663 
00664     def UpdateStatus(self):
00665         
00666         """Get latest status.  May involve checking Ganga job and retrieving output."""
00667 
00668         if not self.IsSubmitted(): return
00669         try:
00670             gj = Ganga.GPI.jobs(self.__statusCode)
00671         # I need to catch JobAccessError, but it isn't Ganga.GPI.JobAccessError so just
00672         # catch everything and hope it is O.K.!
00673         except:
00674             gj = None
00675 
00676         # Deal with lost Ganga job, ought not to happen, but then lots of things in life ought not to happen.
00677         if not gj:
00678             Log(self,logger.ERROR,self.__tryID + " lost Ganga job with ID " + str(self.__statusCode))
00679             # Treat as if status has reverted to "new"
00680             g_status = 'new'
00681         else:
00682             g_status = gj.status
00683 
00684         # Deal with cases when job hasn't finished
00685         if     g_status == "completing" \
00686            or  g_status == "ready"      \
00687            or  g_status == "running"    \
00688            or  g_status == "scheduled"  \
00689            or  g_status == "submitted"  \
00690            or  g_status == "waiting":
00691             self._SetStatusText("Ganga status:" + g_status)
00692 
00693             # If Ganga job does not appear stalled, just record any change in status.
00694             t_stall_hours = (time.time() \
00695                              - time.mktime(time.strptime(self.__statusTime.strip(),"%Y-%m-%d %H:%M:%S")))/3600.
00696             # Set a 3 hour time limit except for scheduled and running where 3 days is allowed.
00697             t_timeout_hours = 3.
00698             if g_status == "scheduled" or  g_status == "running": t_timeout_hours = 3.*24.
00699             if t_stall_hours < t_timeout_hours:
00700                 self.GetParent().RefreshJobStats()
00701                 self.Write()
00702                 return
00703             # Ganga job has stalled; kill it and treat as new.
00704             self._SetStatusText("killed Ganga job %d stalled for %5.1f hours " % (gj.id,t_stall_hours))
00705             Log(self,logger.ERROR,self.__tryID + " " + self.__statusText)
00706             gj.remove()
00707             g_status = 'new'
00708 
00709         # If status has reverted to "new" submit never took place
00710         if g_status == 'new':
00711             self._SetTryNumber(self.__tryNumber - 1)
00712             self.__statusCode = GID_JSC_NEW
00713             self.__gangaJobId  = -1
00714             if self.__tryNumber > 1: self.__statusCode = GID_JSC_RETRY
00715             self._SetStatusText("Ready to run")
00716             self.GetParent().RefreshJobStats()
00717             self.Write()
00718             return
00719 
00720 
00721         # For all other cases move over output data, unpack gbs_output_sandbox.tar.gz (if present) and save Ganga status
00722         output_dir = self._GetTryOutputDir()
00723         Log(self,logger.SYNOPSIS,"Retrieving output from " + str(gj.outputdir) + " into " + str(output_dir))
00724         if not os.listdir(gj.outputdir):
00725            Log(self,logger.ERROR,self.__tryID + " No output files in " + str(gj.outputdir))
00726         elif os.system("cp " + str(gj.outputdir) + "* " + str(output_dir)):
00727             Log(self,logger.ERROR,self.__tryID + " Failed to retrieve output from " + str(gj.outputdir) + " into " + str(output_dir))
00728         try:
00729             if gj.backend.perusable:
00730                 print "Job is perusable, attempting to recover stdout, as stdout.perusable"
00731                 cmd = "glite-wms-job-perusal --get -f stdout --all --noint " \
00732                       + gj.backend.id + " > " + output_dir + "/stdout.perusable"
00733                 if os.system(cmd):
00734                     Log(self,logger.ERROR,self.__tryID + " Failed to retrieve persuable output using ",cmd)
00735         except: pass
00736         gbs_output_sandbox = output_dir + "/gbs_output_sandbox.tar.gz"
00737         if os.path.isfile(gbs_output_sandbox):
00738             Log(self,logger.SYNOPSIS,"Unpacking " + gbs_output_sandbox)
00739             if os.system("cd " + output_dir + "; tar zxf gbs_output_sandbox.tar.gz"):
00740                 Log(self,logger.ERROR,self.__tryID + " Failed to unpack " + gbs_output_sandbox)
00741             else:
00742                 os.remove(gbs_output_sandbox)
00743         gangaStatusFile = output_dir + "/gbs_ganga.status"
00744         f = open(gangaStatusFile,"w")
00745         f.write(str(gj))
00746         f.close()
00747         self.__statusCode = GID_JSC_WAITING_ANALYSIS
00748         try:    g_status += " [" + gj.backend.reason + "]"
00749         except: pass
00750         self._SetStatusText("Waiting to analyse.  Ganga exit status was " + g_status)
00751 
00752         # Now analyse the results and decide what happens next
00753         self.Analyse()
00754         # No need to write self, the Analyse() call will have done that.
00755 
00756     def WaitForJob(self,num_tries=100,time_interval=30):
00757 
00758         """Wait for running job to end before returning.
00759 
00760         Limit to 'num_tries' with a sleep of 'time_interval' between."""
00761         if not self.IsSubmitted():
00762             print "Cannot wait for job %s; its status is %s" % (self.GetName(),GIDStringForJSC(self.__statusCode))
00763             return
00764         try_num = 0
00765         current_state = ""
00766         print "Waiting for job %s to end" % self.GetName()
00767         while ( try_num < num_tries ):
00768             try_num += 1
00769             self.UpdateStatus()
00770             new_state = "%s [%s]" % (GIDStringForJSC(self.GetStatusCode()),self.GetStatusText())
00771             if current_state != new_state:
00772                 print timestamp() + " Job %s status now %s" % (self.GetName(),new_state)
00773                 current_state = new_state
00774                 if not self.IsSubmitted(): return
00775             time.sleep(time_interval)
00776         print "Giving up waiting for job to end"
00777 
00778         
00779     ######  Private Methods (not user callable)  ######
00780 
00781     def _GetTryOutputDir(self, try_req = 0):
00782 
00783         """Return the file spec for the current try (or supplied try) output directory"""
00784 
00785         try_use = self.__tryNumber
00786         if try_req > 0: try_use = try_req
00787         return self.GetStoreLocation("child_dir") + "/try_" + str(try_use).zfill(3)
00788 
00789 
00790     def _GetGbsLogFileName(self, try_req = 0):
00791         
00792         """Return the name of the GBS Log File for the current try (or supplied try)."""
00793         try_use = self.__tryNumber
00794         if try_req > 0: try_use = try_req
00795         return "gbs_" + self.GetParent().GetName() + "_" + self.GetName() +  "_" + str(try_use) + ".log"
00796 
00797     def _SetTryNumber(self, try_no):
00798 
00799         """Update try number and associated tryID."""
00800         self.__tryNumber = try_no
00801         self.__tryID     = "JTID:%s.%d" % (self.GetName(),self.__tryNumber)
00802 
00803     #  Methods used by JobAnalyser to update 
00804     def _IncrementEarlyFailsCount(self):         self.__earlyFails += 1
00805     def _IncrementLateHandledFailsCount(self):   self.__lateFailsHandled += 1
00806     def _IncrementLateUnhandledFailsCount(self): self.__lateFailsUnhandled += 1
00807     def _SetRetryArgs(self,value):               self.__retryArgs = value
00808 
00809     # Methods that record state changes.  These also update state time stamp and add
00810     # entry to GLF if it exists.
00811     def _SetStatus(self,code,text):
00812         if self.__statusCode == code and self.__statusText == text: return
00813         self.__statusCode = code
00814         self.__statusText = text
00815         self.__statusTime = timestamp()
00816         gbs_log_file_spec = self._GetTryOutputDir() + "/" + self._GetGbsLogFileName()
00817         if os.path.isfile(gbs_log_file_spec):
00818             os.system("echo '" + timestamp() + "INFO State change " + GIDStringForJSC(self.__statusCode) \
00819                       +  " [" + self.__statusText + "]' >> " + gbs_log_file_spec)
00820 
00821     def _SetStatusCode(self,code): self._SetStatus(code,self.__statusText)
00822     def _SetStatusText(self,text): self._SetStatus(self.__statusCode,text) 

Generated on Fri Mar 5 09:25:41 2010 for gbs by  doxygen 1.4.7