[cig-commits] commit: Clean up backend and add a README for it

Mercurial hg at geodynamics.org
Sun Jul 3 20:04:14 PDT 2011


changeset:   4:6352cd054548
user:        Walter Landry <wlandry at caltech.edu>
date:        Fri Jun 17 13:09:19 2011 -0700
files:       backend/README backend/mineos.cfg backend/mineos.py backend/specfem3D.py
description:
Clean up backend and add a README for it


diff -r fe71e8a64212 -r 6352cd054548 backend/README
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/backend/README	Fri Jun 17 13:09:19 2011 -0700
@@ -0,0 +1,34 @@
+To install the code on Ranger, copy specfem_launcher.sh to
+
+  /scratch/projects/tg/CIG/
+
+Untar Specfem3D_Globe into the directory
+
+  /scratch/projects/tg/CIG/SPECFEM3D_GLOBE_preconfigured
+
+In that directory, run
+
+  mkdir DATABASES_MPI
+  ./configure CC=pgcc FC=mpif90 MPIFC=mpif90
+
+Put a copy of sge_script in that directory, and then tar the whole
+thing into
+
+  /scratch/projects/tg/CIG/SPECFEM3D_GLOBE_preconfigured.tgz
+
+------------------
+
+To run the daemon, copy
+
+  daemon.py
+  web-portal-daemon.cfg
+  start.sh
+
+somewhere (e.g. $HOME/daemon) and invoke start.sh.  It will need
+stackless.
+
+If you need to restart the daemon, kill the "stackless" process and
+run start.sh again.
+
+Output is in journal.log and nohup.out.
+
diff -r fe71e8a64212 -r 6352cd054548 backend/mineos.cfg
--- a/backend/mineos.cfg	Fri Jun 17 13:08:45 2011 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,22 +0,0 @@
-
-[mineos]
-scheduler = lsf
-launch-command = pam -g 1 parametric_wrapper
-seismo-nodes = 40    ; use multiple of 4 on Lonestar
-
-[mineos.journal.info]
-lsf = true
-
-[mineos.eigen-job]
-queue = normal
-walltime = 4*hour
-stdout = eigen-stdout.txt
-stderr = eigen-stderr.txt
-
-[mineos.seismo-job]
-queue = normal
-walltime = 1*hour
-stdout = seismo-stdout.txt
-stderr = seismo-stderr.txt
-
-# end of file
diff -r fe71e8a64212 -r 6352cd054548 backend/mineos.py
--- a/backend/mineos.py	Fri Jun 17 13:08:45 2011 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,688 +0,0 @@
-#!/usr/bin/env python
-
-
-from pyre.applications import Script
-from pyre.components import Component
-from pyre.schedulers import Job, SchedulerLSF
-from pyre.units.SI import milli, hertz
-from pyre.units.length import km
-from os.path import exists, basename, dirname, join, splitext
-from time import sleep
-
-import os, sys, stat, popen2, tarfile, time
-
-mHz = milli*hertz
-
-# Assume this script is installed alongside the mineos binaries.
-prefix = dirname(dirname(__file__))
-
-
-mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH
-
-
-archiveDirName = "mineos"
-archive = archiveDirName + ".tar.gz"
-eigenDone = "eigenDone"
-
-
-class Mode(object):
-    def __init__(self, jcom, id, desc):
-        self.jcom = jcom
-        self.id = id
-        self.desc = desc
-
-
-modes = [Mode(jcom, id, desc) for jcom, id, desc in [
-    (1, 'radial', 'radial'),
-    (2, 'toroidal', 'toroidal'),
-    (3, 'spheroidal', 'spheroidal'),
-    (4, 'ictoroidal', 'inner core toroidal'),
-    ]]
-del jcom, id, desc
-
-
-class Mineos(object):
-
-    def __init__(self, model, event, stations, jobIndex):
-        self.bin = prefix + "/bin"
-        self.model = model
-        self.event = event
-        self.stations = stations
-        self.jobIndex = jobIndex
-        self.green_out = "green-%04d" % jobIndex
-        self.syndat_out = "syndat-%04d" % jobIndex
-        return
-
-
-    def minos_bran(self,
-                   mode,
-                   eps, wgrav,
-                   lmin, lmax,
-                   wmin, wmax,
-                   nmin, nmax):
-        if mode.id == 'radial':
-            # These are ignored.
-            lmin = 0
-            lmax = 0
-        # Misha Barmin says that there are no dispersion
-            # branches for radial modes; thus nmin, nmax have a
-            # different sense in this context.  He says to fix them as
-            # follows.
-            nmin = 0
-            nmax = 8000
-        minos_bran_in = mode.id + "_mineos_bran.in"
-        s = open(minos_bran_in, "w")
-        print >>s, self.model
-        print >>s, self.listing(mode)
-        print >>s, self.eigenfunctions(mode)
-        print >>s, eps, wgrav
-        print >>s, mode.jcom
-        print >>s, lmin, lmax, wmin, wmax, nmin, nmax
-        s.close()
-        self.run("minos_bran", stdin=minos_bran_in)
-        return
-
-
-    def eigcon(self, mode, max_depth):
-        eigcon_in = mode.id + "_eigcon.in"
-        s = open(eigcon_in, "w")
-        print >>s, mode.jcom
-        print >>s, self.model
-        print >>s, max_depth
-        print >>s, self.listing(mode)
-        print >>s, self.eigenfunctions(mode)
-        dbname = mode.id #+ "_eigcon_out"
-        print >>s, dbname
-        s.close()
-        self.run("eigcon", stdin=eigcon_in)
-        return
-
-
-    def green(self, fmin, fmax, nsamples):
-        green_in = "green_%04d.in" % self.jobIndex
-        s = open(green_in, "w")
-        print >>s, self.stations
-        print >>s, "db_list"
-        print >>s, self.event
-        print >>s, fmin, fmax
-        print >>s, nsamples
-        print >>s, self.green_out
-        s.close()
-        self.run("green", stdin=green_in)
-        return
-
-
-    def creat_origin(self):
-        self.run("creat_origin", [self.event, self.green_out])
-
-
-    def syndat(self, plane, datatype):
-        syndat_in = "syndat_%04d.in" % self.jobIndex
-        s = open(syndat_in, "w")
-        print >>s, self.event
-        print >>s, plane # error in documentation!
-        print >>s, self.green_out
-        print >>s, self.syndat_out
-        print >>s, datatype
-        s.close()
-        self.run("syndat", stdin=syndat_in)
-        return
-
-
-    def cucss2sac(self, *args):
-        self.run("cucss2sac", list(args))
-        return
-
-
-    def listing(self, mode):
-        return mode.id + "_listing"
-
-
-    def eigenfunctions(self, mode):
-        return mode.id + "_eigenfunctions"
-
-
-    def run(self, program, args=None, stdin=None, stdout=None, stderr=None):
-        sys.stdout.flush()
-        sys.stderr.flush()
-        if args is None:
-            args = []
-        argv = [program] + args
-        pid = os.fork()
-        if pid:
-            # parent
-            _, status = os.wait()
-            self.checkStatus(status, argv)
-        else:
-            # child
-            if stdin is not None:
-                fd = os.open(stdin, os.O_RDONLY)
-                os.dup2(fd, 0)
-            if stdout is not None:
-                fd = os.open(stdout, os.O_WRONLY, mode)
-                os.dup2(fd, 1)
-            if stderr is not None:
-                fd = os.open(stderr, os.O_WRONLY, mode)
-                os.dup2(fd, 2)
-            executable = join(self.bin, argv[0])
-            try:
-                os.execvp(executable, argv)
-            except Exception, e:
-                sys.exit("%s: %s" % (executable, e))
-            
-        return
-
-
-    def checkStatus(self, status, argv):
-        exitStatus = None
-        if (os.WIFSIGNALED(status)):
-            statusStr = "signal %d" % os.WTERMSIG(status)
-        elif (os.WIFEXITED(status)):
-            exitStatus = os.WEXITSTATUS(status)
-            statusStr = "exit %d" % exitStatus
-        else:
-            statusStr = "status %d" % status
-        statusMsg = "%s: %s" % (argv[0], statusStr)
-        if exitStatus != 0:
-            sys.exit(statusMsg)
-        return
-
-
-def lineCount(filename):
-    tally = 0
-    s = open(filename, 'r')
-    for line in s:
-        tally += 1
-    return tally
-
-
-class MineosScript(Script):
-
-    name = "mineos"
-
-    import pyre.inventory as pyre
-
-    #------------------------------------------------------------------------
-    # common
-    
-    model = pyre.str("model")
-
-    # jcom
-    radial = pyre.bool("radial")
-    toroidal = pyre.bool("toroidal")
-    spheroidal = pyre.bool("spheroidal")
-    ictoroidal = pyre.bool("inner-core-toroidal")
-
-    event = pyre.str("event")
-
-    
-    #------------------------------------------------------------------------
-    # minos_bran
-    
-    eps = pyre.float("eps")
-    wgrav = pyre.dimensional("wgrav", default=1*hertz)
-
-    lmin = pyre.int("lmin")
-    lmax = pyre.int("lmax")
-    
-    wmin = pyre.dimensional("wmin", default=1*hertz)
-    wmax = pyre.dimensional("wmax", default=1*hertz)
-    
-    nmin = pyre.int("nmin")
-    nmax = pyre.int("nmax")
-
-
-    #------------------------------------------------------------------------
-    # eigcon
-
-    max_depth = pyre.dimensional("max-depth", default=1*km)
-    
-
-    #------------------------------------------------------------------------
-    # green
-
-    fmin = pyre.dimensional("fmin", default=1*hertz)
-    fmax = pyre.dimensional("fmax", default=1*hertz)
-
-    nsamples = pyre.int("nsamples")
-
-    stations = pyre.str("stations")
-
-
-    #------------------------------------------------------------------------
-    # syndat
-    
-    plane = pyre.int("plane", default=0) # use CMT tensor components as-is
-    datatype = pyre.int("datatype", default=0) # accelerograms in nm/s^2
-
-
-    #------------------------------------------------------------------------
-    # scheduling
-
-    import pyre.schedulers as schedulers
-    
-    nodes          = pyre.int("nodes", default=1)
-    scheduler      = schedulers.scheduler("scheduler", default="none")
-    eigenJob       = schedulers.job("eigen-job")
-    seismoJob      = schedulers.job("seismo-job")
-    seismoNodes    = pyre.int("seismo-nodes")
-    launchCommand  = pyre.str("launch-command")
-
-    context        = pyre.str("context", default="login", validator=pyre.choice(["login",
-                                                                                 "loginEigen", "launchEigen", "computeEigen",
-                                                                                 "loginSeismograms", "launchSeismograms", "computeSeismograms",
-                                                                                 "monitorEigen", "monitorSeismograms"]))
-    progressUrl    = pyre.str("progress-url")
-
-
-    #------------------------------------------------------------------------
-    # running
-
-    def loginEigen(self, *args, **kwds):
-
-        enabledModes = self.enabledModes()
-        self._info.log("modes: " + ' '.join(["'%s'" % mode.desc for mode in enabledModes]))
-        if not enabledModes:
-            self._info.log("nothing to do")
-            return
-
-        os.mkdir(archiveDirName)
-        inputFiles = [self.model, self.event] + [self.stations + suffix for suffix in [".site", ".sitechan"]]
-        for inputFile in inputFiles:
-            os.rename(inputFile, join(archiveDirName, inputFile))
-
-        self._info.log("running Mineos program suite for model '%s'" % self.model)
-
-        # Compute eigenfunctions and eigenfrequencies.  For
-        # simplicity, always schedule one job for each mode,
-        # regardless of which modes are enabled.  If mode catalogs
-        # were cached, we would likewise include all modes in every
-        # catalog, regardless of which modes are enabled for a
-        # particular run.
-        job = self.prepareJob(self.eigenJob, *args, **kwds)
-        job.nodes = len(modes)
-        job.arguments.extend(["--context=launchEigen", "--nodes=%d" % job.nodes])
-        self.scheduleJob(job)
-
-        # The existence of this file signals that we are done.  See
-        # onMonitor().
-        self.spawn("touch", eigenDone)
-
-        return
-
-
-    def loginSeismograms(self, *args, **kwds):
-        
-        enabledModes = self.enabledModes()
-        self._info.log("modes: " + ' '.join(["'%s'" % mode.desc for mode in enabledModes]))
-        if not enabledModes:
-            self._info.log("nothing to do")
-            return
-        
-        jobDir = os.getcwd()
-        os.chdir(archiveDirName)
-        
-        # generate "db_list" input file
-        db_list = [mode.id for mode in enabledModes]
-        s = open("db_list", "w")
-        for dbname in db_list:
-            print >>s, dbname
-        s.close()
-        
-        # compute seismograms
-        nStations = lineCount(self.stations + ".site")
-        job = self.prepareJob(self.seismoJob, *args, **kwds)
-        job.nodes = nStations
-        maxJobSize = self.seismoNodes
-        quotient = job.nodes / maxJobSize
-        remainder = job.nodes % maxJobSize
-        if job.nodes > maxJobSize:
-            job.nodes = maxJobSize
-        job.arguments.extend(["--context=launchSeismograms", "--nodes=%d" % job.nodes])
-        self.splinterStationList(job, quotient, remainder)
-        os.chdir(jobDir)
-        self.scheduleJob(job)
-
-        # archive the output
-        self.spawn("/bin/tar", "cvzf", "_" + archive, archiveDirName)
-
-        # clean up
-        self.spawn("/bin/rm", "-rf", archiveDirName)
-
-        # The existence of the latter signals that we are done.  See
-        # onMonitor().
-        os.rename("_" + archive, archive)
-
-        return
-
-
-    def enabledModes(self):
-        enabledModes = []
-        for mode in modes:
-            if getattr(self, mode.id):
-                enabledModes.append(mode)
-        return enabledModes
-
-
-    def splinterStationList(self, job, quotient, remainder):
-
-        stations = self.stations
-
-        # Read the entire contents of the master ".site" and
-        # ".sitechan" files.
-
-        s = open(stations + ".site", "r")
-        siteList = [site for site in s]
-        s.close()
-
-        sitechan = {}
-        s = open(stations + ".sitechan", "r")
-        for chan in s:
-            name = chan.split()[0]
-            cl = sitechan.setdefault(name, [])
-            cl.append(chan)
-        s.close()
-
-        generatedFiles = []
-
-        for jobIndex in xrange(1, job.nodes + 1):
-            
-            # Pick out this job's share of the load.
-            n = quotient
-            if jobIndex < remainder + 1:
-                n += 1
-            l = siteList[0:n]
-            siteList = siteList[n:]
-
-            # Generate this job's ".site" file.
-            filename = self.indexedStationsName(jobIndex) + ".site"
-            generatedFiles.append(filename)
-            s = open(filename, "w")
-            for site in l:
-                s.write(site)
-            s.close()
-            
-            # Generate this job's ".sitechan" file.
-            filename = self.indexedStationsName(jobIndex) + ".sitechan"
-            generatedFiles.append(filename)
-            s = open(filename, "w")
-            for site in l:
-                name = site.split()[0]
-                for chan in sitechan[name]:
-                    s.write(chan)
-            s.close()
-        
-        return generatedFiles
-
-
-    def indexedName(self, base, jobIndex):
-        return "%s-%04d" % (base, jobIndex)
-
-    def indexedStationsName(self, jobIndex):
-        return self.indexedName(self.stations, jobIndex)
-
-
-    def computeEigen(self, jobIndex):
-        os.chdir(archiveDirName)
-        self.redirectOutput(self.indexedName("output_eigen", jobIndex) + ".txt")
-
-        mineos = Mineos(self.model, self.event, self.stations, jobIndex)
-
-        mode = modes[jobIndex - 1]
-        if not getattr(self, mode.id):
-            self._info.log("mode '%s' is disabled" % mode.desc)
-            return
-        
-        # minos_bran
-        self._info.log("running program 'minos_bran' for '%s' mode" % mode.desc)
-        
-        # convert to millihertz
-        wgrav = self.wgrav / mHz
-        wmin = self.wmin / mHz
-        wmax = self.wmax / mHz
-
-        # convert to kilometers
-        max_depth = self.max_depth / km
-        
-        mineos.minos_bran(mode, self.eps, wgrav,
-                          self.lmin, self.lmax,
-                          wmin, wmax,
-                          self.nmin, self.nmax)
-
-        # eigcon
-        self._info.log("running program 'eigcon' for '%s' mode" % mode.desc)
-        mineos.eigcon(mode, max_depth)
-        
-        return
-
-
-    def computeSeismograms(self, jobIndex):
-        os.chdir(archiveDirName)
-        self.redirectOutput(self.indexedName("output_green_syndat", jobIndex) + ".txt")
-        
-        stations = self.indexedStationsName(jobIndex)
-        mineos = Mineos(self.model, self.event, stations, jobIndex)
-
-        # green
-        self._info.log("running program 'green'")
-        fmin = self.fmin / mHz
-        fmax = self.fmax / mHz
-        mineos.green(fmin, fmax, self.nsamples)
-
-        # creat_origin
-        self._info.log("running program 'creat_origin'")
-        mineos.creat_origin()
-
-        # syndat
-        self._info.log("running program 'syndat'")
-        mineos.syndat(self.plane, self.datatype)
-
-        # cucss2sac
-        os.symlink(mineos.green_out + ".origin", mineos.syndat_out + ".origin")
-        os.symlink(mineos.stations + ".site", mineos.syndat_out + ".site")
-        mineos.cucss2sac(mineos.syndat_out, "output") # SAC
-        mineos.cucss2sac("-a", mineos.syndat_out, "output") # ASCII
-        
-        return
-
-
-    def spawn(self, *argv):
-        command = ' '.join(argv)
-        self._info.log("spawning: %s" % command)
-        status = os.spawnvp(os.P_WAIT, argv[0], argv)
-        statusMsg = "%s: exit %d" % (argv[0], status)
-        self._info.log(statusMsg)
-        if status != 0:
-            sys.exit("%s: %s" % (argv[0], statusMsg))
-        return
-
-
-    #------------------------------------------------------------------------
-    # scheduling
-
-    def main(self, *args, **kwds):
-        self._info.activate()
-        context = self.context
-        if context.startswith("login"):
-            self.onLoginNode(*args, **kwds)
-        elif context.startswith("launch"):
-            self.onLauncherNode(*args, **kwds)
-        elif context.startswith("compute"):
-            self.onCompute(*args, **kwds)
-        elif context.startswith("monitor"):
-            self.onMonitor(*args, **kwds)
-        return
-
-
-    def onLoginNode(self, *args, **kwds):
-        self.redirectOutput("output_mineos.txt")
-
-        # This fork() allows the (gsi)ssh command to complete.
-        pid = os.fork()
-        if not pid:
-            os.close(0) # original 1 & 2 closed by dup2 above
-            pid = os.fork()
-            if pid:
-                self.monitorProgress(pid)
-            else:
-                context = self.context
-                if context == "login":
-                    self.loginEigen(*args, **kwds)
-                    self.loginSeismograms(*args, **kwds)
-                elif context == "loginEigen":
-                    self.loginEigen(*args, **kwds)
-                elif context == "loginSeismograms":
-                    self.loginSeismograms(*args, **kwds)
-        return
-
-
-    def onMonitor(self, *args, **kwds):
-        filename = {"monitorEigen": eigenDone,
-                    "monitorSeismograms": archive}[self.context]
-        if exists(filename):
-            sys.exit(0)
-        sys.exit(1)
-
-
-    def onLauncherNode(self, *args, **kwds):
-        
-        launchCommand = self.launchCommand % {'nodes': self.nodes}
-        launchCommand = launchCommand.split()
-
-        myArgv = self.getArgv(*args, **kwds)
-        context = self.context.replace("launch", "compute")
-
-        argv = launchCommand + [sys.executable, __file__] + myArgv + ["--context=%s" % context, "--nodes=%d" % self.nodes]
-        status = os.spawnvp(os.P_WAIT, argv[0], argv)
-        if status != 0:
-            sys.exit("%s: exit %d" % (argv[0], status))
-
-        return
-
-
-    def onCompute(self, *args, **kwds):
-
-        # Obtain the analogue to MPI rank.
-        # See /opt/lsf/bin/parametric_wrapper on Lonestar.
-        jobIndex = int(os.environ['TACC_LAUNCHER_TSK_ID']) + 1 # was LSB_JOBINDEX
-
-        context = self.context
-        if context == "computeEigen":
-            self.computeEigen(jobIndex)
-        elif context == "computeSeismograms":
-            self.computeSeismograms(jobIndex)
-
-        return
-
-
-    def redirectOutput(self, filename):
-        # Redirect all output -- our journal output, and the output of
-        # our children -- to a file.
-        fd = os.open(filename, os.O_CREAT | os.O_WRONLY, mode)
-        os.dup2(fd, 1)
-        os.dup2(fd, 2)
-        os.close(fd)
-        return
-
-
-    def prepareJob(self, job, *args, **kwds):
-        job.nodes = 1
-        job.executable = sys.executable
-        job.arguments = [__file__] + self.getArgv(*args, **kwds)
-        return job
-
-
-    def scheduleJob(self, job):
-        self.scheduler.wait = True
-        self.scheduler.schedule(job)
-        return
-
-
-    def monitorProgress(self, pid):
-        context = self.context
-        if context == "loginEigen":
-            sleepTime = 600
-        elif context == "loginSeismograms":
-            sleepTime = 60
-            self.nStations = None
-        else:
-            sleepTime = 60
-        progress = None
-        wpid, status = os.waitpid(pid, os.WNOHANG)
-        while wpid == 0:
-            newProgress = self.checkProgress()
-            if newProgress != progress:
-                progress = newProgress
-                self.postProgress(progress)
-            sleep(sleepTime)
-            wpid, status = os.waitpid(pid, os.WNOHANG)
-        return
-
-
-    def checkProgress(self):
-        context = self.context
-        if context == "loginEigen":
-            # The 'spheroidal' mode is the only one that takes a
-            # significant amount of time to compute.
-            if not self.spheroidal:
-                return None
-            try:
-                s = open(join(archiveDirName, "spheroidal_listing"), 'r')
-            except IOError:
-                return None
-            normin = max(self.nmin, 0)
-            normax = max(self.nmax, normin)
-            nord = 0
-            for line in s:
-                pieces = line.split()
-                if len(pieces) < 2:
-                    continue
-                ichar = pieces[1]
-                if ichar != 's':
-                    continue
-                nord = int(pieces[0])
-            return float(nord - normin) / float(normax - normin)
-        elif context == "loginSeismograms":
-            # The duration of this computation can be tuned with the
-            # "seismo-nodes" setting.
-            try:
-                s = open(join(archiveDirName, self.indexedName("output_green_syndat", 0) + ".txt"), 'r')
-                if self.nStations is None:
-                    self.nStations = lineCount(join(archiveDirName, self.indexedStationsName(0) + ".site"))
-            except IOError:
-                return None
-            stationTally = 0
-            for line in s:
-                if line.startswith(" green: Station: "):
-                    stationTally += 1
-            return float(stationTally) / float(self.nStations)
-        return None
-
-
-    def postProgress(self, progress):
-        import urllib, urlparse
-        scheme, host, path = urlparse.urlparse(self.progressUrl)[0:3]
-        fields = { 'progress': "%.2f" % progress }
-        body = urllib.urlencode(fields)
-        headers = {"Content-Type": "application/x-www-form-urlencoded",
-                   "Accept": "text/plain"}
-        import httplib
-        if scheme == "http":
-            conn = httplib.HTTPConnection(host)
-        elif scheme == "https":
-            conn = httplib.HTTPSConnection(host)
-        else:
-            assert False # not scheme in ["http", "https"]
-        conn.request("POST", path, body, headers)
-        response = conn.getresponse()
-        data = response.read()
-        conn.close()
-        return data
-
-
-if __name__ == "__main__":
-    mineos = MineosScript()
-    mineos.run()
-
-
-# end of file
diff -r fe71e8a64212 -r 6352cd054548 backend/specfem3D.py
--- a/backend/specfem3D.py	Fri Jun 17 13:08:45 2011 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,339 +0,0 @@
-#!/usr/bin/env python
-
-
-from pyre.applications import Script
-from os.path import dirname, exists, isdir, join
-import os, sys, stat, shutil, tarfile
-from time import sleep
-
-
-SPECFEM3D_GLOBE = dirname(__file__)
-archive = "specfem3dglobe.tar.gz"
-
-
-class Specfem3DGlobe(Script):
-
-
-    name = "Specfem3DGlobe"
-
-
-    import pyre.inventory
-    import pyre.schedulers
-    import pyre.launchers
-    
-    parFile        = pyre.inventory.str("par-file")
-    cmtSolution    = pyre.inventory.str("cmt-solution")
-    stations       = pyre.inventory.str("stations")
-    model          = pyre.inventory.str("model")
-    configureArgs  = pyre.inventory.str("configure-args", default="FC=mpif90")
-
-    scratchDir     = pyre.inventory.str("scratch-dir", default="/scratch")
-
-    nodes          = pyre.inventory.int("nodes", default=1)
-    scheduler      = pyre.schedulers.scheduler("scheduler", default="none")
-    job            = pyre.schedulers.job("job")
-    launchCommand  = pyre.inventory.str("launch-command", default="mpirun -np %(nodes)s")
-    context        = pyre.inventory.str(
-        name="context", default="login", validator=pyre.inventory.choice(["login", "launcher", "pre-compute", "post-compute", "monitor"]))
-    progressUrl = pyre.inventory.str("progress-url")
-
-
-
-    def main(self, *args, **kwds):
-        context = self.context
-        if context == "login":
-            self.onLoginNode(*args, **kwds)
-        elif context == "launcher":
-            self.onLauncherNode(*args, **kwds)
-        elif context == "pre-compute":
-            self.onPreCompute(*args, **kwds)
-        elif context == "post-compute":
-            self.onPostCompute(*args, **kwds)
-        elif context == "monitor":
-            self.onMonitor(*args, **kwds)
-        return
-
-
-    def onLoginNode(self, *args, **kwds):
-        # Redirect all output -- our journal output, and the output of
-        # our children -- to a file.
-        fd = os.open("output_build.txt", os.O_CREAT | os.O_WRONLY, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
-        os.dup2(fd, 1)
-        os.dup2(fd, 2)
-        os.close(fd)
-
-        assert not exists(archive)
-
-        self.prepareFiles()
-        self.build()
-
-        # This fork() allows the (gsi)ssh command to complete.
-        pid = os.fork()
-        if not pid:
-            # This pair of child processes merely updates the
-            # thermometer on the web site.  If the login node shuts
-            # down, the worst that will happen is that the thermometer
-            # will freeze.
-            os.close(0) # original 1 & 2 closed by dup2 above
-            pid = os.fork()
-            if pid:
-                self.monitorProgress(pid)
-            else:
-                self.schedule(*args, **kwds)
-        return
-
-
-    def onMonitor(self, *args, **kwds):
-        if exists(archive):
-            sys.exit(0)
-        sys.exit(1)
-
-
-    def onLauncherNode(self, *args, **kwds):
-        
-        launchCommand = self.launchCommand % {'nodes': self.nodes}
-        launchCommand = launchCommand.split()
-
-        wd = os.getcwd()
-        myArgv = self.getArgv(*args, **kwds)
-
-        # launch ourselves
-        argv = launchCommand + [sys.executable, __file__] + myArgv + ["--context=pre-compute", "--nodes=%d" % self.nodes]
-        status = os.spawnvp(os.P_WAIT, argv[0], argv)
-        if status != 0:
-            sys.exit("%s: exit %d" % (argv[0], status))
-
-        # launch the mesher
-        argv = launchCommand + [join(wd, "xmeshfem3D")]
-        status = os.spawnvp(os.P_WAIT, argv[0], argv)
-        if status != 0:
-            sys.exit("%s: exit %d" % (argv[0], status))
-
-        # launch the solver
-        argv = launchCommand + [join(wd, "xspecfem3D")]
-        status = os.spawnvp(os.P_WAIT, argv[0], argv)
-        if status != 0:
-            sys.exit("%s: exit %d" % (argv[0], status))
-
-        # launch ourselves
-        argv = launchCommand + [sys.executable, __file__] + myArgv + ["--context=post-compute", "--nodes=%d" % self.nodes]
-        status = os.spawnvp(os.P_WAIT, argv[0], argv)
-        if status != 0:
-            sys.exit("%s: exit %d" % (argv[0], status))
-
-        self.processOutputFiles()
-
-        return
-
-
-    def onPreCompute(self, *args, **kwds):
-        makedirs(self.scratchDir)
-
-    def onPostCompute(self, *args, **kwds):
-        try:
-            files = os.listdir(self.scratchDir)
-            for file in files:
-                try:
-                    os.remove(join(self.scratchDir, file))
-                except OSError:
-                    pass
-        except OSError:
-            pass
-
-        try:
-            os.rmdir(self.scratchDir)
-        except OSError:
-            pass
-
-        return
-
-
-    def prepareFiles(self):
-        
-        self.mkdir("DATA")
-        self.mkdir("OUTPUT_FILES")
-        
-        self.readAndCopyParFile()
-
-        shutil.copyfile(self.cmtSolution, "DATA/CMTSOLUTION")
-        shutil.copyfile(self.stations, "DATA/STATIONS")
-
-        dataSrc = join(SPECFEM3D_GLOBE, "DATA")
-        for name in os.listdir(dataSrc):
-            src = join(dataSrc, name)
-            dest = join("DATA", name)
-            if exists(dest):
-                continue
-            os.symlink(src, dest)
-
-        return
-
-
-    def readAndCopyParFile(self):
-        s = open(self.parFile, "r")
-        d = open("DATA/Par_file", "w")
-        for line in s:
-            tokens = line.split()
-            if len(tokens) >= 3:
-                var = tokens[0]
-                if var == "NCHUNKS":
-                    NCHUNKS = int(tokens[2])
-                elif var == "NPROC_XI":
-                    NPROC_XI = int(tokens[2])
-                elif var == "NPROC_ETA":
-                    NPROC_ETA = int(tokens[2])
-                elif var == "LOCAL_PATH":
-                    print >>d, "LOCAL_PATH =", self.scratchDir
-                    continue
-            print >>d, line,
-        self.nodes = NCHUNKS * NPROC_XI * NPROC_ETA
-        return
-
-
-    def build(self):
-        
-        # configure
-        configure = join(SPECFEM3D_GLOBE, "configure")
-        configureArgs = self.configureArgs.split()
-        argv = [configure] + configureArgs + ["MODEL=%s" % self.model]
-        print ' '.join(argv)
-        status = os.spawnvp(os.P_WAIT, argv[0], argv)
-        if status != 0:
-            sys.exit("%s: exit %d" % (argv[0], status))
-
-        # make
-        self.mkdir("obj")
-        argv = ['make', 'xmeshfem3D', 'xspecfem3D']
-        print ' '.join(argv)
-        status = os.spawnvp(os.P_WAIT, argv[0], argv)
-        if status != 0:
-            sys.exit("%s: exit %d" % (argv[0], status))
-
-        return
-
-
-    def schedule(self, *args, **kwds):
-        argv = self.getArgv(*args, **kwds)
-        
-        # initialize the job
-        job = self.job
-        job.nodes = self.nodes
-        job.executable = sys.executable
-        job.arguments = [__file__] + argv + ["--context=launcher", "--nodes=%d" % self.nodes]
-
-        if self.nodes > 512:
-            job.queue = "hero"
-
-        # schedule
-        self.scheduler.schedule(job)
-
-        return
-
-
-    def mkdir(self, name):
-        if not isdir(name):
-            os.mkdir(name)
-        return
-
-
-    def processOutputFiles(self):
-
-        archiveOut = open("_" + archive, 'w')
-        tgzOut = tarfile.open(archiveOut.name, 'w:gz', archiveOut)
-
-        for name in os.listdir("OUTPUT_FILES"):
-            pathname = join("OUTPUT_FILES", name)
-            arcname = pathname
-            tgzOut.add(pathname, arcname)
-
-        tgzOut.close()
-        archiveOut.close()
-
-        for filename in ["output_mesher.txt", "output_solver.txt"]:
-            pathname = join("OUTPUT_FILES", filename)
-            if exists(pathname):
-                shutil.copyfile(pathname, filename)
-
-        # The existence of the latter signals that we are done.  See
-        # onMonitor().
-        os.rename("_" + archive, archive)
-
-        return
-
-
-    def monitorProgress(self, pid):
-        progress = None
-        wpid, status = os.waitpid(pid, os.WNOHANG)
-        while wpid == 0:
-            newProgress = self.checkProgress()
-            if newProgress != progress:
-                progress = newProgress
-                self.postProgress(progress)
-            sleep(60)
-            wpid, status = os.waitpid(pid, os.WNOHANG)
-        return
-
-
-    def checkProgress(self):
-        try:
-            s = open("OUTPUT_FILES/output_solver.txt", 'r')
-        except IOError:
-            return None
-        progress = 0.0
-        for line in s:
-            if line.startswith(" We have done"):
-                progress = float(line.split()[3]) / 100.0
-        return progress
-
-
-    def postProgress(self, progress):
-        import urllib, urlparse
-        scheme, host, path = urlparse.urlparse(self.progressUrl)[0:3]
-        fields = { 'progress': "%.2f" % progress }
-        body = urllib.urlencode(fields)
-        headers = {"Content-Type": "application/x-www-form-urlencoded",
-                   "Accept": "text/plain"}
-        import httplib
-        if scheme == "http":
-            conn = httplib.HTTPConnection(host)
-        elif scheme == "https":
-            conn = httplib.HTTPSConnection(host)
-        else:
-            assert False # not scheme in ["http", "https"]
-        conn.request("POST", path, body, headers)
-        response = conn.getresponse()
-        data = response.read()
-        conn.close()
-        return data
-
-
-
-def makedirs(name, mode=0777):
-    """Like os.makedirs(), but multi-{thread,process} safe."""
-    import os.path as path
-    from os import curdir, mkdir
-    head, tail = path.split(name)
-    if not tail:
-        head, tail = path.split(head)
-    if head and tail and not path.exists(head):
-        makedirs(head, mode)
-        if tail == curdir:           # xxx/newdir/. exists if xxx/newdir exists
-            return
-    try:
-        mkdir(name, mode)
-    except OSError, error:
-        # 17 == "File exists"
-        if hasattr(error, 'errno') and error.errno == 17:
-            pass
-        else:
-            raise
-    return
-
-
-
-if __name__ == "__main__":
-    script = Specfem3DGlobe()
-    script.run()
-
-
-# end of file



More information about the CIG-COMMITS mailing list