[cig-commits] commit: Clean up backend and add a README for it
Mercurial
hg at geodynamics.org
Sun Jul 3 20:04:14 PDT 2011
changeset: 4:6352cd054548
user: Walter Landry <wlandry at caltech.edu>
date: Fri Jun 17 13:09:19 2011 -0700
files: backend/README backend/mineos.cfg backend/mineos.py backend/specfem3D.py
description:
Clean up backend and add a README for it
diff -r fe71e8a64212 -r 6352cd054548 backend/README
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/backend/README Fri Jun 17 13:09:19 2011 -0700
@@ -0,0 +1,34 @@
+To install the code on Ranger, copy specfem_launcher.sh to
+
+ /scratch/projects/tg/CIG/
+
+Untar Specfem3D_Globe into the directory
+
+ /scratch/projects/tg/CIG/SPECFEM3D_GLOBE_preconfigured
+
+In that directory, run
+
+ mkdir DATABASES_MPI
+ ./configure CC=pgcc FC=mpif90 MPIFC=mpif90
+
+Put a copy of sge_script in that directory, and then tar the whole
+thing into
+
+ /scratch/projects/tg/CIG/SPECFEM3D_GLOBE_preconfigured.tgz
+
+------------------
+
+To run the daemon, copy
+
+ daemon.py
+ web-portal-daemon.cfg
+ start.sh
+
+somewhere (e.g. $HOME/daemon) and invoke start.sh. The daemon requires
+Stackless Python.
+
+If you need to restart the daemon, kill the "stackless" process and
+run start.sh again.
+
+Output is in journal.log and nohup.out.
+
diff -r fe71e8a64212 -r 6352cd054548 backend/mineos.cfg
--- a/backend/mineos.cfg Fri Jun 17 13:08:45 2011 -0700
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,22 +0,0 @@
-
-[mineos]
-scheduler = lsf
-launch-command = pam -g 1 parametric_wrapper
-seismo-nodes = 40 ; use multiple of 4 on Lonestar
-
-[mineos.journal.info]
-lsf = true
-
-[mineos.eigen-job]
-queue = normal
-walltime = 4*hour
-stdout = eigen-stdout.txt
-stderr = eigen-stderr.txt
-
-[mineos.seismo-job]
-queue = normal
-walltime = 1*hour
-stdout = seismo-stdout.txt
-stderr = seismo-stderr.txt
-
-# end of file
diff -r fe71e8a64212 -r 6352cd054548 backend/mineos.py
--- a/backend/mineos.py Fri Jun 17 13:08:45 2011 -0700
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,688 +0,0 @@
-#!/usr/bin/env python
-
-
-from pyre.applications import Script
-from pyre.components import Component
-from pyre.schedulers import Job, SchedulerLSF
-from pyre.units.SI import milli, hertz
-from pyre.units.length import km
-from os.path import exists, basename, dirname, join, splitext
-from time import sleep
-
-import os, sys, stat, popen2, tarfile, time
-
-mHz = milli*hertz
-
-# Assume this script is installed alongside the mineos binaries.
-prefix = dirname(dirname(__file__))
-
-
-mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH
-
-
-archiveDirName = "mineos"
-archive = archiveDirName + ".tar.gz"
-eigenDone = "eigenDone"
-
-
-class Mode(object):
- def __init__(self, jcom, id, desc):
- self.jcom = jcom
- self.id = id
- self.desc = desc
-
-
-modes = [Mode(jcom, id, desc) for jcom, id, desc in [
- (1, 'radial', 'radial'),
- (2, 'toroidal', 'toroidal'),
- (3, 'spheroidal', 'spheroidal'),
- (4, 'ictoroidal', 'inner core toroidal'),
- ]]
-del jcom, id, desc
-
-
-class Mineos(object):
-
- def __init__(self, model, event, stations, jobIndex):
- self.bin = prefix + "/bin"
- self.model = model
- self.event = event
- self.stations = stations
- self.jobIndex = jobIndex
- self.green_out = "green-%04d" % jobIndex
- self.syndat_out = "syndat-%04d" % jobIndex
- return
-
-
- def minos_bran(self,
- mode,
- eps, wgrav,
- lmin, lmax,
- wmin, wmax,
- nmin, nmax):
- if mode.id == 'radial':
- # These are ignored.
- lmin = 0
- lmax = 0
- # Misha Barmin says that there are no dispersion
- # branches for radial modes; thus nmin, nmax have a
- # different sense in this context. He says to fix them as
- # follows.
- nmin = 0
- nmax = 8000
- minos_bran_in = mode.id + "_mineos_bran.in"
- s = open(minos_bran_in, "w")
- print >>s, self.model
- print >>s, self.listing(mode)
- print >>s, self.eigenfunctions(mode)
- print >>s, eps, wgrav
- print >>s, mode.jcom
- print >>s, lmin, lmax, wmin, wmax, nmin, nmax
- s.close()
- self.run("minos_bran", stdin=minos_bran_in)
- return
-
-
- def eigcon(self, mode, max_depth):
- eigcon_in = mode.id + "_eigcon.in"
- s = open(eigcon_in, "w")
- print >>s, mode.jcom
- print >>s, self.model
- print >>s, max_depth
- print >>s, self.listing(mode)
- print >>s, self.eigenfunctions(mode)
- dbname = mode.id #+ "_eigcon_out"
- print >>s, dbname
- s.close()
- self.run("eigcon", stdin=eigcon_in)
- return
-
-
- def green(self, fmin, fmax, nsamples):
- green_in = "green_%04d.in" % self.jobIndex
- s = open(green_in, "w")
- print >>s, self.stations
- print >>s, "db_list"
- print >>s, self.event
- print >>s, fmin, fmax
- print >>s, nsamples
- print >>s, self.green_out
- s.close()
- self.run("green", stdin=green_in)
- return
-
-
- def creat_origin(self):
- self.run("creat_origin", [self.event, self.green_out])
-
-
- def syndat(self, plane, datatype):
- syndat_in = "syndat_%04d.in" % self.jobIndex
- s = open(syndat_in, "w")
- print >>s, self.event
- print >>s, plane # error in documentation!
- print >>s, self.green_out
- print >>s, self.syndat_out
- print >>s, datatype
- s.close()
- self.run("syndat", stdin=syndat_in)
- return
-
-
- def cucss2sac(self, *args):
- self.run("cucss2sac", list(args))
- return
-
-
- def listing(self, mode):
- return mode.id + "_listing"
-
-
- def eigenfunctions(self, mode):
- return mode.id + "_eigenfunctions"
-
-
- def run(self, program, args=None, stdin=None, stdout=None, stderr=None):
- sys.stdout.flush()
- sys.stderr.flush()
- if args is None:
- args = []
- argv = [program] + args
- pid = os.fork()
- if pid:
- # parent
- _, status = os.wait()
- self.checkStatus(status, argv)
- else:
- # child
- if stdin is not None:
- fd = os.open(stdin, os.O_RDONLY)
- os.dup2(fd, 0)
- if stdout is not None:
- fd = os.open(stdout, os.O_WRONLY, mode)
- os.dup2(fd, 1)
- if stderr is not None:
- fd = os.open(stderr, os.O_WRONLY, mode)
- os.dup2(fd, 2)
- executable = join(self.bin, argv[0])
- try:
- os.execvp(executable, argv)
- except Exception, e:
- sys.exit("%s: %s" % (executable, e))
-
- return
-
-
- def checkStatus(self, status, argv):
- exitStatus = None
- if (os.WIFSIGNALED(status)):
- statusStr = "signal %d" % os.WTERMSIG(status)
- elif (os.WIFEXITED(status)):
- exitStatus = os.WEXITSTATUS(status)
- statusStr = "exit %d" % exitStatus
- else:
- statusStr = "status %d" % status
- statusMsg = "%s: %s" % (argv[0], statusStr)
- if exitStatus != 0:
- sys.exit(statusMsg)
- return
-
-
-def lineCount(filename):
- tally = 0
- s = open(filename, 'r')
- for line in s:
- tally += 1
- return tally
-
-
-class MineosScript(Script):
-
- name = "mineos"
-
- import pyre.inventory as pyre
-
- #------------------------------------------------------------------------
- # common
-
- model = pyre.str("model")
-
- # jcom
- radial = pyre.bool("radial")
- toroidal = pyre.bool("toroidal")
- spheroidal = pyre.bool("spheroidal")
- ictoroidal = pyre.bool("inner-core-toroidal")
-
- event = pyre.str("event")
-
-
- #------------------------------------------------------------------------
- # minos_bran
-
- eps = pyre.float("eps")
- wgrav = pyre.dimensional("wgrav", default=1*hertz)
-
- lmin = pyre.int("lmin")
- lmax = pyre.int("lmax")
-
- wmin = pyre.dimensional("wmin", default=1*hertz)
- wmax = pyre.dimensional("wmax", default=1*hertz)
-
- nmin = pyre.int("nmin")
- nmax = pyre.int("nmax")
-
-
- #------------------------------------------------------------------------
- # eigcon
-
- max_depth = pyre.dimensional("max-depth", default=1*km)
-
-
- #------------------------------------------------------------------------
- # green
-
- fmin = pyre.dimensional("fmin", default=1*hertz)
- fmax = pyre.dimensional("fmax", default=1*hertz)
-
- nsamples = pyre.int("nsamples")
-
- stations = pyre.str("stations")
-
-
- #------------------------------------------------------------------------
- # syndat
-
- plane = pyre.int("plane", default=0) # use CMT tensor components as-is
- datatype = pyre.int("datatype", default=0) # accelerograms in nm/s^2
-
-
- #------------------------------------------------------------------------
- # scheduling
-
- import pyre.schedulers as schedulers
-
- nodes = pyre.int("nodes", default=1)
- scheduler = schedulers.scheduler("scheduler", default="none")
- eigenJob = schedulers.job("eigen-job")
- seismoJob = schedulers.job("seismo-job")
- seismoNodes = pyre.int("seismo-nodes")
- launchCommand = pyre.str("launch-command")
-
- context = pyre.str("context", default="login", validator=pyre.choice(["login",
- "loginEigen", "launchEigen", "computeEigen",
- "loginSeismograms", "launchSeismograms", "computeSeismograms",
- "monitorEigen", "monitorSeismograms"]))
- progressUrl = pyre.str("progress-url")
-
-
- #------------------------------------------------------------------------
- # running
-
- def loginEigen(self, *args, **kwds):
-
- enabledModes = self.enabledModes()
- self._info.log("modes: " + ' '.join(["'%s'" % mode.desc for mode in enabledModes]))
- if not enabledModes:
- self._info.log("nothing to do")
- return
-
- os.mkdir(archiveDirName)
- inputFiles = [self.model, self.event] + [self.stations + suffix for suffix in [".site", ".sitechan"]]
- for inputFile in inputFiles:
- os.rename(inputFile, join(archiveDirName, inputFile))
-
- self._info.log("running Mineos program suite for model '%s'" % self.model)
-
- # Compute eigenfunctions and eigenfrequencies. For
- # simplicity, always schedule one job for each mode,
- # regardless of which modes are enabled. If mode catalogs
- # were cached, we would likewise include all modes in every
- # catalog, regardless of which modes are enabled for a
- # particular run.
- job = self.prepareJob(self.eigenJob, *args, **kwds)
- job.nodes = len(modes)
- job.arguments.extend(["--context=launchEigen", "--nodes=%d" % job.nodes])
- self.scheduleJob(job)
-
- # The existence of this file signals that we are done. See
- # onMonitor().
- self.spawn("touch", eigenDone)
-
- return
-
-
- def loginSeismograms(self, *args, **kwds):
-
- enabledModes = self.enabledModes()
- self._info.log("modes: " + ' '.join(["'%s'" % mode.desc for mode in enabledModes]))
- if not enabledModes:
- self._info.log("nothing to do")
- return
-
- jobDir = os.getcwd()
- os.chdir(archiveDirName)
-
- # generate "db_list" input file
- db_list = [mode.id for mode in enabledModes]
- s = open("db_list", "w")
- for dbname in db_list:
- print >>s, dbname
- s.close()
-
- # compute seismograms
- nStations = lineCount(self.stations + ".site")
- job = self.prepareJob(self.seismoJob, *args, **kwds)
- job.nodes = nStations
- maxJobSize = self.seismoNodes
- quotient = job.nodes / maxJobSize
- remainder = job.nodes % maxJobSize
- if job.nodes > maxJobSize:
- job.nodes = maxJobSize
- job.arguments.extend(["--context=launchSeismograms", "--nodes=%d" % job.nodes])
- self.splinterStationList(job, quotient, remainder)
- os.chdir(jobDir)
- self.scheduleJob(job)
-
- # archive the output
- self.spawn("/bin/tar", "cvzf", "_" + archive, archiveDirName)
-
- # clean up
- self.spawn("/bin/rm", "-rf", archiveDirName)
-
- # The existence of the latter signals that we are done. See
- # onMonitor().
- os.rename("_" + archive, archive)
-
- return
-
-
- def enabledModes(self):
- enabledModes = []
- for mode in modes:
- if getattr(self, mode.id):
- enabledModes.append(mode)
- return enabledModes
-
-
- def splinterStationList(self, job, quotient, remainder):
-
- stations = self.stations
-
- # Read the entire contents of the master ".site" and
- # ".sitechan" files.
-
- s = open(stations + ".site", "r")
- siteList = [site for site in s]
- s.close()
-
- sitechan = {}
- s = open(stations + ".sitechan", "r")
- for chan in s:
- name = chan.split()[0]
- cl = sitechan.setdefault(name, [])
- cl.append(chan)
- s.close()
-
- generatedFiles = []
-
- for jobIndex in xrange(1, job.nodes + 1):
-
- # Pick out this job's share of the load.
- n = quotient
- if jobIndex < remainder + 1:
- n += 1
- l = siteList[0:n]
- siteList = siteList[n:]
-
- # Generate this job's ".site" file.
- filename = self.indexedStationsName(jobIndex) + ".site"
- generatedFiles.append(filename)
- s = open(filename, "w")
- for site in l:
- s.write(site)
- s.close()
-
- # Generate this job's ".sitechan" file.
- filename = self.indexedStationsName(jobIndex) + ".sitechan"
- generatedFiles.append(filename)
- s = open(filename, "w")
- for site in l:
- name = site.split()[0]
- for chan in sitechan[name]:
- s.write(chan)
- s.close()
-
- return generatedFiles
-
-
- def indexedName(self, base, jobIndex):
- return "%s-%04d" % (base, jobIndex)
-
- def indexedStationsName(self, jobIndex):
- return self.indexedName(self.stations, jobIndex)
-
-
- def computeEigen(self, jobIndex):
- os.chdir(archiveDirName)
- self.redirectOutput(self.indexedName("output_eigen", jobIndex) + ".txt")
-
- mineos = Mineos(self.model, self.event, self.stations, jobIndex)
-
- mode = modes[jobIndex - 1]
- if not getattr(self, mode.id):
- self._info.log("mode '%s' is disabled" % mode.desc)
- return
-
- # minos_bran
- self._info.log("running program 'minos_bran' for '%s' mode" % mode.desc)
-
- # convert to millihertz
- wgrav = self.wgrav / mHz
- wmin = self.wmin / mHz
- wmax = self.wmax / mHz
-
- # convert to kilometers
- max_depth = self.max_depth / km
-
- mineos.minos_bran(mode, self.eps, wgrav,
- self.lmin, self.lmax,
- wmin, wmax,
- self.nmin, self.nmax)
-
- # eigcon
- self._info.log("running program 'eigcon' for '%s' mode" % mode.desc)
- mineos.eigcon(mode, max_depth)
-
- return
-
-
- def computeSeismograms(self, jobIndex):
- os.chdir(archiveDirName)
- self.redirectOutput(self.indexedName("output_green_syndat", jobIndex) + ".txt")
-
- stations = self.indexedStationsName(jobIndex)
- mineos = Mineos(self.model, self.event, stations, jobIndex)
-
- # green
- self._info.log("running program 'green'")
- fmin = self.fmin / mHz
- fmax = self.fmax / mHz
- mineos.green(fmin, fmax, self.nsamples)
-
- # creat_origin
- self._info.log("running program 'creat_origin'")
- mineos.creat_origin()
-
- # syndat
- self._info.log("running program 'syndat'")
- mineos.syndat(self.plane, self.datatype)
-
- # cucss2sac
- os.symlink(mineos.green_out + ".origin", mineos.syndat_out + ".origin")
- os.symlink(mineos.stations + ".site", mineos.syndat_out + ".site")
- mineos.cucss2sac(mineos.syndat_out, "output") # SAC
- mineos.cucss2sac("-a", mineos.syndat_out, "output") # ASCII
-
- return
-
-
- def spawn(self, *argv):
- command = ' '.join(argv)
- self._info.log("spawning: %s" % command)
- status = os.spawnvp(os.P_WAIT, argv[0], argv)
- statusMsg = "%s: exit %d" % (argv[0], status)
- self._info.log(statusMsg)
- if status != 0:
- sys.exit("%s: %s" % (argv[0], statusMsg))
- return
-
-
- #------------------------------------------------------------------------
- # scheduling
-
- def main(self, *args, **kwds):
- self._info.activate()
- context = self.context
- if context.startswith("login"):
- self.onLoginNode(*args, **kwds)
- elif context.startswith("launch"):
- self.onLauncherNode(*args, **kwds)
- elif context.startswith("compute"):
- self.onCompute(*args, **kwds)
- elif context.startswith("monitor"):
- self.onMonitor(*args, **kwds)
- return
-
-
- def onLoginNode(self, *args, **kwds):
- self.redirectOutput("output_mineos.txt")
-
- # This fork() allows the (gsi)ssh command to complete.
- pid = os.fork()
- if not pid:
- os.close(0) # original 1 & 2 closed by dup2 above
- pid = os.fork()
- if pid:
- self.monitorProgress(pid)
- else:
- context = self.context
- if context == "login":
- self.loginEigen(*args, **kwds)
- self.loginSeismograms(*args, **kwds)
- elif context == "loginEigen":
- self.loginEigen(*args, **kwds)
- elif context == "loginSeismograms":
- self.loginSeismograms(*args, **kwds)
- return
-
-
- def onMonitor(self, *args, **kwds):
- filename = {"monitorEigen": eigenDone,
- "monitorSeismograms": archive}[self.context]
- if exists(filename):
- sys.exit(0)
- sys.exit(1)
-
-
- def onLauncherNode(self, *args, **kwds):
-
- launchCommand = self.launchCommand % {'nodes': self.nodes}
- launchCommand = launchCommand.split()
-
- myArgv = self.getArgv(*args, **kwds)
- context = self.context.replace("launch", "compute")
-
- argv = launchCommand + [sys.executable, __file__] + myArgv + ["--context=%s" % context, "--nodes=%d" % self.nodes]
- status = os.spawnvp(os.P_WAIT, argv[0], argv)
- if status != 0:
- sys.exit("%s: exit %d" % (argv[0], status))
-
- return
-
-
- def onCompute(self, *args, **kwds):
-
- # Obtain the analogue to MPI rank.
- # See /opt/lsf/bin/parametric_wrapper on Lonestar.
- jobIndex = int(os.environ['TACC_LAUNCHER_TSK_ID']) + 1 # was LSB_JOBINDEX
-
- context = self.context
- if context == "computeEigen":
- self.computeEigen(jobIndex)
- elif context == "computeSeismograms":
- self.computeSeismograms(jobIndex)
-
- return
-
-
- def redirectOutput(self, filename):
- # Redirect all output -- our journal output, and the output of
- # our children -- to a file.
- fd = os.open(filename, os.O_CREAT | os.O_WRONLY, mode)
- os.dup2(fd, 1)
- os.dup2(fd, 2)
- os.close(fd)
- return
-
-
- def prepareJob(self, job, *args, **kwds):
- job.nodes = 1
- job.executable = sys.executable
- job.arguments = [__file__] + self.getArgv(*args, **kwds)
- return job
-
-
- def scheduleJob(self, job):
- self.scheduler.wait = True
- self.scheduler.schedule(job)
- return
-
-
- def monitorProgress(self, pid):
- context = self.context
- if context == "loginEigen":
- sleepTime = 600
- elif context == "loginSeismograms":
- sleepTime = 60
- self.nStations = None
- else:
- sleepTime = 60
- progress = None
- wpid, status = os.waitpid(pid, os.WNOHANG)
- while wpid == 0:
- newProgress = self.checkProgress()
- if newProgress != progress:
- progress = newProgress
- self.postProgress(progress)
- sleep(sleepTime)
- wpid, status = os.waitpid(pid, os.WNOHANG)
- return
-
-
- def checkProgress(self):
- context = self.context
- if context == "loginEigen":
- # The 'spheroidal' mode is the only one that takes a
- # significant amount of time to compute.
- if not self.spheroidal:
- return None
- try:
- s = open(join(archiveDirName, "spheroidal_listing"), 'r')
- except IOError:
- return None
- normin = max(self.nmin, 0)
- normax = max(self.nmax, normin)
- nord = 0
- for line in s:
- pieces = line.split()
- if len(pieces) < 2:
- continue
- ichar = pieces[1]
- if ichar != 's':
- continue
- nord = int(pieces[0])
- return float(nord - normin) / float(normax - normin)
- elif context == "loginSeismograms":
- # The duration of this computation can be tuned with the
- # "seismo-nodes" setting.
- try:
- s = open(join(archiveDirName, self.indexedName("output_green_syndat", 0) + ".txt"), 'r')
- if self.nStations is None:
- self.nStations = lineCount(join(archiveDirName, self.indexedStationsName(0) + ".site"))
- except IOError:
- return None
- stationTally = 0
- for line in s:
- if line.startswith(" green: Station: "):
- stationTally += 1
- return float(stationTally) / float(self.nStations)
- return None
-
-
- def postProgress(self, progress):
- import urllib, urlparse
- scheme, host, path = urlparse.urlparse(self.progressUrl)[0:3]
- fields = { 'progress': "%.2f" % progress }
- body = urllib.urlencode(fields)
- headers = {"Content-Type": "application/x-www-form-urlencoded",
- "Accept": "text/plain"}
- import httplib
- if scheme == "http":
- conn = httplib.HTTPConnection(host)
- elif scheme == "https":
- conn = httplib.HTTPSConnection(host)
- else:
- assert False # not scheme in ["http", "https"]
- conn.request("POST", path, body, headers)
- response = conn.getresponse()
- data = response.read()
- conn.close()
- return data
-
-
-if __name__ == "__main__":
- mineos = MineosScript()
- mineos.run()
-
-
-# end of file
diff -r fe71e8a64212 -r 6352cd054548 backend/specfem3D.py
--- a/backend/specfem3D.py Fri Jun 17 13:08:45 2011 -0700
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,339 +0,0 @@
-#!/usr/bin/env python
-
-
-from pyre.applications import Script
-from os.path import dirname, exists, isdir, join
-import os, sys, stat, shutil, tarfile
-from time import sleep
-
-
-SPECFEM3D_GLOBE = dirname(__file__)
-archive = "specfem3dglobe.tar.gz"
-
-
-class Specfem3DGlobe(Script):
-
-
- name = "Specfem3DGlobe"
-
-
- import pyre.inventory
- import pyre.schedulers
- import pyre.launchers
-
- parFile = pyre.inventory.str("par-file")
- cmtSolution = pyre.inventory.str("cmt-solution")
- stations = pyre.inventory.str("stations")
- model = pyre.inventory.str("model")
- configureArgs = pyre.inventory.str("configure-args", default="FC=mpif90")
-
- scratchDir = pyre.inventory.str("scratch-dir", default="/scratch")
-
- nodes = pyre.inventory.int("nodes", default=1)
- scheduler = pyre.schedulers.scheduler("scheduler", default="none")
- job = pyre.schedulers.job("job")
- launchCommand = pyre.inventory.str("launch-command", default="mpirun -np %(nodes)s")
- context = pyre.inventory.str(
- name="context", default="login", validator=pyre.inventory.choice(["login", "launcher", "pre-compute", "post-compute", "monitor"]))
- progressUrl = pyre.inventory.str("progress-url")
-
-
-
- def main(self, *args, **kwds):
- context = self.context
- if context == "login":
- self.onLoginNode(*args, **kwds)
- elif context == "launcher":
- self.onLauncherNode(*args, **kwds)
- elif context == "pre-compute":
- self.onPreCompute(*args, **kwds)
- elif context == "post-compute":
- self.onPostCompute(*args, **kwds)
- elif context == "monitor":
- self.onMonitor(*args, **kwds)
- return
-
-
- def onLoginNode(self, *args, **kwds):
- # Redirect all output -- our journal output, and the output of
- # our children -- to a file.
- fd = os.open("output_build.txt", os.O_CREAT | os.O_WRONLY, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
- os.dup2(fd, 1)
- os.dup2(fd, 2)
- os.close(fd)
-
- assert not exists(archive)
-
- self.prepareFiles()
- self.build()
-
- # This fork() allows the (gsi)ssh command to complete.
- pid = os.fork()
- if not pid:
- # This pair of child processes merely updates the
- # thermometer on the web site. If the login node shuts
- # down, the worst that will happen is that the thermometer
- # will freeze.
- os.close(0) # original 1 & 2 closed by dup2 above
- pid = os.fork()
- if pid:
- self.monitorProgress(pid)
- else:
- self.schedule(*args, **kwds)
- return
-
-
- def onMonitor(self, *args, **kwds):
- if exists(archive):
- sys.exit(0)
- sys.exit(1)
-
-
- def onLauncherNode(self, *args, **kwds):
-
- launchCommand = self.launchCommand % {'nodes': self.nodes}
- launchCommand = launchCommand.split()
-
- wd = os.getcwd()
- myArgv = self.getArgv(*args, **kwds)
-
- # launch ourselves
- argv = launchCommand + [sys.executable, __file__] + myArgv + ["--context=pre-compute", "--nodes=%d" % self.nodes]
- status = os.spawnvp(os.P_WAIT, argv[0], argv)
- if status != 0:
- sys.exit("%s: exit %d" % (argv[0], status))
-
- # launch the mesher
- argv = launchCommand + [join(wd, "xmeshfem3D")]
- status = os.spawnvp(os.P_WAIT, argv[0], argv)
- if status != 0:
- sys.exit("%s: exit %d" % (argv[0], status))
-
- # launch the solver
- argv = launchCommand + [join(wd, "xspecfem3D")]
- status = os.spawnvp(os.P_WAIT, argv[0], argv)
- if status != 0:
- sys.exit("%s: exit %d" % (argv[0], status))
-
- # launch ourselves
- argv = launchCommand + [sys.executable, __file__] + myArgv + ["--context=post-compute", "--nodes=%d" % self.nodes]
- status = os.spawnvp(os.P_WAIT, argv[0], argv)
- if status != 0:
- sys.exit("%s: exit %d" % (argv[0], status))
-
- self.processOutputFiles()
-
- return
-
-
- def onPreCompute(self, *args, **kwds):
- makedirs(self.scratchDir)
-
- def onPostCompute(self, *args, **kwds):
- try:
- files = os.listdir(self.scratchDir)
- for file in files:
- try:
- os.remove(join(self.scratchDir, file))
- except OSError:
- pass
- except OSError:
- pass
-
- try:
- os.rmdir(self.scratchDir)
- except OSError:
- pass
-
- return
-
-
- def prepareFiles(self):
-
- self.mkdir("DATA")
- self.mkdir("OUTPUT_FILES")
-
- self.readAndCopyParFile()
-
- shutil.copyfile(self.cmtSolution, "DATA/CMTSOLUTION")
- shutil.copyfile(self.stations, "DATA/STATIONS")
-
- dataSrc = join(SPECFEM3D_GLOBE, "DATA")
- for name in os.listdir(dataSrc):
- src = join(dataSrc, name)
- dest = join("DATA", name)
- if exists(dest):
- continue
- os.symlink(src, dest)
-
- return
-
-
- def readAndCopyParFile(self):
- s = open(self.parFile, "r")
- d = open("DATA/Par_file", "w")
- for line in s:
- tokens = line.split()
- if len(tokens) >= 3:
- var = tokens[0]
- if var == "NCHUNKS":
- NCHUNKS = int(tokens[2])
- elif var == "NPROC_XI":
- NPROC_XI = int(tokens[2])
- elif var == "NPROC_ETA":
- NPROC_ETA = int(tokens[2])
- elif var == "LOCAL_PATH":
- print >>d, "LOCAL_PATH =", self.scratchDir
- continue
- print >>d, line,
- self.nodes = NCHUNKS * NPROC_XI * NPROC_ETA
- return
-
-
- def build(self):
-
- # configure
- configure = join(SPECFEM3D_GLOBE, "configure")
- configureArgs = self.configureArgs.split()
- argv = [configure] + configureArgs + ["MODEL=%s" % self.model]
- print ' '.join(argv)
- status = os.spawnvp(os.P_WAIT, argv[0], argv)
- if status != 0:
- sys.exit("%s: exit %d" % (argv[0], status))
-
- # make
- self.mkdir("obj")
- argv = ['make', 'xmeshfem3D', 'xspecfem3D']
- print ' '.join(argv)
- status = os.spawnvp(os.P_WAIT, argv[0], argv)
- if status != 0:
- sys.exit("%s: exit %d" % (argv[0], status))
-
- return
-
-
- def schedule(self, *args, **kwds):
- argv = self.getArgv(*args, **kwds)
-
- # initialize the job
- job = self.job
- job.nodes = self.nodes
- job.executable = sys.executable
- job.arguments = [__file__] + argv + ["--context=launcher", "--nodes=%d" % self.nodes]
-
- if self.nodes > 512:
- job.queue = "hero"
-
- # schedule
- self.scheduler.schedule(job)
-
- return
-
-
- def mkdir(self, name):
- if not isdir(name):
- os.mkdir(name)
- return
-
-
- def processOutputFiles(self):
-
- archiveOut = open("_" + archive, 'w')
- tgzOut = tarfile.open(archiveOut.name, 'w:gz', archiveOut)
-
- for name in os.listdir("OUTPUT_FILES"):
- pathname = join("OUTPUT_FILES", name)
- arcname = pathname
- tgzOut.add(pathname, arcname)
-
- tgzOut.close()
- archiveOut.close()
-
- for filename in ["output_mesher.txt", "output_solver.txt"]:
- pathname = join("OUTPUT_FILES", filename)
- if exists(pathname):
- shutil.copyfile(pathname, filename)
-
- # The existence of the latter signals that we are done. See
- # onMonitor().
- os.rename("_" + archive, archive)
-
- return
-
-
- def monitorProgress(self, pid):
- progress = None
- wpid, status = os.waitpid(pid, os.WNOHANG)
- while wpid == 0:
- newProgress = self.checkProgress()
- if newProgress != progress:
- progress = newProgress
- self.postProgress(progress)
- sleep(60)
- wpid, status = os.waitpid(pid, os.WNOHANG)
- return
-
-
- def checkProgress(self):
- try:
- s = open("OUTPUT_FILES/output_solver.txt", 'r')
- except IOError:
- return None
- progress = 0.0
- for line in s:
- if line.startswith(" We have done"):
- progress = float(line.split()[3]) / 100.0
- return progress
-
-
- def postProgress(self, progress):
- import urllib, urlparse
- scheme, host, path = urlparse.urlparse(self.progressUrl)[0:3]
- fields = { 'progress': "%.2f" % progress }
- body = urllib.urlencode(fields)
- headers = {"Content-Type": "application/x-www-form-urlencoded",
- "Accept": "text/plain"}
- import httplib
- if scheme == "http":
- conn = httplib.HTTPConnection(host)
- elif scheme == "https":
- conn = httplib.HTTPSConnection(host)
- else:
- assert False # not scheme in ["http", "https"]
- conn.request("POST", path, body, headers)
- response = conn.getresponse()
- data = response.read()
- conn.close()
- return data
-
-
-
-def makedirs(name, mode=0777):
- """Like os.makedirs(), but multi-{thread,process} safe."""
- import os.path as path
- from os import curdir, mkdir
- head, tail = path.split(name)
- if not tail:
- head, tail = path.split(head)
- if head and tail and not path.exists(head):
- makedirs(head, mode)
- if tail == curdir: # xxx/newdir/. exists if xxx/newdir exists
- return
- try:
- mkdir(name, mode)
- except OSError, error:
- # 17 == "File exists"
- if hasattr(error, 'errno') and error.errno == 17:
- pass
- else:
- raise
- return
-
-
-
-if __name__ == "__main__":
- script = Specfem3DGlobe()
- script.run()
-
-
-# end of file
More information about the CIG-COMMITS
mailing list