[cig-commits] r13959 - cs/portal/trunk/northridge/backend

leif at geodynamics.org leif at geodynamics.org
Mon Jan 26 19:41:13 PST 2009


Author: leif
Date: 2009-01-26 19:41:13 -0800 (Mon, 26 Jan 2009)
New Revision: 13959

Modified:
   cs/portal/trunk/northridge/backend/mineos.cfg
   cs/portal/trunk/northridge/backend/mineos.py
Log:
Revised the Mineos script to use Lonestar's undocumented
'parametric_wrapper' -- instead of 'ibrun' or 'serialrun', we use:

    pam -g 1 parametric_wrapper [command]

This sets the environment variable TACC_LAUNCHER_TSK_ID in a manner
similar to LSB_JOBINDEX, so that each process can do its own thing.

Using LSF's "job arrays" proved to be problematic.  From the start, it
was difficult to figure out when an array job had finished.  LSF's
'bsub' command doesn't support the '-K' option with array jobs.  As a
work-around, I tried scheduling a dummy "waiter" job which depends
upon the job array (using '-w').  This didn't seem to work reliably;
although, to be honest, I didn't spend any time debugging it.  It's
possible that "ended(job)" would work better than "done(job)".

In any case, Lonestar limits job arrays to 5 processors (!), which
doesn't allow for very much parallelism, to say the least.

I considered running Mineos from an MPI wrapper program, but
Lonestar's user guide cautions against using fork() in MPI programs,
due to a bug in the InfiniBand drivers.

I looked for 'shmux' (or equivalent) on Lonestar, but couldn't find
it.

I grew fearful that I would have to create an MPI version of Mineos.
But then I stumbled upon /opt/lsf/bin/parametric_wrapper, and breathed
a sigh of relief.


Modified: cs/portal/trunk/northridge/backend/mineos.cfg
===================================================================
--- cs/portal/trunk/northridge/backend/mineos.cfg	2009-01-27 02:40:33 UTC (rev 13958)
+++ cs/portal/trunk/northridge/backend/mineos.cfg	2009-01-27 03:41:13 UTC (rev 13959)
@@ -1,25 +1,22 @@
 
 [mineos]
-# mineos.py uses custom LSFJobArrayScheduler
-#scheduler = lsf
+scheduler = lsf
+launch-command = pam -g 1 parametric_wrapper
+seismo-nodes = 100    ; use multiple of 4 on Lonestar
 
-[mineos.lsf]
-max-size = 200
-slot-limit = 100
-
 [mineos.journal.info]
 lsf = true
 
 [mineos.eigen-job]
 queue = normal
-walltime = 24*hour
-stdout = eigen-%I-stdout.txt
-stderr = eigen-%I-stderr.txt
+walltime = 4*hour
+stdout = eigen-stdout.txt
+stderr = eigen-stderr.txt
 
 [mineos.seismo-job]
 queue = normal
-walltime = 10*minute
-stdout = seismo-%I-stdout.txt
-stderr = seismo-%I-stderr.txt
+walltime = 1*hour
+stdout = seismo-stdout.txt
+stderr = seismo-stderr.txt
 
 # end of file

Modified: cs/portal/trunk/northridge/backend/mineos.py
===================================================================
--- cs/portal/trunk/northridge/backend/mineos.py	2009-01-27 02:40:33 UTC (rev 13958)
+++ cs/portal/trunk/northridge/backend/mineos.py	2009-01-27 03:41:13 UTC (rev 13959)
@@ -198,57 +198,6 @@
     return tally
 
 
-class JobArray(Job):
-    import pyre.inventory as pyre
-    size = pyre.int("size")
-
-    def __init__(self):
-        super(JobArray, self).__init__()
-        self.baseArguments = []
-        self.extraArguments = []
-
-
-class LSFJobArrayScheduler(SchedulerLSF):
-
-    import pyre.inventory as pyre
-
-    maxSize = pyre.int("max-size") # MAX_JOB_ARRAY_SIZE or smaller
-    slotLimit = pyre.int("slot-limit")
-
-    # "Job array doesn't support -K option. Job not submitted."
-    wait = False
-
-    def schedule(self, jobArray):
-        
-        assert(0 < jobArray.size <= self.maxSize)
-
-        jobArrayName = jobArray.task
-        if not jobArrayName:
-            jobArrayName = "jobname"
-        if self.slotLimit:
-            jobArray.task = '"%s[1-%d]%%%d"' % (jobArrayName, jobArray.size, self.slotLimit)
-        else:
-            jobArray.task = '"%s[1-%d]"' % (jobArrayName, jobArray.size)
-
-        jobArray.arguments = jobArray.baseArguments + jobArray.extraArguments
-        
-        super(LSFJobArrayScheduler, self).schedule(jobArray)
-
-        # Since we can't use "-K" with job arrays (!@#$%*?), schedule
-        # and wait for a dummy job which depends upon the array.  XXX:
-        # This assumes the job name is unique.  We could slurp the job
-        # ID from the 'bsub' output... but could that really be
-        # considered more robust?
-        argv = ['bsub', '-q', 'serial', '-n', '1', '-J', 'waiter', '-K', '-W', '0:01',
-                '-w', 'done(%s)' % jobArrayName,
-                '/bin/true']
-        status = os.spawnvp(os.P_WAIT, argv[0], argv)
-        if status != 0:
-            sys.exit("%s: exit %d" % (argv[0], status))
-        
-        return
-
-
 class MineosScript(Script):
 
     name = "mineos"
@@ -312,11 +261,19 @@
     #------------------------------------------------------------------------
     # scheduling
 
-    scheduler      = pyre.facility("scheduler", factory=LSFJobArrayScheduler)
-    eigenJob       = pyre.facility("eigen-job", factory=JobArray)
-    seismoJob      = pyre.facility("seismo-job", factory=JobArray)
+    import pyre.schedulers as schedulers
+    
+    nodes          = pyre.int("nodes", default=1)
+    scheduler      = schedulers.scheduler("scheduler", default="none")
+    eigenJob       = schedulers.job("eigen-job")
+    seismoJob      = schedulers.job("seismo-job")
+    seismoNodes    = pyre.int("seismo-nodes")
+    launchCommand  = pyre.str("launch-command")
 
-    context        = pyre.str("context", default="login", validator=pyre.choice(["login", "computeEigen", "computeSeismograms", "monitor"]))
+    context        = pyre.str("context", default="login", validator=pyre.choice(["login",
+                                                                                 "launchEigen", "computeEigen",
+                                                                                 "launchSeismograms", "computeSeismograms",
+                                                                                 "monitor"]))
     progressUrl    = pyre.str("progress-url")
 
 
@@ -340,12 +297,13 @@
 
         # Compute eigenfunctions and eigenfrequencies.  For
         # simplicity, always schedule one job for each mode,
-        # regardless of which modes are enabled.
+        # regardless of which modes are enabled.  If mode catalogs
+        # were cached, we would likewise include all modes in every
+        # catalog, regardless of which modes are enabled for a
+        # particular run.
         job = self.prepareJob(self.eigenJob, *args, **kwds)
-        job.size = len(modes)
-        job.extraArguments = ["--context=computeEigen"]
-        quotient, remainder = self.trimJob(job)
-        assert quotient == 0
+        job.nodes = len(modes)
+        job.arguments.extend(["--context=launchEigen", "--nodes=%d" % job.nodes])
         self.scheduleJob(job)
         
         # compute seismograms
@@ -353,9 +311,13 @@
         os.chdir(archiveDirName)
         nStations = lineCount(self.stations + ".site")
         job = self.prepareJob(self.seismoJob, *args, **kwds)
-        job.size = nStations
-        job.extraArguments = ["--context=computeSeismograms"]
-        quotient, remainder = self.trimJob(job)
+        job.nodes = nStations
+        maxJobSize = self.seismoNodes
+        quotient = job.nodes / maxJobSize
+        remainder = job.nodes % maxJobSize
+        if job.nodes > maxJobSize:
+            job.nodes = maxJobSize
+        job.arguments.extend(["--context=launchSeismograms", "--nodes=%d" % job.nodes])
         self.splinterStationList(job, quotient, remainder)
         os.chdir(jobDir)
         self.scheduleJob(job)
@@ -402,7 +364,7 @@
 
         generatedFiles = []
 
-        for jobIndex in xrange(1, job.size + 1):
+        for jobIndex in xrange(1, job.nodes + 1):
             
             # Pick out this job's share of the load.
             n = quotient
@@ -525,6 +487,8 @@
         context = self.context
         if context == "login":
             self.onLoginNode(*args, **kwds)
+        elif context.startswith("launch"):
+            self.onLauncherNode(*args, **kwds)
         elif context.startswith("compute"):
             self.onCompute(*args, **kwds)
         elif context == "monitor":
@@ -553,10 +517,27 @@
         sys.exit(1)
 
 
+    def onLauncherNode(self, *args, **kwds):
+        
+        launchCommand = self.launchCommand % {'nodes': self.nodes}
+        launchCommand = launchCommand.split()
+
+        myArgv = self.getArgv(*args, **kwds)
+        context = self.context.replace("launch", "compute")
+
+        argv = launchCommand + [sys.executable, __file__] + myArgv + ["--context=%s" % context, "--nodes=%d" % self.nodes]
+        status = os.spawnvp(os.P_WAIT, argv[0], argv)
+        if status != 0:
+            sys.exit("%s: exit %d" % (argv[0], status))
+
+        return
+
+
     def onCompute(self, *args, **kwds):
 
-        # This is analogous to MPI rank.
-        jobIndex = int(os.environ['LSB_JOBINDEX'])
+        # Obtain the analogue to MPI rank.
+        # See /opt/lsf/bin/parametric_wrapper on Lonestar.
+        jobIndex = int(os.environ['TACC_LAUNCHER_TSK_ID']) + 1 # was LSB_JOBINDEX
 
         context = self.context
         if context == "computeEigen":
@@ -580,20 +561,12 @@
     def prepareJob(self, job, *args, **kwds):
         job.nodes = 1
         job.executable = sys.executable
-        job.baseArguments = [__file__] + self.getArgv(*args, **kwds)
+        job.arguments = [__file__] + self.getArgv(*args, **kwds)
         return job
 
 
-    def trimJob(self, job):
-        maxSize = self.scheduler.maxSize
-        quotient = job.size / maxSize
-        remainder = job.size % maxSize
-        if job.size > maxSize:
-            job.size = maxSize
-        return quotient, remainder
-
-        
     def scheduleJob(self, job):
+        self.scheduler.wait = True
         self.scheduler.schedule(job)
         return
 



More information about the CIG-COMMITS mailing list