GBSTask.py

Go to the documentation of this file.
00001 import glob
00002 import os
00003 import re
00004 
00005 import Ganga.GPI
00006 
00007 from   GBSIdCodes       import *
00008 from   GBSConfig        import GetConfigValue
00009 from   GBSLogger        import Log, logger 
00010 from   GBSObject        import GBSObject
00011 from   GBSModelRegistry import GetModelRegistry
00012 from   GBSTimeStamp     import GBSTimeStamp
00013 import GBSUtilities
00014 
00015 class GBSTask(GBSObject) :
00016     """Top level object to manage a single task.
00017 
00018     This object will manages an set of Jobs, each of which looks after
00019     a single job, submitting and resubmitting until successful or user
00020     intervention is required.
00021       
00022     ProtoJobs
00023     =========
00024 
00025     Batches of new jobs are introduced into the system initially as "ProtoJobs".
00026     The following methods are used to manipulate them:-
00027 
00028     AddProtoJob(job_name,args_str="",env_str="")
00029     PromoteProtoJobs(job_name_pattern = ".*")
00030     RemoveProtoJobs(job_name_pattern = ".*")
00031        
00032     """
00033 
00034     ######  GBSObject inherited responsibilities   ###### 
00035       
00036 
00037     def __init__(self,name,parent,model,model_args):
00038         self.__scriptFileName      = ""
00039         self.__scriptGlobalArgs    = ""
00040         self.__globalEnvironment   = ""
00041         self.__globalInputSandbox  = ""
00042         self.__globalOutputSandbox = ""
00043         self.__backend             = GetConfigValue("DefaultBackend")
00044         self.__submitEnabled       = 0
00045         self.__maxGangaJobs        = int(GetConfigValue("DefaultMaxGangaJobs"))
00046         self.__maxSubmitJobs       = int(GetConfigValue("DefaultMaxSubmitJobs"))
00047         self.__mode                = "Test"
00048 
00049         # The remaining members are volatile i.e. not persisted but derived from other state.
00050         # Note that _jobManagers is not private so that MinosRSMTask can rename jobs.
00051         self._jobManagers       = {}   # Volatile: Hash name -> Job
00052         self.__gangaTreeDir     = 'gbs/' + name # Ganga JobTree directory that holds jobs.
00053 
00054         self.__jstatAll         = 0 # Job stats: All
00055         self.__jstatNReadyAll   = 0 # Job stats: Not ready - all
00056         self.__jstatNReadyOther = 0 # Job stats: Not ready - other than placed on hold
00057         self.__jstatReadyAll    = 0 # Job stats: Ready to run - all
00058         self.__jstatReadyRetry  = 0 # Job stats: Ready to run - retries 
00059         self.__jstatSubmitAll   = 0 # Job stats: Submitted to Ganga - all
00060         self.__jstatSubmitNRun  = 0 # Job stats: Submitted to Ganga - but not running
00061         self.__jstatDoneAll     = 0 # Job stats: Done - all
00062         self.__jstatDoneFail    = 0 # Job stats: Done - Failed
00063 
00064         GBSObject.__init__(self,name,parent,model)
00065 
00066         # Create Ganga JobTree directory if necessary
00067         if not Ganga.GPI.jobtree.exists(self.__gangaTreeDir): Ganga.GPI.jobtree.mkdir(self.__gangaTreeDir)
00068 
00069         self.MakeChildDirectory()
00070         self.__ReloadChildren()
00071         self.RefreshJobStats()
00072 
00073 
00074     def _DoMemberIO(self,ioh):
00075         self.__scriptFileName       = ioh("Script file name",              "s",self.__scriptFileName)
00076         self.__scriptGlobalArgs     = ioh("Script Global Args",            "s",self.__scriptGlobalArgs)
00077         self.__globalEnvironment    = ioh("+Global environment",           "s",self.__globalEnvironment)
00078         self.__globalInputSandbox   = ioh("+Global Input Sandbox List",    "s",self.__globalInputSandbox)
00079         self.__globalOutputSandbox  = ioh("+Global Output Sandbox List",   "s",self.__globalOutputSandbox)
00080         self.__backend              = ioh("Backend",                       "s",self.__backend) 
00081         self.__submitEnabled        = ioh("Submit Enabled",                "i",self.__submitEnabled) 
00082         self.__maxGangaJobs         = ioh("Max Ganga Jobs",                "i",self.__maxGangaJobs) 
00083         self.__maxSubmitJobs        = ioh("Max Submit Jobs",               "i",self.__maxSubmitJobs) 
00084         self.__mode                 = ioh("+Test/Production Mode",         "s",self.__mode) 
00085         GBSObject._DoMemberIO(self,ioh)
00086 
00087     def WriteFamily(self):
00088         self.Write()
00089         for job_name,job in self._jobManagers.iteritems():job.WriteFamily()
00090 
00091     def GetType(self): return "GBSTask"
00092 
00093     def __repr__(self) : return self.AsString()
00094     
00095 
00096 #-----------------------------------------------------------------------------------------------
00097 #
00098 #  User Callable Methods Getters
00099 #
00100 #-----------------------------------------------------------------------------------------------
00101 
00102 
00103     def AsString(self,level = "Full"):
00104 
00105         """Return string description.
00106 
00107          Return string description at the following levels:-
00108         "Brief"    one line summary suitable for rows in tables
00109         "Heading"  one line summary suitable as heading for "Brief"
00110         "Full"     full desciption including value of every data member"""
00111 
00112         if ( level == "Heading"):
00113             s  = "Name".ljust(20)
00114             s += "Model".ljust(15)
00115             s += "!Ready(other)  Ready(retry)    Sub(!run)   Done(fail)".rjust(51)
00116             s += "Backend".rjust(10) + "  "
00117             s += "Script file".ljust(40)
00118             return s
00119         if ( level == "Brief"):
00120             s  = self.GetName().ljust(20)
00121             s += self.GetModel().ljust(15)
00122             s += str(self.__jstatNReadyAll).rjust(8)   + "("+ str(self.__jstatNReadyOther).rjust(3) + ")" 
00123             s += str(self.__jstatReadyAll).rjust(9)   + "("+ str(self.__jstatReadyRetry).rjust(3) + ")" 
00124             s += str(self.__jstatSubmitAll).rjust(8) + "("+ str(self.__jstatSubmitNRun).rjust(3) + ")" 
00125             s += str(self.__jstatDoneAll).rjust(8)   + "("+ str(self.__jstatDoneFail).rjust(3) + ")" 
00126             backend = self.__backend
00127             mo = re.search(r"(.*?):",backend)
00128             if mo: backend = mo.group(1)
00129             s += backend.rjust(10) + "  "
00130             s += self.__scriptFileName.ljust(40)
00131             return s
00132 
00133         s  = GBSObject.__repr__(self) + "\n\n"
00134         s += "Model: " + self.GetModel() + "\n\n"
00135         s += "Job Definition\n"
00136         s += "  Script file:     "  + self.__scriptFileName  + "\n"
00137         s += "  Global args:     '" + self.__scriptGlobalArgs + "'\n"
00138         s += "  Global env:      '" + self.__globalEnvironment + "'\n" 
00139         s += "  Input Sandbox:   '"  + self.__globalInputSandbox + "'\n" 
00140         s += "  Output Sandbox:  '"  + self.__globalOutputSandbox + "'\n\n" 
00141         s += "Job Submission\n"
00142         s += "  "
00143         if self.__submitEnabled: s += "Enabled\n"
00144         else:                    s += "Disabled\n"
00145         s += "  Backend: " + self.__backend + "\n"
00146         s += "  Limits:  " + str(self.__maxSubmitJobs) + "(single submit) " + str(self.__maxGangaJobs) + "(maximum)\n"
00147         s += "  Mode:    " + self.__mode  + "\n\n" 
00148         s += "Managing "  + str(self.__jstatAll) + " jobs\n"
00149         s += "  Holding:   " + str(self.__jstatNReadyAll).rjust(5)   + " ("+ str(self.__jstatNReadyOther) + " other)\n" 
00150         s += "  Waiting:   " + str(self.__jstatReadyAll).rjust(5)   + " ("+ str(self.__jstatReadyRetry) + " retries)\n" 
00151         s += "  Submitted: " + str(self.__jstatSubmitAll).rjust(5) + " ("+ str(self.__jstatSubmitNRun) + " not running)\n" 
00152         s += "  Done:      " + str(self.__jstatDoneAll).rjust(5)   + " ("+ str(self.__jstatDoneFail) + " failed)\n" 
00153         return s
00154  
00155 
00156     def GetBackend(self):
00157         """Return backend for future jobs"""
00158         return self.__backend
00159     
00160     def GetGangaTreeDir(self):
00161         """Return Ganga JobTree Directory that holds all Ganga jobs"""
00162         return self.__gangaTreeDir
00163 
00164     def GetGlobalEnvironment(self,prettyPrint = False):
00165         """Return, as a comma separated list string, the environment that is global to all jobs
00166 
00167         If prettyPrint is True print out list one item per line"""
00168         if not prettyPrint: return self.__globalEnvironment
00169         print "Global Environment:-"
00170         GBSUtilities.ListCommaSepList(self.__globalEnvironment,"  ")
00171     
00172     def GetGlobalInputSandbox(self):
00173         """Return, as a comma separated list string, the input sandbox file list that is global to all jobs"""
00174         return self.__globalInputSandbox
00175     
00176     def GetGlobalOutputSandbox(self):
00177         """Return, as a comma separated list string, the ouput sandbox file list that is global to all jobs"""
00178         return self.__globalOutputSandbox
00179     
00180     def GetMaxGangaJobs(self):
00181         """Return the maximum number of jobs that can be submitted to Ganga at any one time """
00182         return self.__maxGangaJobs
00183     
00184     def GetMaxSubmitJobs(self):
00185         """Return the maximum number of jobs that can be submitted by a single call SubmitJobs() """
00186         return self.__maxSubmitJobs
00187     
00188     def GetMode(self):
00189         """Return the Test/Production Mode """
00190         return self.__mode
00191     
00192     def GetScriptFileName(self):
00193         """Returns the current user application script file name (or "" if none). """
00194         return self.__scriptFileName
00195 
00196     def GetScriptFileSpec(self):
00197         if not self.__scriptFileName : return ""
00198         return self.GetStoreLocation("child_dir") + "/" + self.__scriptFileName
00199 
00200     def GetJob(self,name,warn=True) :
00201         """Return Job called job_name e.g. job = task.GetJob("job_00123")"""
00202         if not self._jobManagers.has_key(name):
00203             if warn: print "Sorry, there is no Job named " + str(name)
00204             return None
00205         else : return self._jobManagers[name]
00206 
00207     def GetJobs(self,job_name_pattern = ".*"):
00208         """Return a dictionary: dict[name]  job for all jobs matching pattern"""
00209 
00210         job_dict = {}
00211         for job_name,job in self._jobManagers.iteritems():
00212             if re.search(job_name_pattern,job_name): job_dict[job_name] = job
00213         return job_dict
00214 
00215     def GetScriptGlobalArgs(self):
00216         """Returns (as a string) the comma separated list of application script args that are global to all jobs """
00217         return self.__scriptGlobalArgs
00218 
00219     def GetTestOnlyMethods(self):
00220         """Return a list of methods that are only available in 'Test' mode."""
00221         return ["SetGlobalEnvironment",   \
00222                 "SetGlobalInputSandbox",  \
00223                 "SetGlobalOutputSandbox", \
00224                 "SetScriptFileName",      \
00225                 "SetScriptGlobalArgs"]    \
00226 
00227     def IsAuthorisedToSubmit(self,warn=True):
00228         """Returns True is authorised to submit jobs.
00229 
00230         Always True except if backend is LCG, then checks proxy"""
00231         if self.GetBackend()[0:3] != "LCG" : return True
00232         ok = False
00233         try:    ok = Ganga.GPI.gridProxy.isValid()
00234         except: ok = False
00235         if warn and not ok: print "Cannot submit jobs; backend is LCG but cannot find valid proxy"
00236         return ok        
00237 
00238     def IsDisabled(self,method):
00239         """Return True if method is disabled."""
00240 
00241         if self.GetMode() == 'Test': return False
00242         for disabled_method in self.GetTestOnlyMethods():
00243             if method == disabled_method:
00244                 print "Method " + method + " is disabled in Production mode"
00245                 return True
00246         return False
00247         
00248     def ListJobs(self,job_name_pattern = ".*"):
00249 
00250         """List existing Jobs matching pattern"""
00251         
00252         print self.GetName() + " has the following jobs",
00253         if job_name_pattern != ".*": print "that match the pattern: '" + str(job_name_pattern) + "'",
00254         print ":-"
00255         first = 1
00256         job_names = self._jobManagers.keys()
00257         job_names.sort()
00258         for job_name in job_names:
00259             if not re.search(job_name_pattern,job_name): continue
00260             job = self._jobManagers[job_name]
00261             if first:
00262                 print job.AsString("Heading") + "\n"
00263                 first = 0
00264             print job.AsString("Brief")
00265         if first: print " no jobs found"
00266 
00267 
00268 #-----------------------------------------------------------------------------------------------
00269 #
00270 #  User Callable Methods Setters
00271 #
00272 #-----------------------------------------------------------------------------------------------
00273 
00274 
00275     def AddJob(self,job_name,args_str = "",env_str = ""):
00276 
00277         """Create a new named Job and optionally assign its application script local args and environment. 
00278 
00279       e.g. job = task.AddJob("job_00034621_0002","F00034621_0002.mdaq.root","mini_flux=no")
00280 
00281       Note that the job_name must begin job_ (if omitted it will be added automatically) and
00282       will be rejected if it contains anything beyond alphanumeric, '_' and '-'.for as it
00283       will be used to name a directory."""
00284 
00285         return self._AddJobOrProtoJob(job_name,args_str,env_str,"Job")
00286 
00287     def ClearErrorCountsJobs(self,job_name_pattern= ".*"):
00288         """After seeking confirmation, clear error counts on all suitable jobs matching pattern"""
00289         self.__ApplyActionToJobs("CLEAR ERROR COUNTS",job_name_pattern)
00290 
00291     def ClearHistoryJobs(self,job_name_pattern= ".*"):
00292         """After seeking confirmation, clear histories on all suitable jobs matching pattern"""
00293         self.__ApplyActionToJobs("CLEAR HISTORY",job_name_pattern)
00294 
00295     def EnableSubmit(self,enable=True):
00296         """Enable or disable submission of jobs via the SubmitJobs() method.
00297       The function returns the previous value of the switch."""
00298         
00299         old_val = self.__submitEnabled
00300         self.__submitEnabled  = 0
00301         if enable: self.__submitEnabled  = 1
00302         self.Write()
00303         return old_val
00304 
00305     def HoldJobs(self,job_name_pattern= ".*"):
00306         """After seeking confirmation, hold all suitable jobs matching pattern"""
00307         self.__ApplyActionToJobs("HOLD",job_name_pattern)
00308 
00309     def KillJobs(self,job_name_pattern= ".*"):
00310         """After seeking confirmation, kill all suitable jobs matching pattern"""
00311         self.__ApplyActionToJobs("KILL",job_name_pattern)
00312 
00313     def RefreshJobStats(self):
00314 
00315         """ Bring job statistics up to date.
00316 
00317         Should not be necessary (but harmless) for user to call this
00318         as child jobs call it when their state changes.  Note: unlike
00319         UpdateJobsStatus this simply asks jobs what their current
00320         state without checking Ganga """
00321         self.__jstatAll         = 0
00322         self.__jstatNReadyAll     = 0
00323         self.__jstatNReadyOther   = 0
00324         self.__jstatReadyAll     = 0
00325         self.__jstatReadyRetry   = 0
00326         self.__jstatSubmitAll   = 0
00327         self.__jstatSubmitNRun  = 0
00328         self.__jstatDoneAll     = 0
00329         self.__jstatDoneFail    = 0
00330         for job_name,job in self._jobManagers.iteritems():
00331             self.__jstatAll += 1
00332             pc = job.GetPhaseCode()
00333             if pc == GID_JPC_DONE_FAIL   or pc == GID_JPC_DONE_NFAIL:   self.__jstatDoneAll += 1
00334             if pc == GID_JPC_DONE_FAIL:                                 self.__jstatDoneFail += 1
00335             if pc == GID_JPC_READY_RETRY or pc == GID_JPC_READY_NRETRY: self.__jstatReadyAll += 1
00336             if pc == GID_JPC_READY_RETRY:                               self.__jstatReadyRetry += 1
00337             if pc == GID_JPC_SUBMIT_RUN  or pc == GID_JPC_SUBMIT_NRUN:  self.__jstatSubmitAll += 1
00338             if pc == GID_JPC_SUBMIT_NRUN:                               self.__jstatSubmitNRun += 1
00339             if pc == GID_JPC_NREADY_HOLD or pc == GID_JPC_NREADY_NHOLD: self.__jstatNReadyAll += 1
00340             if pc == GID_JPC_NREADY_NHOLD:                              self.__jstatNReadyOther += 1
00341 
00342     def ReleaseJobs(self,job_name_pattern = ".*"):
00343         """After seeking confirmation, release all suitable jobs matching pattern"""
00344         self.__ApplyActionToJobs("RELEASE",job_name_pattern)
00345 
00346     def RemoveJobs(self,job_name_pattern= ".*"):
00347         """After seeking confirmation, remove all suitable jobs matching pattern"""
00348         self.__ApplyActionToJobs("REMOVE",job_name_pattern)
00349         
00350     def SetBackend(self,backend):
00351 
00352         """Define backend for future jobs:  One of: Local, PBS:queue, LCG:queue"""
00353 
00354         #  Check value before accepting
00355         if not re.search(r"^(Local|PBS|LCG)(:.+)?$",backend):
00356             print "Sorry, '" + str(backend) + "' is not an acceptible backend. It should be one of:-\n"\
00357                   + "  Local, PBS{:queue} or LCG{:queue}"
00358             return
00359         self.__backend = backend
00360         self.Write()
00361 
00362     def SetGlobalEnvironment(self,env_str):
00363 
00364         """Set, as a comma separated list string, the environment that is global to all jobs.
00365 
00366       e.g.  task.SetGlobalEnvironment('var1=123,var2=a string with spaces,var3=456')
00367 
00368       If the first character is + e.g. +var3=567,var4=b, append to existing environment rather than replace it"""
00369 
00370         if self.IsDisabled("SetGlobalEnvironment"): return
00371         if re.search(r"^\+",env_str):
00372             s = self.__globalEnvironment
00373             if s: s += ','
00374             s += env_str[1:]
00375             # Force local method to be recalled rather than any inherited one
00376             # May just be paranoia but see corresponding GBSJob method.
00377             GBSTask.SetGlobalEnvironment(self,s)
00378             return
00379         d =  GBSUtilities.ParseEnvStr(env_str)
00380         if env_str and not d: print "Cannot parse environment string: '" + env_str + "'"
00381         else:
00382             self.__globalEnvironment = GBSUtilities.BuildEnvStr(d)
00383             self.Write()
00384 
00385     def SetGlobalInputSandbox(self,in_sbox_str):
00386 
00387         """Set, as a comma separated list string, the input sandbox file list that is global to all jobs.
00388 
00389       e.g.  task.SetGlobalInputSandbox('/home/users/west/my_input_data.dat,../my_script.sh')"""
00390 
00391         if self.IsDisabled("SetGlobalInputSandbox"): return
00392         (ok,str) = GBSUtilities.ProcessSandboxSetup(self,in_sbox_str,"input")
00393         if ok:
00394             self.__globalInputSandbox = str
00395             self.Write()
00396 
00397     def SetGlobalOutputSandbox(self,out_sbox_str):
00398 
00399         """Set, as a comma separated list string, the output sandbox file list that is global to all jobs.
00400 
00401       e.g.  task.SetGlobalOutputSandbox('my_output_data.dat,my_output.log')"""
00402 
00403         if self.IsDisabled("SetGlobalOutputSandbox"): return
00404         (ok,str) = GBSUtilities.ProcessSandboxSetup(self,out_sbox_str,"output")
00405         if ok:
00406             self.__globalOutputSandbox = str
00407             self.Write()
00408 
00409     def SetMaxGangaJobs(self,n):
00410         """Set the maximum number of jobs that can be submitted to Ganga at any one time"""
00411         
00412         self.__maxGangaJobs = n
00413         self.Write()
00414 
00415     def SetMaxSubmitJobs(self,n):
00416         """Set the maximum number of jobs that can be submitted by a single call SubmitJobs()"""
00417         
00418         self.__maxSubmitJobs = n
00419         self.Write()
00420     
00421     def SetMode(self,mode):
00422         """Set the Test/Production Mode """
00423         if mode != "Test" and mode != "Production":
00424             print "Illegal mode: '" + mode + "', only 'Test' and 'Production' permitted"
00425             return
00426         if self.__mode == mode: return
00427         if mode == 'Production':
00428             print "Caution:  Switching to 'Production' mode will disable the following methods:-\n"
00429             print str(self.GetTestOnlyMethods())
00430             print "\n but should be done before moving into full production."
00431         else:
00432             if not self.GetScriptFileName():
00433                 print "You cannot switch to 'Production' mode; no global script file has been defined."
00434                 return
00435             print "Caution: Switching back to 'Test' mode is potentially dangerous as it re-enables the following methods:-\n"
00436             print str(self.GetTestOnlyMethods())
00437             print "\n and using these methods could produce results that are not homogeneous"
00438         ans = raw_input("Do you want to proceed ?y[n]")
00439         if not re.search(r"^y",ans,re.I):
00440             print "Mode not changed."
00441             return
00442         print "Mode changed to " + mode
00443         self.__mode = mode
00444         self.Write()
00445 
00446     def SetScriptFileName(self,ext_file_spec):
00447 
00448         """Pass in script file to be used by jobs.  GBS make a local copy
00449         so no need to keep once passed.
00450 
00451         e.g.  task.SetScriptFileName("my/directory/my_script_file.sh")"""
00452 
00453         if self.IsDisabled("SetScriptFileName"): return
00454         import os
00455         if not os.path.isfile(ext_file_spec):
00456             print "Cannot find script file " + str(ext_file_spec)
00457             return
00458         self.__scriptFileName = os.path.basename(ext_file_spec)
00459         int_file_spec = self.GetScriptFileSpec()
00460         print "Copying " + str(ext_file_spec) + " -> " + str(int_file_spec)
00461         if os.system("cp " + str(ext_file_spec) + " " + str(int_file_spec)):
00462             print "Failed to copy file!"
00463             self.__scriptFileName = ""
00464         self.Write()
00465 
00466     def SetScriptGlobalArgs(self,arg_str):
00467 
00468         """Set (as a string) the comma separated list of application script args that are global to all jobs."""
00469 
00470         if self.IsDisabled("SetScriptGlobalArgs"): return
00471         self.__scriptGlobalArgs = arg_str
00472         self.Write()
00473 
00474     def SubmitJobs(self,num_req = 0):
00475 
00476         """If enabled (see EnableSubmit()),
00477         submit up to the limit (see GetMaxSubmitJobs()) jobs,
00478         but not to exceed the maximum (see GetMaxGangaJobs())
00479         first choosing ones scheduled for retry and
00480         then new jobs.  Function returns number submitted."""
00481 
00482         num_sub = 0
00483 
00484         # Begin by making sure that submit is authorised, enabled and that there is a global script.
00485         if not self.IsAuthorisedToSubmit(): return num_sub
00486         if not self.__submitEnabled:
00487             print "Sorry, cannot submit; submit not enabled."
00488             return num_sub
00489         if not self.__scriptFileName:
00490             print "Sorry, cannot submit; no user application script assigned."
00491             return num_sub
00492         
00493         # Make sure all jobs are up to date and that limits are not exceeded.
00494         self.UpdateJobsStatus()
00495         if self.__maxGangaJobs <= self.__jstatSubmitAll:
00496             print "Sorry, cannot submit; already have %d (limit %d)" % (self.__jstatSubmitAll,self.__maxGangaJobs)
00497             return num_sub
00498         if num_req <= 0: num_req = self.__maxSubmitJobs
00499         max_job = min(num_req,self.__maxGangaJobs - self.__jstatSubmitAll)
00500         if max_job < num_req:  print "Sorry, can only submit %d, close to the limit of %d" % (max_job,self.__maxGangaJobs)
00501 
00502         # First look for retries and then look for new jobs.
00503         for jsc in [GID_JSC_RETRY,GID_JSC_NEW]:
00504             job_names = self._jobManagers.keys()
00505             job_names.sort()
00506             for job_name in job_names:
00507                 job = self._jobManagers[job_name]
00508                 if job.GetStatusCode() != jsc: continue
00509                 job.Submit()
00510                 num_sub += 1
00511                 if num_sub >= max_job: return num_sub
00512         return num_sub
00513 
00514     def UpdateJobsStatus(self):
00515 
00516         """Send all jobs their UpdateStatus().  May involve checking Ganga job and retrieving output.
00517 
00518         As a by product, also remove any orphaned Ganga jobs i.e. Ganga jobs in Task's jobtree folder
00519         but not owned by any Job"""
00520 
00521         # If job polling not enabled, perform a single sweep with a timeout based on the number
00522         # of Ganga jobs
00523         if not Ganga.GPI.config['PollThread']['autostart']:
00524             timeout = 60 + 60*self.__jstatSubmitAll/int(GetConfigValue("GangaRegistryPollRate"))
00525             Ganga.GPI.runMonitoring(1,timeout)
00526             
00527         owned_ganga_ids = {}
00528         for job_name,job in self._jobManagers.iteritems():
00529             job.UpdateStatus()
00530             owned_ganga_ids[job.GetGangaJobId()] = 1
00531         # Clean up dead links.  This ought not to be necessary but, at least as late as 4.4.8 it is.
00532         Ganga.GPI.jobtree.cleanlinks()
00533         for gj in Ganga.GPI.jobtree.getjobs(self.__gangaTreeDir):
00534             if owned_ganga_ids.has_key(gj.id): continue
00535             Log(self,logger.WARNING,"Removing orphaned Ganga job %d" % gj.id)
00536             gj.remove()
00537             
00538 #-----------------------------------------------------------------------------------------------
00539 
00540     def WarnLowGridProxy(self,email_list,proxy_min_hours=3.,myproxy_min_days=3.):
00541         """If there is a valid GRID proxy, but it is running low:-
00542 
00543               1)  'proxy_min_hours' defined and Proxy lifetime is less than 'proxy_min_hours'
00544            or 2)  'myproxy_min_days defined and myproxy lifetime is less that 'myproxy_min_days'
00545 """
00546         # No action if there isn't an email list or a GRID Proxy
00547         if not email_list or os.system("voms-proxy-info --exists"): return
00548 
00549         proxy_warn = ""
00550 
00551         # Test GRID proxy 
00552         if proxy_min_hours:
00553             inp  = os.popen("voms-proxy-info --timeleft","r")
00554             line = inp.readline()
00555             inp.close()
00556             try:
00557                 hours_left = int(line)/(60.*60.)
00558                 if hours_left < proxy_min_hours: proxy_warn = "GRID proxy remaining lifetime = %.1f hours " % hours_left
00559             except: pass
00560 
00561         # Test myproxy
00562         if myproxy_min_days:
00563             inp  = os.popen("myproxy-info | grep timeleft","r")
00564             line = inp.readline()
00565             inp.close()
00566             mo = re.search(r"\s+(\d+):(\d+):(\d+)",line)
00567             if mo:
00568                 days_left = ( (int(mo.group(3))/60.+int(mo.group(2)))/60. + int(mo.group(1)) )/24.
00569                 if days_left < myproxy_min_days: proxy_warn += "myproxy remaining lifetime = %.1f days" % days_left
00570 
00571         if proxy_warn:
00572             cmd  = 'mail -s"Task:%s WARNING: %s"  %s<< EOD\n' % (self.GetName(),proxy_warn,email_list)
00573             cmd += "\nPlease log in and renew the proxy/myproxy as appropriate.\nEOD\n"
00574             os.system(cmd)
00575  
00576 
00577     def WriteHtmlReport(self,task_dir):
00578         """Write HTML status report <task_dir>/<task-name>.html.
00579 
00580         Reports of individual jobs will be created in <task_dir>/<task-name>./<job-name>.html."""
00581 
00582         if not os.path.isdir(task_dir):
00583             print "Unable to find directory: " + str(task_dir)
00584             return
00585         task_name = self.GetName()
00586 
00587         #  Create or wipe directory for job reports if necessary
00588         job_dir = task_dir + "/" + task_name
00589         if not os.path.isdir(job_dir):
00590             try:
00591                 os.mkdir(job_dir,0755)
00592             except OSError:
00593                 print "Unable to create directory: " + str(job_dir)
00594                 return
00595         else:
00596             os.system("cd " + job_dir + "; rm -f *")
00597 
00598         #  Make sure that job statistics are right up to date
00599         self.RefreshJobStats()
00600         
00601         #  Create file for task report and record task data
00602         task_file = task_dir + "/" + task_name + '.html'
00603         try:
00604             f_task = open(task_file, 'w')
00605         except OSError:
00606                 print "Unable to create file: " + str(task_file)
00607                 return
00608         f_task.write("<html><body>\n")
00609         f_task.write("<h1>Status report for Task " + task_name + " produced on " +  GBSTimeStamp() + "</h1>\n<pre>\n")
00610         f_task.write(str(self))
00611         f_task.write("\n</pre>\n")
00612 
00613         #  Define colour codes
00614         red        =  "#ff6666"
00615         green      =  "#66ff66"
00616         blue       =  "#66aaff"
00617         purple     = "#ff44ff"
00618         grey_light = "#999999"
00619         grey_dark  = "#777777"
00620 
00621         #  Construct the colour table with links to start points in table sorted by job phase.
00622         f_task.write("<h2>Task Status Table</h2>\n")
00623         f_task.write("<p><table border=1>\n")
00624 
00625         f_task.write("<tr><td colspan=2 align=center bgcolor=" + grey_light + ">Not ready HELD (")
00626         num_nready_held =  self.__jstatNReadyAll - self.__jstatNReadyOther
00627         if num_nready_held:
00628             f_task.write('<a href="#phase_%d">%d</a>)\n' % (GID_JPC_NREADY_HOLD,num_nready_held))
00629         else: f_task.write('0)\n')
00630         
00631         f_task.write("<tr><td colspan=2 align=center bgcolor=" + grey_dark + ">Not ready Not HELD (")
00632         if self.__jstatNReadyOther:
00633             f_task.write('<a href="#phase_%d">%d</a>)\n' % (GID_JPC_NREADY_NHOLD,self.__jstatNReadyOther))
00634         else: f_task.write('0)\n')
00635 
00636         f_task.write("<tr><td rowspan=2 align=center bgcolor=" + blue + \
00637                      ">Ready (" + str(self.__jstatReadyAll) + ")\n")
00638 
00639         f_task.write("<td align=center bgcolor=" + green + ">First attempt (")
00640         num_ready_nretry = self.__jstatReadyAll - self.__jstatReadyRetry
00641         if num_ready_nretry:
00642             f_task.write('<a href="#phase_%d">%d</a>)\n' % (GID_JPC_READY_NRETRY,num_ready_nretry))
00643         else: f_task.write('0)\n')
00644 
00645         f_task.write("<tr><td align=center bgcolor=" + red + ">Retry (")
00646         if self.__jstatReadyRetry:
00647             f_task.write('<a href="#phase_%d">%d</a>)\n' % (GID_JPC_READY_RETRY,self.__jstatReadyRetry))
00648         else: f_task.write('0)\n')
00649         
00650         f_task.write("<tr><td rowspan=2 align=center bgcolor=" + purple + \
00651                      ">Submitted (" + str(self.__jstatSubmitAll) + ")\n")
00652         
00653         f_task.write("<td align=center bgcolor=" + green + ">Running (")
00654         num_submit_run = self.__jstatSubmitAll - self.__jstatSubmitNRun
00655         if num_submit_run:
00656             f_task.write('<a href="#phase_%d">%d</a>)\n' % (GID_JPC_SUBMIT_RUN,num_submit_run))
00657         else: f_task.write('0)\n')
00658         
00659         f_task.write("<tr><td align=center bgcolor=" + red + ">Not running (\n")
00660         if self.__jstatSubmitNRun:
00661             f_task.write('<a href="#phase_%d">%d</a>)\n' % (GID_JPC_SUBMIT_NRUN,self.__jstatSubmitNRun))
00662         else: f_task.write('0)\n')
00663         
00664         f_task.write("<tr><td colspan=2 align=center bgcolor=" + green + ">Succeeded (")
00665         num_done_nfail = self.__jstatDoneAll  - self.__jstatDoneFail 
00666         if num_done_nfail:
00667             f_task.write('<a href="#phase_%d">%d</a>)\n' % (GID_JPC_DONE_NFAIL,num_done_nfail))
00668         else: f_task.write('0)\n')
00669         
00670         f_task.write("<tr><td colspan=2 align=center bgcolor=" + red + ">Failed (")
00671         if self.__jstatDoneFail:
00672             f_task.write('<a href="#phase_%d">%d</a>)\n' % (GID_JPC_DONE_FAIL,self.__jstatDoneFail))
00673         else: f_task.write('0)\n')
00674         
00675         f_task.write("</table>\n")
00676 
00677         # Prepare a hash of lists: job phase -> list of table rows
00678         phase_rows = {}
00679         phase_rows[GID_JPC_NREADY_HOLD]  = []
00680         phase_rows[GID_JPC_NREADY_NHOLD] = []
00681         phase_rows[GID_JPC_READY_NRETRY] = []
00682         phase_rows[GID_JPC_READY_RETRY]  = []
00683         phase_rows[GID_JPC_SUBMIT_RUN]   = []
00684         phase_rows[GID_JPC_SUBMIT_NRUN]  = []
00685         phase_rows[GID_JPC_DONE_NFAIL]   = []
00686         phase_rows[GID_JPC_DONE_FAIL]    = []
00687         
00688         ##  Loop over all jobs, recording summary in a table in the parent task file with
00689         ##  links to a file holding job details and associate Ganga job (if any).
00690         f_task.write('<h2><a name="table_by_name">Job Table ordered by Name</a></h2\n')
00691         f_task.write('<p>See also <a href="#table_by_phase">Job Table ordered by Phase</a><p\n')
00692         table_header = "<p><table border=1>\n<tr><th>Job Name<th>Status<br>Date&nbsp;Time<th>Ganga<br>Job<th>Try<br>Num<th>Status\n"
00693         f_task.write(table_header)
00694         job_names = self._jobManagers.keys()
00695         job_names.sort()
00696         for job_name in job_names:
00697             job = self._jobManagers[job_name]
00698             job_name = job.GetName()
00699             
00700             #  Decide colours for table entry
00701             pc = job.GetPhaseCode()
00702             colour_1,colour_2 = grey_dark, grey_dark
00703             if  pc ==  GID_JPC_NREADY_HOLD:  colour_1,colour_2 = grey_light,grey_light
00704             if  pc ==  GID_JPC_NREADY_NHOLD: colour_1,colour_2 = grey_dark, grey_dark
00705             if  pc ==  GID_JPC_READY_NRETRY: colour_1,colour_2 = blue,      green
00706             if  pc ==  GID_JPC_READY_RETRY:  colour_1,colour_2 = blue,      red
00707             if  pc ==  GID_JPC_SUBMIT_RUN:   colour_1,colour_2 = purple,    green
00708             if  pc ==  GID_JPC_SUBMIT_NRUN:  colour_1,colour_2 = purple,    red
00709             if  pc ==  GID_JPC_DONE_NFAIL:   colour_1,colour_2 = green,     green
00710             if  pc ==  GID_JPC_DONE_FAIL:    colour_1,colour_2 = red,       red
00711 
00712             # Initialise string that will hold a single row
00713             table_row = ""
00714 
00715             #  Create job file and link table entry to it.
00716             job_file = job_dir + "/" + job_name + '.html'
00717             f_job = open(job_file, 'w')
00718             f_job.write("<html><body>\n")
00719             f_job.write("<h1>Status report for Job " + job_name + " produced on " +  GBSTimeStamp() + "</h1>\n")
00720             f_job.write("<p>This job is part of the <a href=\"../" + task_name + ".html\">")
00721             f_job.write(task_name + "</a> task<p>\n<pre>\n")
00722             f_job.write(str(job))
00723             f_job.write("</pre></body></html>\n")
00724             f_job.close()
00725             table_row += "<tr><td bgcolor=" + colour_1 + "><a href=\"" + task_name + "/" \
00726                          + job_name + ".html\">" + job_name + "</a>"
00727             
00728             # Chop off year and second to keep date more compact
00729             table_row += "<td bgcolor=" + colour_1 + ">" + job.GetStatusTime()[6:-4]
00730             table_row += "<td bgcolor=" + colour_1 + ">"
00731             gj_id        = "unknown"
00732             gj_stat_file = ""
00733             gj_obj       = job.GetGangaJob()
00734             if gj_obj:
00735                 gj_id =  gj_obj.id
00736             else :
00737                 gj_stat_file = job._GetTryOutputDir() + "/gbs_ganga.status"
00738                 if os.path.isfile(gj_stat_file):
00739                     f = os.popen('grep "id " ' + gj_stat_file)
00740                     line = f.readline()
00741                     f.close()
00742                     mo = re.search(r"id = (\d+)",line)
00743                     if mo: gj_id = mo.group(1)
00744                     gj_obj = open(gj_stat_file)
00745             if not gj_obj:
00746                 table_row += "n/a"
00747             else:
00748                 
00749                 # Create Ganga job file
00750                 gj_file = job_dir + "/"  + job_name + '_ganga_job.html'
00751                 f_gj = open(gj_file, 'w')
00752                 f_gj.write("<html><body>\n")
00753                 f_gj.write("<p><h1>Ganga job for <a href=\"" + job_name + ".html\">" + job_name + "</a></h1>\n")
00754                 f_gj.write("<p>This job is part of the <a href=\"../" + task_name + ".html>")
00755                 f_gj.write(task_name + "</a> task<p>\n<pre>\n")
00756                 if gj_stat_file:
00757                     f_gj.write("Dump of expired Ganga Job\n\n")
00758                     for line in gj_obj: f_gj.write(line)
00759                     gj_obj.close()
00760                 else:
00761                     f_gj.write("Dump of active Ganga Job\n\n")
00762                     f_gj.write(str(gj_obj))
00763                 f_gj.write("</pre></body></html>\n")
00764                 f_gj.close()
00765                 table_row += "<a href=\"" +  task_name + "/" + job_name + "_ganga_job.html\">" +  str(gj_id) + "</a>"
00766 
00767             # Create a GLF history file
00768             hist_file = job_dir + "/"  + job_name + '_history.html'
00769             f_hist = open(hist_file, 'w')
00770             f_hist.write("<html><body>\n")
00771             f_hist.write("<p><h1>Full try history for <a href=\"" + job_name + ".html\">" + job_name + "</a></h1>\n")
00772             f_hist.write("This file contains all the GBS Log Files (GLF) for "\
00773                          + job_name + " which is is part of the <a href=\"../" + task_name + ".html\">")
00774             f_hist.write(task_name + "</a> task<p>\n<pre>\n")
00775             glf_wildcard = job.GetStoreLocation('child_dir') + "/*/gbs_*job*.log"
00776             for glf_file in glob.glob(glf_wildcard):
00777                 mo = re.search(r"_(\d+)\.log$",glf_file)
00778                 f_hist.write("</pre><h2>GLF Summary for %s</h2><pre>\n" % mo.group(1))
00779                 f_glf = open(glf_file)
00780                 f_hist.write(f_glf.read())
00781                 f_glf.close()
00782             f_hist.write("</pre></body></html>\n")
00783             f_hist.close()
00784 
00785             hist_file_rel =  task_name + "/" + job_name + "_history.html"
00786             table_row += "<td bgcolor=%s align=center> <a href=\"%s\">%d</a>" % (colour_2,hist_file_rel,job.GetTryNumber())
00787             table_row += "<td bgcolor=" + colour_2 + ">" + GIDStringForJSC(job.GetStatusCode()).ljust(23) \
00788                          + "[" + job.GetStatusText() + "]\n"
00789             # Record row in table ordered by name and and in hash ready forr table ordered by phase.
00790             f_task.write(table_row)
00791             phase_rows[pc].append(table_row)
00792 
00793         f_task.write("</table>\n")
00794 
00795         ##  Construct table ordered by phase
00796         f_task.write('<h2><a name="table_by_phase">Job Table ordered by Phase</a></h2>\n')
00797         f_task.write('<p>See also <a href="#table_by_name">Job Table ordered by Name</a><p>\n')
00798         f_task.write(table_header)
00799         for pc,rows in phase_rows.iteritems():
00800             f_task.write('<a name="phase_%d"></a>\n' % pc)
00801             for row in rows: f_task.write(row)
00802         f_task.write("</table>\n")
00803         f_task.write("</body></html>\n")
00804         f_task.close()
00805         print "HTML Status report written to " + task_file
00806         
00807                 
00808 
00809 #-----------------------------------------------------------------------------------------------
00810 #
00811 #  ProtoJob Methods (user callable)
00812 #
00813 #-----------------------------------------------------------------------------------------------
00814 
00815     
00816     def AddProtoJob(self,job_name,args_str="",env_str=""):
00817 
00818         """Add a ProtJob with the name job_name and assign its local args the string args_str
00819       and environment env_str."""
00820 
00821         if env_str:
00822             d =  GBSUtilities.ParseEnvStr(env_str)
00823             if not d:
00824                 print "Cannot parse environment string: " + env_str
00825                 return None
00826         return self._AddJobOrProtoJob(job_name,args_str,env_str,"GBSProtoJob")
00827 
00828     def PromoteProtoJobs(self,job_name_pattern = ".*"):
00829 
00830         """Promote all ProtoJobs whose name matches the supplied pattern. For example ".*" promotes all."""
00831 
00832         pro_list = [] # List of names to be promoted (give user a last chance to abort)
00833         for job_name,job in self._jobManagers.iteritems():
00834             if job.GetType() != "GBSProtoJob": continue
00835             if re.search(job_name_pattern,job_name): pro_list.append(job_name)
00836         num_pjobs = len(pro_list)
00837         if not num_pjobs:
00838             print "No ProtoJobs to promote"
00839             return
00840         ans = raw_input(str(num_pjobs) + " ProtoJobs are due for promotion.  Do you want to proceed ?y[n]")
00841         if not re.search(r"^y",ans,re.I):
00842             print "Promotion aborted"
00843             return
00844         for job_name in pro_list:
00845             pjob = self._jobManagers[job_name]
00846             job  = GetModelRegistry().CreateObject(self.GetModel(),"Job",job_name,self)
00847             job.SetScriptLocalArgs(pjob.GetScriptLocalArgs())
00848             job.SetLocalEnvironment(pjob.GetLocalEnvironment())
00849             job.SetLocalInputSandbox(pjob.GetLocalInputSandbox())
00850             job.SetLocalOutputSandbox(pjob.GetLocalOutputSandbox())
00851             self._jobManagers[job_name] = job
00852         print str(len(pro_list)) + " ProtoJobs promoted"
00853        
00854     def RemoveProtoJobs(self,job_name_pattern = ".*"):
00855 
00856         """Remove all ProtoJobs whose name matches the supplied pattern. For example ".*" removes all"""
00857 
00858         del_list = [] # List of names to be deleted (cannot delete elements while iterating)
00859         for job_name,job in self._jobManagers.iteritems():
00860             if job.GetType() != "GBSProtoJob": continue
00861             if re.search(job_name_pattern,job_name): del_list.append(job_name)
00862         for job_name in del_list: del self._jobManagers[job_name]
00863         print str(len(del_list)) + " ProtoJobs deleted"
00864        
00865                 
00866 
00867 #-----------------------------------------------------------------------------------------------
00868 #
00869 #  Private Methods (not user callable)
00870 #
00871 #-----------------------------------------------------------------------------------------------
00872 
00873                       
00874     def _AddJobOrProtoJob(self,job_name,args_str,env_str,type):
00875 
00876         """Create a new named Job or ProtoJob and optionally assign its application script local args."""
00877 
00878         #  Check that the name is legal and not already in use.
00879         if re.search(r"[^a-zA-Z0-9_\-]",job_name):
00880             print "Sorry, '" + str(job_name) + "' is an illegal job name (characters other than alphanumeric, '_' and '-')"
00881             return None 
00882         if not re.search(r"^job_",job_name): job_name = 'job_' + job_name
00883         if self._jobManagers.has_key(job_name):
00884             print "Sorry, there already is a Job named '" + str(job_name) + "'"
00885             return None
00886 
00887         # Name O.K., go create the Job.
00888         job = None
00889         if ( type == "GBSProtoJob" ): job = GBSProtoJob(job_name)
00890         else:                         job = GetModelRegistry().CreateObject(self.GetModel(),"Job",job_name,self)
00891         if args_str: job.SetScriptLocalArgs(args_str)
00892         if env_str:  job.SetLocalEnvironment(env_str)
00893         self._jobManagers[job_name] = job
00894         return job
00895 
00896     def __ApplyActionToJobs(self,action,job_name_pattern):
00897 
00898         """Apply an action (CLEAR ERROR COUNTS, CLEAR HISTORY, HOLD, KILL, RELEASE) to all jobs matching pattern"""
00899         job_list = [] # List of job names to which action can be applied
00900         for job_name,job in self._jobManagers.iteritems():
00901             if not re.search(job_name_pattern,job_name): continue
00902             if    ( action == "CLEAR ERROR COUNTS" and job.CanClear() )  \
00903                or ( action == "CLEAR HISTORY"      and job.CanClear() )  \
00904                or ( action == "HOLD"               and job.IsReady() )   \
00905                or ( action == "KILL"               and job.CanKill() )   \
00906                or ( action == "RELEASE"            and job.IsHeld() )    \
00907                or ( action == "REMOVE"             and job.CanClear() ):
00908                 job_list.append(job_name)
00909         num_pjobs = len(job_list)
00910         if not num_pjobs:
00911             print "No jobs suitable for action " + action
00912             return
00913         print str(num_pjobs) + " jobs suitable for action " + action + ":-"
00914         for job_name in job_list: print "  " + job_name
00915         ans = raw_input("Do you want to proceed ?y[n]")
00916         if not re.search(r"^y",ans,re.I):
00917             print "Action " + action + " aborted"
00918             return
00919         if action == "CLEAR HISTORY" or action == "REMOVE":
00920             warn = "this wipes all output for the job(s)"
00921             if action == "REMOVE": warn = "this entirely removes the job(s)"
00922             ans = raw_input("Are you REALLY sure - " + warn + "! ?y[n]")
00923             if not re.search(r"^y",ans,re.I):
00924                 print "Action " + action + " aborted"
00925                 return
00926         for job_name in job_list:
00927             job = self._jobManagers[job_name]
00928             if action == "CLEAR ERROR COUNTS": job.ClearErrorCounts()
00929             if action == "CLEAR HISTORY":      job.ClearHistory(False)
00930             if action == "HOLD":               job.Hold()
00931             if action == "KILL":               job.Kill()
00932             if action == "RELEASE":            job.Release()
00933             if action == "REMOVE":             job.Remove(False)
00934         print "Action " + action + " applied to " + str(len(job_list)) + " jobs"
00935 
00936 
00937     def __ReloadChildren(self):
00938 
00939         """Reload child Jobs from disk.
00940 
00941         Loop over all job_*.state files and recreate. 
00942         Make sure any associated Ganga job is in the associated JobTree directory"""
00943 
00944         import os, re
00945 
00946         child_dir         = self.GetStoreLocation("child_dir")
00947         child_state_files = os.listdir(child_dir)
00948         for child_state_file in child_state_files:
00949 
00950             #  Only look at job_*.state files
00951             child_state_file_spec = child_dir + "/" + child_state_file
00952             if not os.path.isfile(child_state_file_spec): continue
00953             mo = re.search(r"^(job_.*)\.state$",child_state_file)
00954             if not mo: continue
00955             child_name = mo.group(1)
00956             job = self.AddJob(child_name)
00957             # Ignore errors while trying to move associated Ganga job into correct JobTree directory
00958             # There may not be any Ganga job
00959             try:
00960                 g_job = job.GetGangaJob()
00961                 Ganga.GPI.jobtree.add(g_job,self.__gangaTreeDir)
00962             except: pass
00963             
00964 
00965 class GBSProtoJob:
00966 
00967     """A potential job made actual through the Task.PromoteProtoJob() method.
00968 
00969     Protojobs allow users to create large numbers of jobs
00970     automatically using scripts or data files and review and correct
00971     them before "promoting" them into jobs.  As such they are easy
00972     to create and destroy, are not persisted and are consequently lost at
00973     program termination, unless promoted.
00974 
00975     GBSProtoJob emulate the following small subset of a real job:-
00976 
00977     GetType()
00978       Return object type.
00979 
00980     GetScriptLocalArgs()
00981       Return (as a string) the list of application script args that are local to this job
00982 
00983     SetScriptLocalArgs(args_str)
00984       Set (as a string) the list of application script args that are local to this job
00985 
00986       e.g.  job.SetScriptLocalArgs('123 "a string with spaces" 456')
00987 
00988     """
00989 
00990     def __init__(self,name):
00991         self.__name               = name
00992         self.__scriptLocalArgs    = ""
00993         self.__localEnvironment   = ""
00994         self.__localInputSandbox  = ""
00995         self.__localOutputSandbox = ""
00996         
00997 
00998     def GetType(self): return "GBSProtoJob"
00999 
01000     def __repr__(self): return self.AsString()
01001     
01002     
01003     def AsString(self,level = "Brief"):
01004 
01005         """Return string description.
01006 
01007          Return string description at the following levels:-
01008         "Brief"    one line summary suitable for rows in tables
01009         "Heading"  one line summary suitable as heading for "Brief" """
01010 
01011         if ( level == "Heading"):
01012             s  = "Name".ljust(20)
01013             s += "Status".ljust(23)
01014             s += "Input".ljust(40)
01015             s += "Status Details"
01016             return s
01017         s  = self.__name.ljust(20)
01018         s += "(protojob)".ljust(23)
01019         s += (self.__scriptLocalArgs + ";" + self.__localEnvironment).ljust(40)
01020         return s
01021 
01022     def GetLocalEnvironment(self):
01023         """Return, as a comma separated list string, the environment that is local to this job"""
01024         return self.__localEnvironment 
01025 
01026     def GetScriptLocalArgs(self):
01027         """Return (as a string) the list of application script args that are local to this job"""
01028         return self.__scriptLocalArgs
01029     
01030     def GetLocalInputSandbox(self):
01031         """Return, as a comma separated list string, the input sandbox file list that is local to this job."""
01032         return self.__localInputSandbox
01033     
01034     def GetLocalOutputSandbox(self):
01035         """Return, as a comma separated list string, the ouput sandbox file list that is local to this job."""
01036         return self.__localOutputSandbox
01037 
01038     def SetLocalEnvironment(self,env_str):
01039         """Set, as a comma separated list string, the environment that local to this job.."""
01040         self.__localEnvironment = env_str
01041 
01042     def SetScriptLocalArgs(self,arg_str):
01043         """Set (as a string) the comma separated list of application script args that are local to this job."""
01044         self.__scriptLocalArgs  = arg_str
01045 
01046     def SetLocalInputSandbox(self,in_sbox_str):
01047         """Set, as a comma separated list string, the input sandbox file list that is local to this job"""
01048         (ok,str) = GBSUtilities.ProcessSandboxSetup(self,in_sbox_str,"input")
01049         if ok:
01050             # Store the raw input sandbox file list; files won't get stored until promotion.
01051             self.__localInputSandbox = in_sbox_str
01052 
01053     def SetLocalOutputSandbox(self,out_sbox_str):
01054         """Set, as a comma separated list string, the output sandbox file list that is local to this job."""
01055 
01056         (ok,str) = GBSUtilities.ProcessSandboxSetup(self,out_sbox_str,"output")
01057         if ok:
01058             self.__localOutputSandbox = str

Generated on Fri Mar 5 09:25:41 2010 for gbs by  doxygen 1.4.7