[cig-commits] r13865 - cs/portal/trunk/northridge/backend

leif at geodynamics.org leif at geodynamics.org
Tue Jan 13 13:53:05 PST 2009


Author: leif
Date: 2009-01-13 13:53:04 -0800 (Tue, 13 Jan 2009)
New Revision: 13865

Added:
   cs/portal/trunk/northridge/backend/mineos.cfg
Modified:
   cs/portal/trunk/northridge/backend/mineos.py
Log:
Reworked the 'mineos.py' backend script so that it runs the Mineos
program suite in parallel using LSF's "job array" feature.

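This code mostly works, but there is a numbering bug somewhere.  In a
test run, 55/124 green+syndat jobs failed with "wfdisc relation
green-0102 appears to be empty, or does not exist" -- and indeed
green-0102 does not exist: job #102 writes green-0099 instead?!
A likely (unconfirmed) cause: every job runs in the shared 'mineos'
directory and writes its program input files under fixed names
('db_list', 'green.in', ...), so concurrently running jobs can
overwrite one another's inputs before 'green' reads them.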

Also, on CITerra, computing the spheroidal portion of the mode
catalog takes forever (24 hours was not enough).  The Ibrix file
system is presumed guilty until proven otherwise.


Added: cs/portal/trunk/northridge/backend/mineos.cfg
===================================================================
--- cs/portal/trunk/northridge/backend/mineos.cfg	                        (rev 0)
+++ cs/portal/trunk/northridge/backend/mineos.cfg	2009-01-13 21:53:04 UTC (rev 13865)
@@ -0,0 +1,25 @@
+
+[mineos]
+# mineos.py uses custom LSFJobArrayScheduler
+#scheduler = lsf
+
+[mineos.lsf]
+max-size = 200
+slot-limit = 100
+
+[mineos.journal.info]
+lsf = true
+
+[mineos.eigen-job]
+queue = normal
+walltime = 24*hour
+stdout = eigen-%I-stdout.txt
+stderr = eigen-%I-stderr.txt
+
+[mineos.seismo-job]
+queue = normal
+walltime = 10*minute
+stdout = seismo-%I-stdout.txt
+stderr = seismo-%I-stderr.txt
+
+# end of file

Modified: cs/portal/trunk/northridge/backend/mineos.py
===================================================================
--- cs/portal/trunk/northridge/backend/mineos.py	2009-01-13 11:12:11 UTC (rev 13864)
+++ cs/portal/trunk/northridge/backend/mineos.py	2009-01-13 21:53:04 UTC (rev 13865)
@@ -2,11 +2,15 @@
 
 
 from pyre.applications import Script
+from pyre.components import Component
+from pyre.schedulers import Job, SchedulerLSF
 from pyre.units.SI import milli, hertz
 from pyre.units.length import km
-import os, sys, stat, popen2, time
-from os.path import basename, dirname, splitext
+from os.path import exists, basename, dirname, join, splitext
+from time import sleep
 
+import os, sys, stat, popen2, tarfile, time
+
 mHz = milli*hertz
 
 # Assume this script is installed alongside the mineos binaries.
@@ -16,6 +20,10 @@
 mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
 
 
+archiveDirName = "mineos"
+archive = archiveDirName + ".tar.gz"
+
+
 class Mode(object):
     def __init__(self, jcom, id, desc):
         self.jcom = jcom
@@ -34,14 +42,13 @@
 
 class Mineos(object):
 
-    def __init__(self, model, event, stations, progressUrl):
+    def __init__(self, model, event, stations):
         self.bin = prefix + "/bin"
         self.model = model
         self.event = event
         self.stations = stations
-        self.progressUrl = progressUrl
-        self.eigcon_out = []
-        self.green_out = "green"
+        self.green_out = None
+        self.syndat_out = None
         return
 
 
@@ -86,39 +93,24 @@
         print >>s, dbname
         s.close()
         self.run("eigcon", stdin=eigcon_in)
-        self.eigcon_out.append(dbname)
         return
 
 
-    def green(self, fmin, fmax, nsamples):
-        nStations = self.lineCount(self.stations + ".site")
+    def green(self, fmin, fmax, nsamples, db_list):
         s = open("db_list", "w")
-        for dbname in self.eigcon_out:
+        for dbname in db_list:
             print >>s, dbname
-        green_in = open("green.in", "w")
-        argv = [os.path.join(self.bin, "green")]
-        child = popen2.Popen4(argv)
-        for s in [green_in, child.tochild]:
-            print >>s, self.stations
-            print >>s, "db_list"
-            print >>s, self.event
-            print >>s, fmin, fmax
-            print >>s, nsamples
-            print >>s, self.green_out
-            s.close()
-        stationTally = 0
-        self.postProgress(0.0)
-        lastPostTime = time.time()
-        for line in child.fromchild:
-            print line,
-            if line.startswith(" green: Station: "):
-                stationTally += 1
-                now = time.time()
-                if now - lastPostTime >= 60.0:
-                    self.postProgress(float(stationTally) / float(nStations))
-                    lastPostTime = now
-        status = child.wait()
-        self.checkStatus(status, argv)
+        s.close()
+        green_in = "green.in"
+        s = open(green_in, "w")
+        print >>s, self.stations
+        print >>s, "db_list"
+        print >>s, self.event
+        print >>s, fmin, fmax
+        print >>s, nsamples
+        print >>s, self.green_out
+        s.close()
+        self.run("green", stdin=green_in)
         return
 
 
@@ -132,7 +124,7 @@
         print >>s, self.event
         print >>s, plane # error in documentation!
         print >>s, self.green_out
-        print >>s, "seismograms" #"syndat_out"
+        print >>s, self.syndat_out
         print >>s, datatype
         s.close()
         self.run("syndat", stdin=syndat_in)
@@ -174,7 +166,7 @@
             if stderr is not None:
                 fd = os.open(stderr, os.O_WRONLY, mode)
                 os.dup2(fd, 2)
-            executable = os.path.join(self.bin, argv[0])
+            executable = join(self.bin, argv[0])
             try:
                 os.execvp(executable, argv)
             except Exception, e:
@@ -198,36 +190,65 @@
         return
 
 
-    def lineCount(self, filename):
-        tally = 0
-        s = open(filename, 'r')
-        for line in s:
-            tally += 1
-        return tally
+def lineCount(filename):
+    tally = 0
+    s = open(filename, 'r')
+    for line in s:
+        tally += 1
+    return tally
 
 
-    def postProgress(self, progress):
-        import urllib, urlparse
-        scheme, host, path = urlparse.urlparse(self.progressUrl)[0:3]
-        fields = { 'progress': "%.2f" % progress }
-        body = urllib.urlencode(fields)
-        headers = {"Content-Type": "application/x-www-form-urlencoded",
-                   "Accept": "text/plain"}
-        import httplib
-        if scheme == "http":
-            conn = httplib.HTTPConnection(host)
-        elif scheme == "https":
-            conn = httplib.HTTPSConnection(host)
+class JobArray(Job):
+    import pyre.inventory as pyre
+    size = pyre.int("size")
+
+    def __init__(self):
+        super(JobArray, self).__init__()
+        self.baseArguments = []
+        self.extraArguments = []
+
+
+class LSFJobArrayScheduler(SchedulerLSF):
+
+    import pyre.inventory as pyre
+
+    maxSize = pyre.int("max-size") # MAX_JOB_ARRAY_SIZE or smaller
+    slotLimit = pyre.int("slot-limit")
+
+    # "Job array doesn't support -K option. Job not submitted."
+    wait = False
+
+    def schedule(self, jobArray):
+        
+        assert(0 < jobArray.size <= self.maxSize)
+
+        jobArrayName = jobArray.task
+        if not jobArrayName:
+            jobArrayName = "jobname"
+        if self.slotLimit:
+            jobArray.task = '"%s[1-%d]%%%d"' % (jobArrayName, jobArray.size, self.slotLimit)
         else:
-            assert False # not scheme in ["http", "https"]
-        conn.request("POST", path, body, headers)
-        response = conn.getresponse()
-        data = response.read()
-        conn.close()
-        return data
+            jobArray.task = '"%s[1-%d]"' % (jobArrayName, jobArray.size)
 
+        jobArray.arguments = jobArray.baseArguments + jobArray.extraArguments
+        
+        super(LSFJobArrayScheduler, self).schedule(jobArray)
 
+        # Since we can't use "-K" with job arrays (!@#$%*?), schedule
+        # and wait for a dummy job which depends upon the array.  XXX:
+        # This assumes the job name is unique.  We could slurp the job
+        # ID from the 'bsub' output... but could that really be
+        # considered more robust?
+        argv = ['bsub', '-J', 'waiter', '-K', '-W', '0:01',
+                '-w', 'done(%s)' % jobArrayName,
+                '/bin/true']
+        status = os.spawnvp(os.P_WAIT, argv[0], argv)
+        if status != 0:
+            sys.exit("%s: exit %d" % (argv[0], status))
+        
+        return
 
+
 class MineosScript(Script):
 
     name = "mineos"
@@ -280,81 +301,193 @@
 
     stations = pyre.str("stations")
 
+
     #------------------------------------------------------------------------
     # syndat
     
     plane = pyre.int("plane", default=0) # use CMT tensor components as-is
     datatype = pyre.int("datatype", default=0) # accelerograms in nm/s^2
 
+
     #------------------------------------------------------------------------
-    # misc.
+    # scheduling
 
-    progressUrl = pyre.str("progress-url")
+    scheduler      = pyre.facility("scheduler", factory=LSFJobArrayScheduler)
+    eigenJob       = pyre.facility("eigen-job", factory=JobArray)
+    seismoJob      = pyre.facility("seismo-job", factory=JobArray)
 
+    context        = pyre.str("context", default="login", validator=pyre.choice(["login", "computeEigen", "computeSeismograms", "monitor"]))
+    progressUrl    = pyre.str("progress-url")
 
-    def main(self, *args, **kwds):
-        
-        self._info.activate()
 
-        enabledModes = []
-        for mode in modes:
-            if getattr(self, mode.id):
-                enabledModes.append(mode)
+    #------------------------------------------------------------------------
+    # running
 
+    def runMineosPrograms(self, *args, **kwds):
+
+        enabledModes = self.enabledModes()
         self._info.log("modes: " + ' '.join(["'%s'" % mode.desc for mode in enabledModes]))
         if not enabledModes:
             self._info.log("nothing to do")
             return
 
-        jobdir = os.getcwd()
-        
-        archiveDirName = "mineos"
         os.mkdir(archiveDirName)
         inputFiles = [self.model, self.event] + [self.stations + suffix for suffix in [".site", ".sitechan"]]
         for inputFile in inputFiles:
-            os.rename(inputFile, os.path.join(archiveDirName, inputFile))
-        os.chdir(archiveDirName)
+            os.rename(inputFile, join(archiveDirName, inputFile))
         
         self._info.log("running Mineos program suite for model '%s'" % self.model)
-        self.runMineos(self.model, enabledModes)
 
+        # Compute eigenfunctions and eigenfrequencies.  For
+        # simplicity, always schedule one job for each mode,
+        # regardless of which modes are enabled.
+        job = self.prepareJob(self.eigenJob, *args, **kwds)
+        job.size = len(modes)
+        job.extraArguments = ["--context=computeEigen"]
+        quotient, remainder = self.trimJob(job)
+        assert quotient == 0
+        self.scheduleJob(job)
+        
+        # compute seismograms
+        jobDir = os.getcwd()
+        os.chdir(archiveDirName)
+        nStations = lineCount(self.stations + ".site")
+        job = self.prepareJob(self.seismoJob, *args, **kwds)
+        job.size = nStations
+        job.extraArguments = ["--context=computeSeismograms"]
+        quotient, remainder = self.trimJob(job)
+        self.splinterStationList(job, quotient, remainder)
+        os.chdir(jobDir)
+        self.scheduleJob(job)
+
         # archive the output
-        os.chdir(jobdir)
-        self.spawn("/bin/tar", "cvzf", archiveDirName + ".tar.gz", archiveDirName)
+        self.spawn("/bin/tar", "cvzf", "_" + archive, archiveDirName)
 
         # clean up
         self.spawn("/bin/rm", "-rf", archiveDirName)
 
+        # The existence of the latter signals that we are done.  See
+        # onMonitor().
+        os.rename("_" + archive, archive)
+
         return
 
 
-    def runMineos(self, model, enabledModes):
-        mineos = Mineos(model, self.event, self.stations, self.progressUrl)
+    def enabledModes(self):
+        enabledModes = []
+        for mode in modes:
+            if getattr(self, mode.id):
+                enabledModes.append(mode)
+        return enabledModes
 
+
+    def splinterStationList(self, job, quotient, remainder):
+
+        stations = self.stations
+
+        # Read the entire contents of the master ".site" and
+        # ".sitechan" files.
+
+        s = open(stations + ".site", "r")
+        siteList = [site for site in s]
+        s.close()
+
+        sitechan = {}
+        s = open(stations + ".sitechan", "r")
+        for chan in s:
+            name = chan.split()[0]
+            cl = sitechan.setdefault(name, [])
+            cl.append(chan)
+        s.close()
+
+        generatedFiles = []
+
+        for jobIndex in xrange(1, job.size + 1):
+            
+            # Pick out this job's share of the load.
+            n = quotient
+            if jobIndex < remainder + 1:
+                n += 1
+            l = siteList[0:n]
+            siteList = siteList[n:]
+
+            # Generate this job's ".site" file.
+            filename = self.indexedStationsName(jobIndex) + ".site"
+            generatedFiles.append(filename)
+            s = open(filename, "w")
+            for site in l:
+                s.write(site)
+            s.close()
+            
+            # Generate this job's ".sitechan" file.
+            filename = self.indexedStationsName(jobIndex) + ".sitechan"
+            generatedFiles.append(filename)
+            s = open(filename, "w")
+            for site in l:
+                name = site.split()[0]
+                for chan in sitechan[name]:
+                    s.write(chan)
+            s.close()
+        
+        return generatedFiles
+
+
+    def indexedName(self, base, jobIndex):
+        return "%s-%04d" % (base, jobIndex)
+
+    def indexedStationsName(self, jobIndex):
+        return self.indexedName(self.stations, jobIndex)
+
+
+    def computeEigen(self, jobIndex):
+        os.chdir(archiveDirName)
+        self.redirectOutput(self.indexedName("output_eigen", jobIndex) + ".txt")
+
+        mineos = Mineos(self.model, self.event, self.stations)
+
+        mode = modes[jobIndex - 1]
+        if not getattr(self, mode.id):
+            self._info.log("mode '%s' is disabled" % mode.desc)
+            return
+        
         # minos_bran
-        for mode in enabledModes:
-            self._info.log("running program 'minos_bran' for '%s' mode" % mode.desc)
-            # convert to millihertz
-            wgrav = self.wgrav / mHz
-            wmin = self.wmin / mHz
-            wmax = self.wmax / mHz
-            mineos.minos_bran(mode, self.eps, wgrav,
-                              self.lmin, self.lmax,
-                              wmin, wmax,
-                              self.nmin, self.nmax)
+        self._info.log("running program 'minos_bran' for '%s' mode" % mode.desc)
+        
+        # convert to millihertz
+        wgrav = self.wgrav / mHz
+        wmin = self.wmin / mHz
+        wmax = self.wmax / mHz
 
+        # convert to kilometers
+        max_depth = self.max_depth / km
+        
+        mineos.minos_bran(mode, self.eps, wgrav,
+                          self.lmin, self.lmax,
+                          wmin, wmax,
+                          self.nmin, self.nmax)
+
         # eigcon
-        for mode in enabledModes:
-            self._info.log("running program 'eigcon' for '%s' mode" % mode.desc)
-            # convert to kilometers
-            max_depth = self.max_depth / km
-            mineos.eigcon(mode, max_depth)
+        self._info.log("running program 'eigcon' for '%s' mode" % mode.desc)
+        mineos.eigcon(mode, max_depth)
+        
+        return
 
+
+    def computeSeismograms(self, jobIndex):
+        os.chdir(archiveDirName)
+        self.redirectOutput(self.indexedName("output_green_syndat", jobIndex) + ".txt")
+        
+        stations = self.indexedStationsName(jobIndex)
+        mineos = Mineos(self.model, self.event, stations)
+        mineos.green_out = self.indexedName("green", jobIndex)
+        mineos.syndat_out = self.indexedName("syndat", jobIndex)
+
         # green
         self._info.log("running program 'green'")
         fmin = self.fmin / mHz
         fmax = self.fmax / mHz
-        mineos.green(fmin, fmax, self.nsamples)
+        db_list = [mode.id for mode in self.enabledModes()]
+        mineos.green(fmin, fmax, self.nsamples, db_list)
 
         # creat_origin
         self._info.log("running program 'creat_origin'")
@@ -365,10 +498,10 @@
         mineos.syndat(self.plane, self.datatype)
 
         # cucss2sac
-        os.symlink(mineos.green_out + ".origin", "seismograms.origin")
-        os.symlink(mineos.stations + ".site", "seismograms.site")
-        mineos.cucss2sac("seismograms", "output") # SAC
-        mineos.cucss2sac("-a", "seismograms", "output") # ASCII
+        os.symlink(mineos.green_out + ".origin", mineos.syndat_out + ".origin")
+        os.symlink(mineos.stations + ".site", mineos.syndat_out + ".site")
+        mineos.cucss2sac(mineos.syndat_out, "output") # SAC
+        mineos.cucss2sac("-a", mineos.syndat_out, "output") # ASCII
         
         return
 
@@ -384,21 +517,130 @@
         return
 
 
-def main(*args, **kwds):
-    if True:
+    #------------------------------------------------------------------------
+    # scheduling
+
+    def main(self, *args, **kwds):
+        self._info.activate()
+        context = self.context
+        if context == "login":
+            self.onLoginNode(*args, **kwds)
+        elif context.startswith("compute"):
+            self.onCompute(*args, **kwds)
+        elif context == "monitor":
+            self.onMonitor(*args, **kwds)
+        return
+
+
+    def onLoginNode(self, *args, **kwds):
+        self.redirectOutput("output_mineos.txt")
+
+        # This fork() allows the (gsi)ssh command to complete.
+        pid = os.fork()
+        if not pid:
+            os.close(0) # original 1 & 2 closed by dup2 above
+            pid = os.fork()
+            if pid:
+                self.monitorProgress(pid)
+            else:
+                self.runMineosPrograms(*args, **kwds)
+        return
+
+
+    def onMonitor(self, *args, **kwds):
+        if exists(archive):
+            sys.exit(0)
+        sys.exit(1)
+
+
+    def onCompute(self, *args, **kwds):
+
+        # This is analogous to MPI rank.
+        jobIndex = int(os.environ['LSB_JOBINDEX'])
+
+        context = self.context
+        if context == "computeEigen":
+            self.computeEigen(jobIndex)
+        elif context == "computeSeismograms":
+            self.computeSeismograms(jobIndex)
+
+        return
+
+
+    def redirectOutput(self, filename):
         # Redirect all output -- our journal output, and the output of
         # our children -- to a file.
-        fd = os.open("output_mineos.txt", os.O_CREAT | os.O_WRONLY, mode)
+        fd = os.open(filename, os.O_CREAT | os.O_WRONLY, mode)
         os.dup2(fd, 1)
         os.dup2(fd, 2)
         os.close(fd)
-    
-    mineos = MineosScript()
-    mineos.run(*args, **kwds)
+        return
 
 
+    def prepareJob(self, job, *args, **kwds):
+        job.nodes = 1
+        job.executable = sys.executable
+        job.baseArguments = [__file__] + self.getArgv(*args, **kwds)
+        return job
+
+
+    def trimJob(self, job):
+        maxSize = self.scheduler.maxSize
+        quotient = job.size / maxSize
+        remainder = job.size % maxSize
+        if job.size > maxSize:
+            job.size = maxSize
+        return quotient, remainder
+
+        
+    def scheduleJob(self, job):
+        self.scheduler.schedule(job)
+        return
+
+
+    def monitorProgress(self, pid):
+        progress = None
+        wpid, status = os.waitpid(pid, os.WNOHANG)
+        while wpid == 0:
+            newProgress = self.checkProgress()
+            if newProgress != progress:
+                progress = newProgress
+                #self.postProgress(progress)
+            sleep(60)
+            wpid, status = os.waitpid(pid, os.WNOHANG)
+        return
+
+
+    def checkProgress(self):
+        # XXX
+        progress = 0.0
+        return progress
+
+
+    def postProgress(self, progress):
+        import urllib, urlparse
+        scheme, host, path = urlparse.urlparse(self.progressUrl)[0:3]
+        fields = { 'progress': "%.2f" % progress }
+        body = urllib.urlencode(fields)
+        headers = {"Content-Type": "application/x-www-form-urlencoded",
+                   "Accept": "text/plain"}
+        import httplib
+        if scheme == "http":
+            conn = httplib.HTTPConnection(host)
+        elif scheme == "https":
+            conn = httplib.HTTPSConnection(host)
+        else:
+            assert False # not scheme in ["http", "https"]
+        conn.request("POST", path, body, headers)
+        response = conn.getresponse()
+        data = response.read()
+        conn.close()
+        return data
+
+
 if __name__ == "__main__":
-    main()
+    mineos = MineosScript()
+    mineos.run()
 
 
 # end of file



More information about the CIG-COMMITS mailing list