[cig-commits] r13865 - cs/portal/trunk/northridge/backend
leif at geodynamics.org
leif at geodynamics.org
Tue Jan 13 13:53:05 PST 2009
Author: leif
Date: 2009-01-13 13:53:04 -0800 (Tue, 13 Jan 2009)
New Revision: 13865
Added:
cs/portal/trunk/northridge/backend/mineos.cfg
Modified:
cs/portal/trunk/northridge/backend/mineos.py
Log:
Reworked the 'mineos.py' backend script so that it runs the Mineos
program suite in parallel using LSF's "job array" feature.
This code mostly works, but there is a numbering bug somewhere. In a
test run, 55 of 124 green+syndat jobs failed with "wfdisc relation
green-0102 appears to be empty, or does not exist" -- and indeed it
does not exist: job #102 wrote green-0099 instead?!
Also, on CITerra, computing the spheroidal portion of the mode
catalog takes forever (24 hours was not enough). Here,
Ibrix is presumed guilty until proven otherwise.
Added: cs/portal/trunk/northridge/backend/mineos.cfg
===================================================================
--- cs/portal/trunk/northridge/backend/mineos.cfg (rev 0)
+++ cs/portal/trunk/northridge/backend/mineos.cfg 2009-01-13 21:53:04 UTC (rev 13865)
@@ -0,0 +1,25 @@
+
+[mineos]
+# mineos.py uses custom LSFJobArrayScheduler
+#scheduler = lsf
+
+[mineos.lsf]
+max-size = 200
+slot-limit = 100
+
+[mineos.journal.info]
+lsf = true
+
+[mineos.eigen-job]
+queue = normal
+walltime = 24*hour
+stdout = eigen-%I-stdout.txt
+stderr = eigen-%I-stderr.txt
+
+[mineos.seismo-job]
+queue = normal
+walltime = 10*minute
+stdout = seismo-%I-stdout.txt
+stderr = seismo-%I-stderr.txt
+
+# end of file
Modified: cs/portal/trunk/northridge/backend/mineos.py
===================================================================
--- cs/portal/trunk/northridge/backend/mineos.py 2009-01-13 11:12:11 UTC (rev 13864)
+++ cs/portal/trunk/northridge/backend/mineos.py 2009-01-13 21:53:04 UTC (rev 13865)
@@ -2,11 +2,15 @@
from pyre.applications import Script
+from pyre.components import Component
+from pyre.schedulers import Job, SchedulerLSF
from pyre.units.SI import milli, hertz
from pyre.units.length import km
-import os, sys, stat, popen2, time
-from os.path import basename, dirname, splitext
+from os.path import exists, basename, dirname, join, splitext
+from time import sleep
+import os, sys, stat, popen2, tarfile, time
+
mHz = milli*hertz
# Assume this script is installed alongside the mineos binaries.
@@ -16,6 +20,10 @@
mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
+archiveDirName = "mineos"
+archive = archiveDirName + ".tar.gz"
+
+
class Mode(object):
def __init__(self, jcom, id, desc):
self.jcom = jcom
@@ -34,14 +42,13 @@
class Mineos(object):
- def __init__(self, model, event, stations, progressUrl):
+    def __init__(self, model, event, stations):
+        # model, event, stations: input file names for the Mineos
+        # programs; `stations` is a base name (".site"/".sitechan" are
+        # appended to it elsewhere in this script).
         self.bin = prefix + "/bin"
         self.model = model
         self.event = event
         self.stations = stations
-        self.progressUrl = progressUrl
-        self.eigcon_out = []
-        self.green_out = "green"
+        # Output base names for 'green' and 'syndat'.  They stay None until
+        # the caller assigns them (computeSeismograms() uses per-job
+        # names); if left unset, green()/syndat() would print the literal
+        # string "None" into their stdin scripts.
+        self.green_out = None
+        self.syndat_out = None
         return
@@ -86,39 +93,24 @@
print >>s, dbname
s.close()
self.run("eigcon", stdin=eigcon_in)
- self.eigcon_out.append(dbname)
return
- def green(self, fmin, fmax, nsamples):
- nStations = self.lineCount(self.stations + ".site")
+    def green(self, fmin, fmax, nsamples, db_list):
+        # Run the 'green' program for self.stations / self.event.
+        #   fmin, fmax -- written together on one line of the stdin script
+        #                 (computeSeismograms() passes them in mHz)
+        #   nsamples   -- written verbatim to the stdin script
+        #   db_list    -- database names, written one per line to the file
+        #                 'db_list', whose name is also put in the script
+        # Output goes to the base name self.green_out, which the caller
+        # must set beforehand.
+        #
+        # NOTE(review): 'db_list' and 'green.in' are fixed names in the
+        # current directory.  computeSeismograms() chdir()s every
+        # job-array element into the same shared archiveDirName, so
+        # elements running at the same time can overwrite each other's
+        # 'green.in' between the write below and the run of 'green'.  That
+        # matches the symptom in the r13865 log (job #102 producing
+        # green-0099).  Deriving both names from self.green_out (e.g.
+        # self.green_out + ".in", and printing that db_list name instead
+        # of the literal "db_list") should remove the race -- TODO
+        # confirm.  Any other fixed-name stdin file written by per-job
+        # code (e.g. syndat's, whose name is not visible here) would need
+        # the same treatment.
+        #
+        # NOTE(review): computeSeismograms() builds db_list from mode.id;
+        # this assumes eigcon() names its database after mode.id --
+        # confirm against eigcon().
         s = open("db_list", "w")
-        for dbname in self.eigcon_out:
+        for dbname in db_list:
             print >>s, dbname
-        green_in = open("green.in", "w")
-        argv = [os.path.join(self.bin, "green")]
-        child = popen2.Popen4(argv)
-        for s in [green_in, child.tochild]:
-            print >>s, self.stations
-            print >>s, "db_list"
-            print >>s, self.event
-            print >>s, fmin, fmax
-            print >>s, nsamples
-            print >>s, self.green_out
-            s.close()
-        stationTally = 0
-        self.postProgress(0.0)
-        lastPostTime = time.time()
-        for line in child.fromchild:
-            print line,
-            if line.startswith(" green: Station: "):
-                stationTally += 1
-                now = time.time()
-                if now - lastPostTime >= 60.0:
-                    self.postProgress(float(stationTally) / float(nStations))
-                    lastPostTime = now
-        status = child.wait()
-        self.checkStatus(status, argv)
+        s.close()
+        green_in = "green.in"
+        s = open(green_in, "w")
+        print >>s, self.stations
+        print >>s, "db_list"
+        print >>s, self.event
+        print >>s, fmin, fmax
+        print >>s, nsamples
+        print >>s, self.green_out
+        s.close()
+        # The script is handed to the 'green' binary as stdin by run().
+        self.run("green", stdin=green_in)
         return
@@ -132,7 +124,7 @@
print >>s, self.event
print >>s, plane # error in documentation!
print >>s, self.green_out
- print >>s, "seismograms" #"syndat_out"
+ print >>s, self.syndat_out
print >>s, datatype
s.close()
self.run("syndat", stdin=syndat_in)
@@ -174,7 +166,7 @@
if stderr is not None:
fd = os.open(stderr, os.O_WRONLY, mode)
os.dup2(fd, 2)
- executable = os.path.join(self.bin, argv[0])
+ executable = join(self.bin, argv[0])
try:
os.execvp(executable, argv)
except Exception, e:
@@ -198,36 +190,65 @@
return
- def lineCount(self, filename):
- tally = 0
- s = open(filename, 'r')
- for line in s:
- tally += 1
- return tally
+def lineCount(filename):
+    # Return the number of lines in the text file `filename`.
+    # NOTE(review): the file object is never closed explicitly; it is
+    # released only when garbage-collected.
+    tally = 0
+    s = open(filename, 'r')
+    for line in s:
+        tally += 1
+    return tally
- def postProgress(self, progress):
- import urllib, urlparse
- scheme, host, path = urlparse.urlparse(self.progressUrl)[0:3]
- fields = { 'progress': "%.2f" % progress }
- body = urllib.urlencode(fields)
- headers = {"Content-Type": "application/x-www-form-urlencoded",
- "Accept": "text/plain"}
- import httplib
- if scheme == "http":
- conn = httplib.HTTPConnection(host)
- elif scheme == "https":
- conn = httplib.HTTPSConnection(host)
+class JobArray(Job):
+    # A pyre Job that LSFJobArrayScheduler submits as one LSF job array.
+    import pyre.inventory as pyre
+    # Number of array elements; becomes N in the "name[1-N]" array spec.
+    size = pyre.int("size")
+
+    def __init__(self):
+        super(JobArray, self).__init__()
+        # LSFJobArrayScheduler.schedule() concatenates these into the
+        # job's arguments: prepareJob() fills the base part, and the
+        # caller adds e.g. "--context=computeEigen".
+        self.baseArguments = []
+        self.extraArguments = []
+
+
+class LSFJobArrayScheduler(SchedulerLSF):
+    # Submits a JobArray as a single LSF job array, then blocks by
+    # submitting a dependent 'waiter' job with "-K" (see schedule()).
+
+    import pyre.inventory as pyre
+
+    maxSize = pyre.int("max-size") # MAX_JOB_ARRAY_SIZE or smaller
+    # Optional "%S" limit on concurrently running elements (0 = none).
+    slotLimit = pyre.int("slot-limit")
+
+    # "Job array doesn't support -K option. Job not submitted."
+    wait = False
+
+    def schedule(self, jobArray):
+        # Rewrite jobArray.task into an array spec '"name[1-N]%S"'
+        # (N = jobArray.size, S = slot-limit), set the command line from
+        # baseArguments + extraArguments, submit it via
+        # SchedulerLSF.schedule(), then block on the waiter job below.
+        # NOTE: jobArray.task is overwritten in place, so scheduling the
+        # same JobArray twice would nest the array spec.
+
+        # NOTE(review): validation by assert disappears under "python -O";
+        # a size of 0 (e.g. an empty station list) also trips it.
+        assert(0 < jobArray.size <= self.maxSize)
+
+        jobArrayName = jobArray.task
+        if not jobArrayName:
+            jobArrayName = "jobname"
+        if self.slotLimit:
+            jobArray.task = '"%s[1-%d]%%%d"' % (jobArrayName, jobArray.size, self.slotLimit)
         else:
-            assert False # not scheme in ["http", "https"]
-        conn.request("POST", path, body, headers)
-        response = conn.getresponse()
-        data = response.read()
-        conn.close()
-        return data
+            jobArray.task = '"%s[1-%d]"' % (jobArrayName, jobArray.size)
+        jobArray.arguments = jobArray.baseArguments + jobArray.extraArguments
+
+        super(LSFJobArrayScheduler, self).schedule(jobArray)
+        # Since we can't use "-K" with job arrays (!@#$%*?), schedule
+        # and wait for a dummy job which depends upon the array. XXX:
+        # This assumes the job name is unique. We could slurp the job
+        # ID from the 'bsub' output... but could that really be
+        # considered more robust?
+        # NOTE(review): mineos.cfg and prepareJob() set no task name, so
+        # (unless it is configured elsewhere) both the eigen and the seismo
+        # arrays are submitted as "jobname", and done(jobname) may cover
+        # both.  Also, done() is satisfied only by successful completion:
+        # if any element exits with an error, the waiter may never start
+        # and this call may hang.  ended() may be the intended condition --
+        # confirm against the LSF "bsub -w" documentation.
+        argv = ['bsub', '-J', 'waiter', '-K', '-W', '0:01',
+                '-w', 'done(%s)' % jobArrayName,
+                '/bin/true']
+        status = os.spawnvp(os.P_WAIT, argv[0], argv)
+        if status != 0:
+            sys.exit("%s: exit %d" % (argv[0], status))
+
+        return
+
class MineosScript(Script):
name = "mineos"
@@ -280,81 +301,193 @@
stations = pyre.str("stations")
+
#------------------------------------------------------------------------
# syndat
plane = pyre.int("plane", default=0) # use CMT tensor components as-is
datatype = pyre.int("datatype", default=0) # accelerograms in nm/s^2
+
#------------------------------------------------------------------------
- # misc.
+ # scheduling
- progressUrl = pyre.str("progress-url")
+ scheduler = pyre.facility("scheduler", factory=LSFJobArrayScheduler)
+ eigenJob = pyre.facility("eigen-job", factory=JobArray)
+ seismoJob = pyre.facility("seismo-job", factory=JobArray)
+ context = pyre.str("context", default="login", validator=pyre.choice(["login", "computeEigen", "computeSeismograms", "monitor"]))
+ progressUrl = pyre.str("progress-url")
- def main(self, *args, **kwds):
-
- self._info.activate()
- enabledModes = []
- for mode in modes:
- if getattr(self, mode.id):
- enabledModes.append(mode)
+ #------------------------------------------------------------------------
+ # running
+    def runMineosPrograms(self, *args, **kwds):
+        # Login-node driver (forked from onLoginNode()):
+        #   1. move the input files into archiveDirName;
+        #   2. run the eigen job array, one element per entry of `modes`;
+        #   3. split the station list and run the seismogram job array;
+        #   4. tar archiveDirName into `archive`.
+        # Each scheduleJob() call returns only after the scheduler's
+        # dependent 'waiter' job has completed (LSFJobArrayScheduler).
+
+        enabledModes = self.enabledModes()
         self._info.log("modes: " + ' '.join(["'%s'" % mode.desc for mode in enabledModes]))
         if not enabledModes:
             self._info.log("nothing to do")
             return
-        jobdir = os.getcwd()
-
-        archiveDirName = "mineos"
+        # NOTE: os.mkdir() raises OSError if archiveDirName already exists
+        # (e.g. left over from an earlier run in this directory).
         os.mkdir(archiveDirName)
         inputFiles = [self.model, self.event] + [self.stations + suffix for suffix in [".site", ".sitechan"]]
         for inputFile in inputFiles:
-            os.rename(inputFile, os.path.join(archiveDirName, inputFile))
-        os.chdir(archiveDirName)
+            os.rename(inputFile, join(archiveDirName, inputFile))
         self._info.log("running Mineos program suite for model '%s'" % self.model)
-        self.runMineos(self.model, enabledModes)
+        # Compute eigenfunctions and eigenfrequencies. For
+        # simplicity, always schedule one job for each mode,
+        # regardless of which modes are enabled.
+        job = self.prepareJob(self.eigenJob, *args, **kwds)
+        job.size = len(modes)
+        job.extraArguments = ["--context=computeEigen"]
+        quotient, remainder = self.trimJob(job)
+        # NOTE(review): trimJob() returns quotient == 1 when len(modes)
+        # equals max-size exactly, so this assert would fire although the
+        # array fits; presumably irrelevant while len(modes) is far below
+        # max-size (200 in mineos.cfg).
+        assert quotient == 0
+        self.scheduleJob(job)
+
+        # compute seismograms
+        # The per-job station files are generated inside archiveDirName,
+        # where computeSeismograms() looks for them.
+        jobDir = os.getcwd()
+        os.chdir(archiveDirName)
+        nStations = lineCount(self.stations + ".site")
+        job = self.prepareJob(self.seismoJob, *args, **kwds)
+        job.size = nStations
+        job.extraArguments = ["--context=computeSeismograms"]
+        quotient, remainder = self.trimJob(job)
+        self.splinterStationList(job, quotient, remainder)
+        os.chdir(jobDir)
+        self.scheduleJob(job)
+
         # archive the output
-        os.chdir(jobdir)
-        self.spawn("/bin/tar", "cvzf", archiveDirName + ".tar.gz", archiveDirName)
+        self.spawn("/bin/tar", "cvzf", "_" + archive, archiveDirName)
         # clean up
         self.spawn("/bin/rm", "-rf", archiveDirName)
+        # onMonitor() treats the existence of `archive` as "finished", so
+        # the tarball is built as "_" + archive and renamed to `archive`
+        # only after tar and the cleanup have completed.
+        # NOTE(review): if the scheduler calls sys.exit() (waiter failed),
+        # this point is never reached and onMonitor() keeps reporting
+        # "not finished".
+        os.rename("_" + archive, archive)
+
         return
- def runMineos(self, model, enabledModes):
- mineos = Mineos(model, self.event, self.stations, self.progressUrl)
+    def enabledModes(self):
+        # Return, in `modes` order, the Mode entries whose inventory
+        # property named by mode.id is true.
+        enabledModes = []
+        for mode in modes:
+            if getattr(self, mode.id):
+                enabledModes.append(mode)
+        return enabledModes
+
+    def splinterStationList(self, job, quotient, remainder):
+        # Split <stations>.site / <stations>.sitechan (in the current
+        # directory) into job.size per-job files <stations>-NNNN.site /
+        # .sitechan.  Jobs 1..remainder get quotient + 1 stations and the
+        # rest get quotient.  With (quotient, remainder) from trimJob(),
+        # every station goes to exactly one job.  Returns the generated
+        # file names; runMineosPrograms() currently ignores them.
+        # Both files are keyed on the first whitespace-separated field of
+        # each line (the station name).
+
+        stations = self.stations
+
+        # Read the entire contents of the master ".site" and
+        # ".sitechan" files.
+
+        s = open(stations + ".site", "r")
+        siteList = [site for site in s]
+        s.close()
+
+        sitechan = {}
+        s = open(stations + ".sitechan", "r")
+        for chan in s:
+            name = chan.split()[0]
+            cl = sitechan.setdefault(name, [])
+            cl.append(chan)
+        s.close()
+
+        generatedFiles = []
+
+        for jobIndex in xrange(1, job.size + 1):
+
+            # Pick out this job's share of the load.
+            n = quotient
+            if jobIndex < remainder + 1:
+                n += 1
+            l = siteList[0:n]
+            siteList = siteList[n:]
+
+            # Generate this job's ".site" file.
+            filename = self.indexedStationsName(jobIndex) + ".site"
+            generatedFiles.append(filename)
+            s = open(filename, "w")
+            for site in l:
+                s.write(site)
+            s.close()
+
+            # Generate this job's ".sitechan" file.
+            # NOTE(review): a .site station with no .sitechan line raises
+            # KeyError below, and a blank line in either file raises
+            # IndexError in split()[0] -- confirm that inputs never
+            # contain these.
+            filename = self.indexedStationsName(jobIndex) + ".sitechan"
+            generatedFiles.append(filename)
+            s = open(filename, "w")
+            for site in l:
+                name = site.split()[0]
+                for chan in sitechan[name]:
+                    s.write(chan)
+            s.close()
+
+        return generatedFiles
+
+
+    def indexedName(self, base, jobIndex):
+        # Per-job name, e.g. indexedName("green", 7) -> "green-0007".
+        return "%s-%04d" % (base, jobIndex)
+
+    def indexedStationsName(self, jobIndex):
+        # Base name of this job's station files written by
+        # splinterStationList().
+        return self.indexedName(self.stations, jobIndex)
+
+
+    def computeEigen(self, jobIndex):
+        # Body of one eigen job-array element.  jobIndex is LSB_JOBINDEX
+        # (1-based; see onCompute()): element i handles modes[i - 1] and
+        # does nothing if that mode is disabled.
+        # NOTE(review): all elements chdir() into the same archiveDirName;
+        # if minos_bran()/eigcon() (not fully visible here) write
+        # fixed-name stdin files, concurrent elements can overwrite each
+        # other's input, as in green() -- confirm.
+        os.chdir(archiveDirName)
+        self.redirectOutput(self.indexedName("output_eigen", jobIndex) + ".txt")
+
+        mineos = Mineos(self.model, self.event, self.stations)
+
+        mode = modes[jobIndex - 1]
+        if not getattr(self, mode.id):
+            self._info.log("mode '%s' is disabled" % mode.desc)
+            return
+
         # minos_bran
-        for mode in enabledModes:
-            self._info.log("running program 'minos_bran' for '%s' mode" % mode.desc)
-            # convert to millihertz
-            wgrav = self.wgrav / mHz
-            wmin = self.wmin / mHz
-            wmax = self.wmax / mHz
-            mineos.minos_bran(mode, self.eps, wgrav,
-                              self.lmin, self.lmax,
-                              wmin, wmax,
-                              self.nmin, self.nmax)
+        self._info.log("running program 'minos_bran' for '%s' mode" % mode.desc)
+
+        # convert to millihertz
+        wgrav = self.wgrav / mHz
+        wmin = self.wmin / mHz
+        wmax = self.wmax / mHz
+        # convert to kilometers
+        max_depth = self.max_depth / km
+
+        mineos.minos_bran(mode, self.eps, wgrav,
+                          self.lmin, self.lmax,
+                          wmin, wmax,
+                          self.nmin, self.nmax)
+
         # eigcon
-        for mode in enabledModes:
-            self._info.log("running program 'eigcon' for '%s' mode" % mode.desc)
-            # convert to kilometers
-            max_depth = self.max_depth / km
-            mineos.eigcon(mode, max_depth)
+        self._info.log("running program 'eigcon' for '%s' mode" % mode.desc)
+        mineos.eigcon(mode, max_depth)
+
+        return
+
+ def computeSeismograms(self, jobIndex):
+ os.chdir(archiveDirName)
+ self.redirectOutput(self.indexedName("output_green_syndat", jobIndex) + ".txt")
+
+ stations = self.indexedStationsName(jobIndex)
+ mineos = Mineos(self.model, self.event, stations)
+ mineos.green_out = self.indexedName("green", jobIndex)
+ mineos.syndat_out = self.indexedName("syndat", jobIndex)
+
# green
self._info.log("running program 'green'")
fmin = self.fmin / mHz
fmax = self.fmax / mHz
- mineos.green(fmin, fmax, self.nsamples)
+ db_list = [mode.id for mode in self.enabledModes()]
+ mineos.green(fmin, fmax, self.nsamples, db_list)
# creat_origin
self._info.log("running program 'creat_origin'")
@@ -365,10 +498,10 @@
mineos.syndat(self.plane, self.datatype)
# cucss2sac
- os.symlink(mineos.green_out + ".origin", "seismograms.origin")
- os.symlink(mineos.stations + ".site", "seismograms.site")
- mineos.cucss2sac("seismograms", "output") # SAC
- mineos.cucss2sac("-a", "seismograms", "output") # ASCII
+ os.symlink(mineos.green_out + ".origin", mineos.syndat_out + ".origin")
+ os.symlink(mineos.stations + ".site", mineos.syndat_out + ".site")
+ mineos.cucss2sac(mineos.syndat_out, "output") # SAC
+ mineos.cucss2sac("-a", mineos.syndat_out, "output") # ASCII
return
@@ -384,21 +517,130 @@
return
-def main(*args, **kwds):
- if True:
+ #------------------------------------------------------------------------
+ # scheduling
+
+    def main(self, *args, **kwds):
+        # Dispatch on the --context property:
+        #   "login"              -- the user-facing run (the default)
+        #   "computeEigen",
+        #   "computeSeismograms" -- LSF job-array elements submitted by
+        #                           runMineosPrograms()
+        #   "monitor"            -- completion check, see onMonitor()
+        self._info.activate()
+        context = self.context
+        if context == "login":
+            self.onLoginNode(*args, **kwds)
+        elif context.startswith("compute"):
+            self.onCompute(*args, **kwds)
+        elif context == "monitor":
+            self.onMonitor(*args, **kwds)
+        return
+
+
+    def onLoginNode(self, *args, **kwds):
+        # The original process returns at once.  Its child forks again:
+        # the grandchild runs runMineosPrograms() while the child polls it
+        # with monitorProgress().
+        self.redirectOutput("output_mineos.txt")
+
+        # This fork() allows the (gsi)ssh command to complete.
+        pid = os.fork()
+        if not pid:
+            os.close(0) # original 1 & 2 closed by dup2 above
+            pid = os.fork()
+            if pid:
+                self.monitorProgress(pid)
+            else:
+                self.runMineosPrograms(*args, **kwds)
+        return
+
+
+    def onMonitor(self, *args, **kwds):
+        # Exit status 0 once `archive` exists (written last by
+        # runMineosPrograms()), 1 otherwise.
+        if exists(archive):
+            sys.exit(0)
+        sys.exit(1)
+
+
+    def onCompute(self, *args, **kwds):
+        # Body of a job-array element; raises KeyError if LSB_JOBINDEX is
+        # not set (i.e. when run outside an LSF job array).
+
+        # This is analogous to MPI rank.
+        # Presumably 1-based, matching the "[1-N]" array spec built in
+        # LSFJobArrayScheduler.schedule().
+        jobIndex = int(os.environ['LSB_JOBINDEX'])
+
+        context = self.context
+        if context == "computeEigen":
+            self.computeEigen(jobIndex)
+        elif context == "computeSeismograms":
+            self.computeSeismograms(jobIndex)
+
+        return
+
+
+    def redirectOutput(self, filename):
+        # Point file descriptors 1 and 2 of this process at `filename`.
+        # NOTE(review): the file is opened without O_TRUNC, so the tail of
+        # an older, longer file survives past the new output.  It is also
+        # created with `mode` (read-only permission bits), so reopening an
+        # existing file for writing would probably fail with EACCES for a
+        # non-root user, e.g. on a rerun in the same directory.
         # Redirect all output -- our journal output, and the output of
         # our children -- to a file.
-        fd = os.open("output_mineos.txt", os.O_CREAT | os.O_WRONLY, mode)
+        fd = os.open(filename, os.O_CREAT | os.O_WRONLY, mode)
         os.dup2(fd, 1)
         os.dup2(fd, 2)
         os.close(fd)
-
-    mineos = MineosScript()
-    mineos.run(*args, **kwds)
+        return
+    def prepareJob(self, job, *args, **kwds):
+        # Configure `job` (the facility instance itself, modified in place
+        # and returned) to rerun this script on one node as
+        # "sys.executable __file__ <current arguments>".  getArgv() is not
+        # visible here; presumably it reproduces this run's command line.
+        job.nodes = 1
+        job.executable = sys.executable
+        job.baseArguments = [__file__] + self.getArgv(*args, **kwds)
+        return job
+
+
+    def trimJob(self, job):
+        # Clamp job.size to the scheduler's max-size.  Return
+        # (size / maxSize, size % maxSize) computed from the ORIGINAL size
+        # (Python 2 integer division); splinterStationList() uses the pair
+        # to spread stations over the clamped number of elements.
+        # NOTE(review): assumes max-size is configured and non-zero
+        # (200 in mineos.cfg); otherwise this raises.
+        maxSize = self.scheduler.maxSize
+        quotient = job.size / maxSize
+        remainder = job.size % maxSize
+        if job.size > maxSize:
+            job.size = maxSize
+        return quotient, remainder
+
+
+    def scheduleJob(self, job):
+        # Hand the job array to the configured scheduler.
+        self.scheduler.schedule(job)
+        return
+
+
+    def monitorProgress(self, pid):
+        # Poll child `pid` once a minute (non-blocking waitpid) until it
+        # exits.  Posting progress is currently disabled (the call is
+        # commented out), and the child's exit status is discarded.
+        progress = None
+        wpid, status = os.waitpid(pid, os.WNOHANG)
+        while wpid == 0:
+            newProgress = self.checkProgress()
+            if newProgress != progress:
+                progress = newProgress
+                #self.postProgress(progress)
+            sleep(60)
+            wpid, status = os.waitpid(pid, os.WNOHANG)
+        return
+
+
+    def checkProgress(self):
+        # Stub: always reports 0.0.
+        # XXX
+        progress = 0.0
+        return progress
+
+
+    def postProgress(self, progress):
+        # POST `progress` (formatted "%.2f") as form field 'progress' to
+        # self.progressUrl and return the response body.  In the visible
+        # code it is only referenced from a commented-out line in
+        # monitorProgress().
+        # NOTE(review): for schemes other than http/https the assert is the
+        # only guard; under "python -O" `conn` would be unbound (NameError).
+        # No timeout is set on the connection.
+        import urllib, urlparse
+        scheme, host, path = urlparse.urlparse(self.progressUrl)[0:3]
+        fields = { 'progress': "%.2f" % progress }
+        body = urllib.urlencode(fields)
+        headers = {"Content-Type": "application/x-www-form-urlencoded",
+                   "Accept": "text/plain"}
+        import httplib
+        if scheme == "http":
+            conn = httplib.HTTPConnection(host)
+        elif scheme == "https":
+            conn = httplib.HTTPSConnection(host)
+        else:
+            assert False # not scheme in ["http", "https"]
+        conn.request("POST", path, body, headers)
+        response = conn.getresponse()
+        data = response.read()
+        conn.close()
+        return data
+
+
 if __name__ == "__main__":
-    main()
+    # Every context starts here: the login-node run, and each LSF
+    # job-array element, which reruns this file with --context=...
+    # MineosScript.main() (presumably invoked via Script.run()) dispatches
+    # on the context.
+    mineos = MineosScript()
+    mineos.run()
# end of file
More information about the CIG-COMMITS
mailing list