[cig-commits] r12366 - in cs/portal/trunk: . northridge northridge/backend
leif at geodynamics.org
leif at geodynamics.org
Tue Jul 1 17:46:56 PDT 2008
Author: leif
Date: 2008-07-01 17:46:55 -0700 (Tue, 01 Jul 2008)
New Revision: 12366
Added:
cs/portal/trunk/northridge/backend/
cs/portal/trunk/northridge/backend/daemon.py
cs/portal/trunk/northridge/backend/mineos.py
cs/portal/trunk/northridge/backend/specfem3D.py
cs/portal/trunk/northridge/backend/web-portal-daemon.cfg
Removed:
cs/portal/trunk/daemon.py
cs/portal/trunk/mineos.cfg
cs/portal/trunk/mineos.py
cs/portal/trunk/web-portal-daemon.cfg
Modified:
cs/portal/trunk/northridge/MANIFEST.in
Log:
Unified the files which compose the portal's backend. Updated the
example .cfg file for the daemon.
Deleted: cs/portal/trunk/daemon.py
===================================================================
--- cs/portal/trunk/daemon.py 2008-07-01 23:52:59 UTC (rev 12365)
+++ cs/portal/trunk/daemon.py 2008-07-02 00:46:55 UTC (rev 12366)
@@ -1,835 +0,0 @@
-
-import stackless
-import os, sys, signal
-from popen2 import Popen4
-
-from pyre.applications import Script
-from pyre.components import Component
-from pyre.units.time import second
-
-
-# NYI: RSL AST
-class RSLUnquoted(object):
- def __init__(self, value):
- self.value = value
-
-TG_CLUSTER_SCRATCH = "/work/teragrid/tg459131"
-TG_COMMUNITY = "/projects/tg"
-
-# I'm not sure what to do about this yet.
-TACC_ENVIRONMENT = """(environment=(TG_CLUSTER_SCRATCH "%s") (TG_COMMUNITY "%s") (PATH "/opt/lsf/bin:/opt/lsf/etc:/opt/MPI/intel9/mvapich-gen2/0.9.8/bin:/opt/apps/binutils/binutils-2.17/bin:/opt/intel/compiler9.1//idb/bin:/opt/intel/compiler9.1//cc/bin:/opt/intel/compiler9.1//fc/bin:/usr/local/first:/usr/local/bin:~/bin:.:/opt/apps/pki_apps:/opt/apps/gsi-openssh-3.9/bin:/opt/lsf/bin:/opt/lsf/etc:/sbin:/usr/sbin:/usr/local/sbin:/bin:/usr/bin:/usr/local/bin:/usr/X11R6/bin:/home/teragrid/tg459131/bin:/data/TG/srb-client-3.4.1-r1/bin:/data/TG/softenv-1.6.2-r3/bin:/data/TG/tg-policy/bin:/data/TG/gx-map-0.5.3.2-r1/bin:/data/TG/tgusage-2.9-r2/bin:/usr/java/j2sdk1.4.2_12/bin:/usr/java/j2sdk1.4.2_12/jre/bin:/data/TG/globus-4.0.1-r3/sbin:/data/TG/globus-4.0.1-r3/bin:/data/TG/tgcp-1.0.0-r2/bin:/data/TG/condor-6.7.18-r1/bin:/data/TG/condor-6.7.18-r1/sbin:/data/TG/hdf4-4.2r1-r1/bin:/opt/apps/hdf5/hdf5-1.6.5/bin:/data/TG/phdf5-1.6.5/bin") (MPICH_HOME "/opt/MPI/intel9/mvapich-gen2/0.9.8"))""" % (TG_CLUSTER_SCRATCH, TG_COMMUNITY)
-
-
-
-class GassServer(object):
-
- # @classmethod
- def startUp(cls, directory, info):
- argv = ["globus-gass-server", "-r", "-w", "-c"]
- savedWd = os.getcwd()
- os.chdir(directory)
- child = Popen4(argv)
- os.chdir(savedWd)
- child.tochild.close()
- while True:
- url = child.fromchild.readline().strip()
- if url.startswith("https:"):
- break
- child.fromchild.close()
- return GassServer(child, url, directory, info)
- startUp = classmethod(startUp)
-
- def __init__(self, child, url, directory, info):
- self.child = child
- self.url = url
- self.directory = directory
- self.info = info
-
- def shutDown(self):
- # This doesn't work reliably...
- argv = ["globus-gass-server-shutdown", self.url]
- command = ' '.join(argv)
- self.info.log("spawning: %s" % command)
- status = os.spawnvp(os.P_WAIT, argv[0], argv)
- statusMsg = "%s: exit %d" % (argv[0], status)
- self.info.log(statusMsg)
- if status != 0:
- # ...so sometimes we kill it!
- self.info.log("kill -INT %d" % self.child.pid)
- os.kill(self.child.pid, signal.SIGINT)
- self.info.log("waitpid(%d)" % self.child.pid)
- status = self.child.wait()
- return
-
-
-class JobManager(object):
-
- def __init__(self, clock, root, info):
- self.clock = clock
- self.root = root
- self.info = info
-
-
- def runJob(self, job):
- self.createSubdirectoryForJob(job)
- stackless.tasklet(self.jobRunner)(job)
-
-
- def spawn(self, *argv):
- command = ' '.join(argv)
- self.info.log("spawning: %s" % command)
- status = os.spawnvp(os.P_WAIT, argv[0], argv)
- statusMsg = "%s: exit %d" % (argv[0], status)
- self.info.log(statusMsg)
- if status != 0:
- raise Exception(statusMsg)
- return
-
-
- def ospawn(self, *argv):
- command = ' '.join(argv)
- self.info.log("spawning: %s" % command)
-
- child = Popen4(argv)
-
- child.tochild.close()
-
- output = child.fromchild.readlines()
- status = child.wait()
-
- exitStatus = None
- if (os.WIFSIGNALED(status)):
- statusStr = "signal %d" % os.WTERMSIG(status)
- elif (os.WIFEXITED(status)):
- exitStatus = os.WEXITSTATUS(status)
- statusStr = "exit %d" % exitStatus
- else:
- statusStr = "status %d" % status
- statusMsg = "%s: %s" % (argv[0], statusStr)
-
- for line in output:
- self.info.line(" " + line.rstrip())
- self.info.log(statusMsg)
-
- if exitStatus != 0:
- raise Exception(statusMsg)
-
- return output
-
-
- def download(self, url, pathname):
- import shutil
- import urllib2
- self.info.log("downloading %s" % url)
- infile = urllib2.urlopen(url)
- outfile = open(pathname, 'wb')
- shutil.copyfileobj(infile, outfile)
- outfile.close()
- infile.close()
- return
-
- def downloadInputFilesForJob(self, job):
- for inputFile in job.inputFiles:
- url = job.urlForInputFile(inputFile)
- filename = inputFile.split('/')[-1] # strips 'mineos/'
- pathname = os.path.join(job.directory, filename)
- self.download(url, pathname)
- return
-
- def createSubdirectoryForJob(self, job):
- while True:
- dirName = randomName()
- dirPath = os.path.join(self.root, dirName)
- if not os.path.exists(dirPath):
- os.mkdir(dirPath)
- break
- job.subdir = dirName
- job.directory = dirPath
- return
-
-
-class ForkJobManager(JobManager):
-
- def jobRunner(self, job):
-
- try:
- self.downloadInputFilesForJob(job)
-
- argv = self.argvForJob(job)
- command = ' '.join(argv)
- self.info.log("spawning: %s" % command)
- savedWd = os.getcwd()
- os.chdir(job.directory)
- pid = os.spawnvp(os.P_NOWAIT, argv[0], argv)
- os.chdir(savedWd)
- self.info.log("spawned process %d" % pid)
-
- ticks = 2
- wpid, status = os.waitpid(pid, os.WNOHANG)
- while wpid == 0:
- for t in xrange(ticks):
- self.clock.tick.wait()
- ticks = min(ticks * 2, 10)
- wpid, status = os.waitpid(pid, os.WNOHANG)
-
- self.uploadOutputFilesForJob(job)
-
- if (os.WIFSIGNALED(status)):
- statusStr = "signal %d" % os.WTERMSIG(status)
- job.setStatus(job.STATUS_FAILED)
- elif (os.WIFEXITED(status)):
- exitStatus = os.WEXITSTATUS(status)
- statusStr = "exit %d" % exitStatus
- if exitStatus == 0:
- job.setStatus(job.STATUS_DONE)
- else:
- job.setStatus(job.STATUS_FAILED)
- else:
- statusStr = "status %d" % status
- job.setStatus(job.STATUS_FAILED)
- statusMsg = "%s: %s" % (argv[0], statusStr)
- self.info.log(statusMsg)
-
- except Exception, e:
- self.info.log("error: %s: %s" % (e.__class__.__name__, e))
- job.setStatus(job.STATUS_FAILED)
-
- return
-
-
- def argvForJob(self, job):
- return ([job.resSpec['executable']] + job.resSpec['arguments'] +
- ["--progress-url=%s" % job.progressUrl])
-
-
- def uploadOutputFilesForJob(self, job):
- return
-
-
-class RemoteShellJobManager(ForkJobManager):
-
- def __init__(self, clock, root, info, config):
- ForkJobManager.__init__(self, clock, root, info)
- self.rsh = config.remoteShellCommand.split()
- self.rdown = config.remoteDownloadCommand
- self.rup = config.remoteUploadCommand
- self.remoteOutputRoot = config.remoteOutputRoot
-
-
- def downloadInputFilesForJob(self, job):
- # First, download to the local host.
- ForkJobManager.downloadInputFilesForJob(self, job)
-
- # Create a remote directory for this run.
- remoteDirectory = self.remoteDirectoryForJob(job)
- argv = self.rsh + ["mkdir -p %s" % remoteDirectory]
- self.spawn(*argv)
-
- # Now copy to the remote host.
- for inputFile in job.inputFiles:
- filename = inputFile.split('/')[-1] # strips 'mineos/'
- pathname = os.path.join(job.directory, filename)
- remotePathname = os.path.join(remoteDirectory, filename)
- argv = (self.rdown % {'source': pathname, 'dest': remotePathname}).split()
- self.spawn(*argv)
-
- return
-
-
- def uploadOutputFilesForJob(self, job):
- remoteDirectory = self.remoteDirectoryForJob(job)
- for filename in job.outputFiles:
- pathname = os.path.join(job.directory, filename)
- remotePathname = os.path.join(remoteDirectory, filename)
- argv = (self.rup % {'source': remotePathname, 'dest': pathname}).split()
- try:
- self.spawn(*argv)
- except Exception:
- pass
- return
-
-
- def argvForJob(self, job):
- remoteCommand = ("cd " + self.remoteDirectoryForJob(job) + " && " +
- ' '.join(ForkJobManager.argvForJob(self, job)))
- return self.rsh + [remoteCommand]
-
-
- def remoteDirectoryForJob(self, job):
- return os.path.join(self.remoteOutputRoot, "run%05d" % job.run.id)
-
-
-class GlobusJobManager(JobManager):
-
- def jobRunner(self, job):
-
- try:
- self.downloadInputFilesForJob(job)
- gassServer = GassServer.startUp(job.directory, self.info)
-
- resSpec = self.resSpec(job, gassServer)
- id = self.globusrun(resSpec)
-
- oldStatus = job.status
- ticks = 2
- while job.isAlive():
- for t in xrange(ticks):
- self.clock.tick.wait()
- ticks *= 2
- status = self.getJobStatus(id)
- if status != oldStatus:
- job.setStatus(status)
- oldStatus = status
- ticks = 2
-
- except Exception, e:
- self.info.log("error: %s: %s" % (e.__class__.__name__, e))
- job.setStatus(job.STATUS_FAILED)
-
- finally:
- gassServer.shutDown()
-
- return
-
-
- def globusrun(self, resSpec):
- import tempfile
- fd, rslName = tempfile.mkstemp(suffix=".rsl")
- stream = os.fdopen(fd, "w")
- try:
- self.writeRsl(resSpec, stream)
- stream.close()
- if resSpec['jobType'] == "mpi":
- resourceManager = "tg-login.tacc.teragrid.org/jobmanager-lsf"
- else:
- resourceManager = "tg-login.tacc.teragrid.org/jobmanager-fork"
- output = self.ospawn("globusrun", "-F", "-f", rslName, "-r", resourceManager)
- id = None
- for line in output:
- if line.startswith("https://"):
- id = line.strip()
- assert id
- finally:
- stream.close()
- os.unlink(rslName)
- return id
-
-
- def writeRsl(self, resSpec, stream):
- print >>stream, '&'
- for relation in resSpec.iteritems():
- attribute, valueSequence = relation
- valueSequence = self.rslValueSequenceToString(valueSequence)
- print >>stream, '(', attribute, '=', valueSequence, ')'
- print >>stream, TACC_ENVIRONMENT
- return
-
-
- def rslValueSequenceToString(self, valueSequence):
- if not isinstance(valueSequence, (tuple, list)):
- valueSequence = [valueSequence]
- s = []
- for value in valueSequence:
- if isinstance(value, RSLUnquoted):
- s.append(value.value)
- elif isinstance(value, (tuple, list)):
- s.append('(' + self.rslValueSequenceToString(value) + ')')
- else:
- s.append('"%s"' % value)
- return ' '.join(s)
-
-
- def getJobStatus(self, id):
- output = self.ospawn("globus-job-status", id)
- status = output[0].strip()
- return status
-
-
- def resSpec(self, job, gassServer):
- resSpec = {}
- resSpec.update(job.resSpec)
- resSpec["scratch_dir"] = TG_CLUSTER_SCRATCH
- resSpec["directory"] = RSLUnquoted("$(SCRATCH_DIRECTORY)")
-
- file_stage_in = []
- for inputFile in job.inputFiles:
- url = gassServer.url + "/./" + inputFile
- file_stage_in.append([url, RSLUnquoted("$(SCRATCH_DIRECTORY) # " + '"/' + inputFile + '"')])
- resSpec["file_stage_in"] = file_stage_in
-
- file_stage_out = []
- for outputFile in job.outputFiles:
- url = gassServer.url + "/./" + outputFile
- file_stage_out.append([outputFile, url])
- if file_stage_out:
- resSpec["file_stage_out"] = file_stage_out
-
- resSpec["stdout"] = gassServer.url + "/./stdout.txt"
- resSpec["stderr"] = gassServer.url + "/./stderr.txt"
-
- job.outputFiles.extend(["stdout.txt", "stderr.txt"])
-
- return resSpec
-
-
-class Event(object):
-
- def __init__(self):
- self.channel = stackless.channel()
-
- def wait(self):
- self.channel.receive()
- return
-
- def signal(self):
-
- # Swap-in a new channel, so that a waiting tasklet that
- # repeatedly calls wait() will block waiting for the *next*
- # signal on its subsequent call to wait().
- channel = self.channel
- self.channel = stackless.channel()
-
- # Unblock all waiters.
- while channel.balance < 0:
- channel.send(None)
-
- return
-
-
-class Job(object):
-
- # status codes
- STATUS_NEW = "NEW" # pseudo
- STATUS_PENDING = "PENDING"
- STATUS_ACTIVE = "ACTIVE"
- STATUS_DONE = "DONE"
- STATUS_FAILED = "FAILED"
- statusCodes = [STATUS_PENDING, STATUS_ACTIVE, STATUS_DONE, STATUS_FAILED]
- deadCodes = [STATUS_DONE, STATUS_FAILED]
-
- def __init__(self, run, task, **resSpec):
- self.run = run
- self.task = task
- self.resSpec = resSpec
- self.status = self.STATUS_NEW
- self.statusChanged = Event()
- self.inputFiles = []
- self.urlForInputFile = None
- self.outputFiles = []
- self.subdir = None
- self.directory = None
-
- def setStatus(self, status):
- if False: # Violated by, e.g., "UNKNOWN JOB STATE 64"
- assert status in self.statusCodes, "unknown status: %s" % status
- self.status = status
- self.statusChanged.signal()
-
- def isAlive(self):
- return not self.status in self.deadCodes
-
-
-class Run(object):
-
- # status codes
- STATUS_NEW = "ready"
- STATUS_CONNECTING = "connecting"
- STATUS_PREPARING = "preparing" # reported by build & schedule process
- STATUS_PENDING = "pending" # reported by build & schedule process
- STATUS_MESHING = "meshing" # reported by launcher process
- STATUS_SOLVING = "solving" # reported by launcher process
- STATUS_FINISHING = "finishing" # reported by launcher process
- STATUS_DONE = "done"
- STATUS_ERROR = "error"
- deadCodes = [STATUS_DONE, STATUS_ERROR]
-
- def __init__(self, id, urlForInputFile, config, info, jm):
- self.id = id
- self.urlForInputFile = urlForInputFile
- self.specfemPathname = config.specfemPathname
- self.mineosPathname = config.mineosPathname
- self.dry = config.dry
- self.info = info
- self.jm = jm
-
- self.jobChannel = stackless.channel()
- self.status = self.STATUS_NEW
- self.statusChanged = Event()
-
- def go(self):
- stackless.tasklet(self)()
-
- def __call__(self):
- try:
- self.setStatus(self.STATUS_CONNECTING)
-
- # run
- job = self.newJob()
- self.jm.runJob(job)
-
- # send jobs to jobSink()
- self.jobChannel.send(job)
- self.jobChannel.send(None) # no more jobs
-
- self.setStatus(self.STATUS_PREPARING)
-
- while job.isAlive():
- job.statusChanged.wait()
- # while
-
- if job.status == job.STATUS_FAILED:
- self.setStatus(self.STATUS_ERROR)
- raise RuntimeError("run failed")
-
- self.setStatus(self.STATUS_DONE)
-
- except Exception, e:
- self.info.log("error: %s: %s" % (e.__class__.__name__, e))
- self.setStatus(self.STATUS_ERROR)
-
- return
-
- def setStatus(self, status):
- self.status = status
- self.statusChanged.signal()
-
- def isAlive(self):
- return not self.status in self.deadCodes
-
-
-class SpecfemRun(Run):
-
- def newJob(self):
- dry = []
- if self.dry:
- dry = ["--scheduler.dry"]
- parameters = 'par_file.txt'
- event = 'event.txt'
- stations = 'stations.txt'
- job = Job(
- self,
- "run",
- jobType = "single",
- count = 1,
- executable = self.specfemPathname,
- arguments = ['--par-file=' + parameters,
- '--cmt-solution=' + event,
- '--stations=' + stations,
- "--scheduler.wait=True",
- "--job.name=run%05d" % self.id,
- "--macros.run.id=%05d" % self.id,
- #"--job.walltime=2*hour",
- "--job.stdout=stdout.txt",
- "--job.stderr=stderr.txt",
- ] + dry,
- )
- job.urlForInputFile = self.urlForInputFile
- job.inputFiles = [parameters, event, stations]
- job.outputFiles = ["specfem3dglobe.tar.gz", "output_mesher.txt", "output_solver.txt", "output_build.txt"]
- return job
-
-
-class MineosRun(Run):
-
- def newJob(self):
- job = Job(
- self,
- "run",
- executable = self.mineosPathname,
- arguments = ["parameters.pml"],
- )
- job.urlForInputFile = self.urlForInputFile
- job.inputFiles = ["parameters.pml", "event.txt", "stations.site", "stations.sitechan", "model.txt"]
- job.outputFiles = ["output_mineos.txt", "mineos.tar.gz"]
- return job
-
-
-class PortalConnection(object):
-
- MULTIPART_BOUNDARY = '----------eArThQuAkE$'
-
- def __init__(self, portal, outputRootUrl, clock, info):
- self.portal = portal
- self.outputRootUrl = outputRootUrl
- self.clock = clock
- self.info = info
-
- def runSimulations(self, gjm, fjm, config):
- stackless.tasklet(self.runFactory)(gjm, fjm, config)
-
- def runFactory(self, gjm, fjm, config):
- import urllib2
-
- runs = {}
-
- while True:
- self.clock.tick.wait()
-
- #self.info.log("GET %s" % self.portal.runsUrl)
- try:
- infile = urllib2.urlopen(self.portal.runsUrl)
- except Exception, e:
- # Could be transient failure -- e.g., "connection reset by peer".
- self.info.log("error: %s" % e)
- continue
- #self.info.log("OK")
- runList = infile.read()
- infile.close()
-
- runList = eval(runList)
-
- for run in runList:
- id = int(run['id'])
- status = run['status']
- code = run['code']
- if (status == Run.STATUS_NEW and not runs.has_key(id)):
- self.info.log("new run %d" % id)
-
- urlForInputFile = URLForInputFile(self.portal.inputFileUrl, id)
-
- if code == 1:
- newRun = SpecfemRun(id, urlForInputFile, config, self.info, gjm)
- elif code == 2:
- newRun = MineosRun(id, urlForInputFile, config, self.info, fjm)
- else:
- self.info.log("unknown code %d" % code)
- continue
-
- self.watchRun(newRun)
- runs[id] = newRun
- newRun.go()
-
- return
-
- def watchRun(self, run):
- stackless.tasklet(self.runWatcher)(run)
- stackless.tasklet(self.jobSink)(run)
-
- def runWatcher(self, run):
- url = self.portal.runStatusUrl % run.id
- while run.isAlive():
- run.statusChanged.wait()
- fields = {'status': run.status}
- self.postStatusChange(url, fields)
- return
-
- def jobSink(self, run):
- newJob = run.jobChannel.receive()
- while newJob is not None:
- self.watchJob(newJob, run)
- newJob = run.jobChannel.receive()
- return
-
- def watchJob(self, job, run):
- url = self.portal.jobCreateUrl
- fields = self.jobFields(job, run)
- response = self.postStatusChange(url, fields)
- portalId = eval(response)
- job.progressUrl = self.portal.jobProgressUrl % portalId
- stackless.tasklet(self.jobWatcher)(job, portalId, run)
-
- def jobWatcher(self, job, portalId, run):
- url = self.portal.jobUpdateUrl % portalId
- while job.isAlive():
- job.statusChanged.wait()
- fields = self.jobFields(job, run)
- self.postStatusChange(url, fields)
- self.postJobOutput(job, portalId)
- return
-
- def postJobOutput(self, job, portalId):
- url = self.portal.outputCreateUrl
-
- for outputFile in job.outputFiles:
-
- pathname = os.path.join(job.directory, outputFile)
- if not os.path.exists(pathname):
- continue
-
- # in case the daemon and the web server are run as
- # different users
- import stat
- os.chmod(pathname, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
-
- fields = {
- 'job': portalId,
- 'name': outputFile,
- }
- self.postStatusChange(url, fields)
-
- return
-
- def jobFields(self, job, run):
- fields = {
- 'run': run.id,
- 'task': job.task,
- 'status': job.status.lower(),
- 'url': self.outputRootUrl + job.subdir + "/",
- }
- return fields
-
- def postStatusChange(self, url, fields):
- import urllib
- body = urllib.urlencode(fields)
- headers = {"Content-Type": "application/x-www-form-urlencoded",
- "Accept": "text/plain"}
- return self.post(body, headers, url)
-
- def postStatusChangeAndUploadOutput(self, url, fields, output):
- # Based upon a recipe by Wade Leftwich.
- files = [('output', 'output.txt', 'application/x-gtar', 'gzip', output)]
- body = self.encodeMultipartFormData(fields, files)
- headers = {"Content-Type": "multipart/form-data; boundary=%s" % self.MULTIPART_BOUNDARY,
- "Content-Length": str(len(body)),
- "Accept": "text/plain"}
- return self.post(body, headers, url)
-
- def encodeMultipartFormData(self, fields, files):
- import shutil
- from StringIO import StringIO
- stream = StringIO()
- def line(s=''): stream.write(s + '\r\n')
- for key, value in fields.iteritems():
- line('--' + self.MULTIPART_BOUNDARY)
- line('Content-Disposition: form-data; name="%s"' % key)
- line()
- line(value)
- for (key, filename, contentType, contentEncoding, content) in files:
- line('--' + self.MULTIPART_BOUNDARY)
- line('Content-Disposition: form-data; name="%s"; filename="%s"' % (key, filename))
- line('Content-Type: %s' % contentType)
- line('Content-Encoding: %s' % contentEncoding)
- line()
- shutil.copyfileobj(content, stream)
- line()
- line('--' + self.MULTIPART_BOUNDARY + '--')
- line()
- return stream.getvalue()
-
- def post(self, body, headers, url):
- self.info.log("POST %s" % url)
- import httplib
- if self.portal.scheme == "http":
- conn = httplib.HTTPConnection(self.portal.host)
- elif self.portal.scheme == "https":
- conn = httplib.HTTPSConnection(self.portal.host)
- else:
- assert False # not scheme in ["http", "https"]
- conn.request("POST", url, body, headers)
- response = conn.getresponse()
- data = response.read()
- conn.close()
- self.info.log("response %s" % data)
- return data
-
-
-class URLForInputFile(object):
- def __init__(self, inputFileUrl, id):
- self.inputFileUrl = inputFileUrl
- self.id = id
- def __call__(self, inputFile):
- # Map input filenames to URLs in the context of this run.
- return self.inputFileUrl % (self.id, inputFile)
-
-
-
-class Clock(object):
-
- def __init__(self, interval):
- self.interval = interval
- self.tick = Event()
- stackless.tasklet(self)()
-
- def __call__(self):
- from time import sleep
- while True:
- sleep(self.interval)
- self.tick.signal()
- stackless.schedule()
- return
-
-
-class WebPortal(Component):
-
- name = "web-portal"
-
- import pyre.inventory as pyre
- scheme = pyre.str("scheme", validator=pyre.choice(["http", "https"]), default="http")
- host = pyre.str("host", default="localhost:8000")
- urlRoot = pyre.str("url-root", default="/portals/seismo/")
-
- def _configure(self):
- self.urlPrefix = '%s://%s%s' % (self.scheme, self.host, self.urlRoot)
- self.inputFileUrl = self.urlPrefix + 'runs/%d/%s'
- # runs
- self.runsUrl = self.urlPrefix + 'runs/list.py'
- self.runStatusUrl = self.urlRoot + 'runs/%d/status/'
- # jobs
- self.jobCreateUrl = self.urlRoot + 'jobs/create/'
- self.jobUpdateUrl = self.urlRoot + 'jobs/%d/update/'
- self.jobProgressUrl = self.urlPrefix + 'jobs/%d/progress/'
- # output
- self.outputCreateUrl = self.urlRoot + 'output/create/'
-
-
-class Daemon(Script):
-
- name = "web-portal-daemon"
-
- import pyre.inventory as pyre
- portal = pyre.facility("portal", factory=WebPortal)
- sleepInterval = pyre.dimensional("sleep-interval", default=60*second)
-
- outputRootPathname = pyre.str("output-root-pathname")
- outputRootUrl = pyre.str("output-root-url")
-
- specfemPathname = pyre.str("specfem-pathname", default="pyspecfem3D")
- mineosPathname = pyre.str("mineos-pathname", default="mineos.py")
-
- remote = pyre.bool("remote", default=False)
- remoteShellCommand = pyre.str("remote-shell-command")
- remoteDownloadCommand = pyre.str("remote-download-command")
- remoteUploadCommand = pyre.str("remote-upload-command")
- remoteOutputRoot = pyre.str("remote-output-root")
-
- dry = pyre.bool("dry", default=False)
-
-
- def main(self, *args, **kwds):
- self._info.activate()
- self._info.log("~~~~~~~~~~ daemon started ~~~~~~~~~~")
- clock = Clock(self.sleepInterval / second)
- if self.remote:
- gjm = RemoteShellJobManager(clock, self.outputRootPathname, self._info, self)
- else:
- gjm = GlobusJobManager(clock, self.outputRootPathname, self._info)
- fjm = ForkJobManager(clock, self.outputRootPathname, self._info)
- connection = PortalConnection(self.portal, self.outputRootUrl, clock, self._info)
- connection.runSimulations(gjm, fjm, self)
- try:
- stackless.run()
- except KeyboardInterrupt:
- self._info.log("~~~~~~~~~~ daemon stopped ~~~~~~~~~~")
- return
-
-
-def randomName():
- # Stolen from pyre.services.Pickler; perhaps this should be an
- # official/"exported" Pyre function?
-
- alphabet = list("0123456789abcdefghijklmnopqrstuvwxyz")
-
- import random
- random.shuffle(alphabet)
- key = "".join(alphabet)[0:16]
-
- return key
-
-
-def main(*args, **kwds):
- daemon = Daemon()
- daemon.run(*args, **kwds)
-
-
-if __name__ == "__main__":
- main()
Deleted: cs/portal/trunk/mineos.cfg
===================================================================
--- cs/portal/trunk/mineos.cfg 2008-07-01 23:52:59 UTC (rev 12365)
+++ cs/portal/trunk/mineos.cfg 2008-07-02 00:46:55 UTC (rev 12366)
@@ -1,30 +0,0 @@
-
-[mineos]
-event = china_cmt_event
-#model = prem_noocean.txt
-
-radial = True
-toroidal = True
-spheroidal = True
-inner-core-toroidal = True
-
-
-# minos_bran
-eps = 1.0e-10
-wgrav = 1*milli*hertz
-lmin = 2
-lmax = 8000
-wmin = 0.0*milli*hertz
-wmax = 200.0*milli*hertz
-nmin = 0
-nmax = 0
-
-# eigcon
-max-depth = 1000*km
-
-# green
-fmin = 10*milli*hertz
-fmax = 260*milli*hertz
-nsamples = 8000
-stations = short
-
Deleted: cs/portal/trunk/mineos.py
===================================================================
--- cs/portal/trunk/mineos.py 2008-07-01 23:52:59 UTC (rev 12365)
+++ cs/portal/trunk/mineos.py 2008-07-02 00:46:55 UTC (rev 12366)
@@ -1,404 +0,0 @@
-#!/usr/bin/env python
-
-
-from pyre.applications import Script
-from pyre.units.SI import milli, hertz
-from pyre.units.length import km
-import os, sys, stat, popen2, time
-from os.path import basename, dirname, splitext
-
-mHz = milli*hertz
-
-# Assume this script is installed alongside the mineos binaries.
-prefix = dirname(dirname(__file__))
-
-
-mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
-
-
-class Mode(object):
- def __init__(self, jcom, id, desc):
- self.jcom = jcom
- self.id = id
- self.desc = desc
-
-
-modes = [Mode(jcom, id, desc) for jcom, id, desc in [
- (1, 'radial', 'radial'),
- (2, 'toroidal', 'toroidal'),
- (3, 'spheroidal', 'spheroidal'),
- (4, 'ictoroidal', 'inner core toroidal'),
- ]]
-del jcom, id, desc
-
-
-class Mineos(object):
-
- def __init__(self, model, event, stations, progressUrl):
- self.bin = prefix + "/bin"
- self.model = model
- self.event = event
- self.stations = stations
- self.progressUrl = progressUrl
- self.eigcon_out = []
- self.green_out = "green"
- return
-
-
- def minos_bran(self,
- mode,
- eps, wgrav,
- lmin, lmax,
- wmin, wmax,
- nmin, nmax):
- if mode.id == 'radial':
- # These are ignored.
- lmin = 0
- lmax = 0
- # Misha Barmin says that that there are no dispersion
- # branches for radial modes; thus nmin, nmax have a
- # different sense in this context. He says to fix them as
- # follows.
- nmin = 0
- nmax = 8000
- minos_bran_in = mode.id + "_mineos_bran.in"
- s = open(minos_bran_in, "w")
- print >>s, self.model
- print >>s, self.listing(mode)
- print >>s, self.eigenfunctions(mode)
- print >>s, eps, wgrav
- print >>s, mode.jcom
- print >>s, lmin, lmax, wmin, wmax, nmin, nmax
- s.close()
- self.run("minos_bran", stdin=minos_bran_in)
- return
-
-
- def eigcon(self, mode, max_depth):
- eigcon_in = mode.id + "_eigcon.in"
- s = open(eigcon_in, "w")
- print >>s, mode.jcom
- print >>s, self.model
- print >>s, max_depth
- print >>s, self.listing(mode)
- print >>s, self.eigenfunctions(mode)
- dbname = mode.id #+ "_eigcon_out"
- print >>s, dbname
- s.close()
- self.run("eigcon", stdin=eigcon_in)
- self.eigcon_out.append(dbname)
- return
-
-
- def green(self, fmin, fmax, nsamples):
- nStations = self.lineCount(self.stations + ".site")
- s = open("db_list", "w")
- for dbname in self.eigcon_out:
- print >>s, dbname
- green_in = open("green.in", "w")
- argv = [os.path.join(self.bin, "green")]
- child = popen2.Popen4(argv)
- for s in [green_in, child.tochild]:
- print >>s, self.stations
- print >>s, "db_list"
- print >>s, self.event
- print >>s, fmin, fmax
- print >>s, nsamples
- print >>s, self.green_out
- s.close()
- stationTally = 0
- self.postProgress(0.0)
- lastPostTime = time.time()
- for line in child.fromchild:
- print line,
- if line.startswith(" green: Station: "):
- stationTally += 1
- now = time.time()
- if now - lastPostTime >= 60.0:
- self.postProgress(float(stationTally) / float(nStations))
- lastPostTime = now
- status = child.wait()
- self.checkStatus(status, argv)
- return
-
-
- def creat_origin(self):
- self.run("creat_origin", [self.event, self.green_out])
-
-
- def syndat(self, plane, datatype):
- syndat_in = "syndat.in"
- s = open(syndat_in, "w")
- print >>s, self.event
- print >>s, plane # error in documentation!
- print >>s, self.green_out
- print >>s, "seismograms" #"syndat_out"
- print >>s, datatype
- s.close()
- self.run("syndat", stdin=syndat_in)
- return
-
-
- def cucss2sac(self, *args):
- self.run("cucss2sac", list(args))
- return
-
-
- def listing(self, mode):
- return mode.id + "_listing"
-
-
- def eigenfunctions(self, mode):
- return mode.id + "_eigenfunctions"
-
-
- def run(self, program, args=None, stdin=None, stdout=None, stderr=None):
- sys.stdout.flush()
- sys.stderr.flush()
- if args is None:
- args = []
- argv = [program] + args
- pid = os.fork()
- if pid:
- # parent
- _, status = os.wait()
- self.checkStatus(status, argv)
- else:
- # child
- if stdin is not None:
- fd = os.open(stdin, os.O_RDONLY)
- os.dup2(fd, 0)
- if stdout is not None:
- fd = os.open(stdout, os.O_WRONLY, mode)
- os.dup2(fd, 1)
- if stderr is not None:
- fd = os.open(stderr, os.O_WRONLY, mode)
- os.dup2(fd, 2)
- executable = os.path.join(self.bin, argv[0])
- try:
- os.execvp(executable, argv)
- except Exception, e:
- sys.exit("%s: %s" % (executable, e))
-
- return
-
-
- def checkStatus(self, status, argv):
- exitStatus = None
- if (os.WIFSIGNALED(status)):
- statusStr = "signal %d" % os.WTERMSIG(status)
- elif (os.WIFEXITED(status)):
- exitStatus = os.WEXITSTATUS(status)
- statusStr = "exit %d" % exitStatus
- else:
- statusStr = "status %d" % status
- statusMsg = "%s: %s" % (argv[0], statusStr)
- if exitStatus != 0:
- sys.exit(statusMsg)
- return
-
-
- def lineCount(self, filename):
- tally = 0
- s = open(filename, 'r')
- for line in s:
- tally += 1
- return tally
-
-
- def postProgress(self, progress):
- import urllib, urlparse
- scheme, host, path = urlparse.urlparse(self.progressUrl)[0:3]
- fields = { 'progress': "%.2f" % progress }
- body = urllib.urlencode(fields)
- headers = {"Content-Type": "application/x-www-form-urlencoded",
- "Accept": "text/plain"}
- import httplib
- if scheme == "http":
- conn = httplib.HTTPConnection(host)
- elif scheme == "https":
- conn = httplib.HTTPSConnection(host)
- else:
- assert False # not scheme in ["http", "https"]
- conn.request("POST", path, body, headers)
- response = conn.getresponse()
- data = response.read()
- conn.close()
- return data
-
-
-
-class MineosScript(Script):
-
- name = "mineos"
-
- import pyre.inventory as pyre
-
- #------------------------------------------------------------------------
- # common
-
- model = pyre.str("model")
-
- # jcom
- radial = pyre.bool("radial")
- toroidal = pyre.bool("toroidal")
- spheroidal = pyre.bool("spheroidal")
- ictoroidal = pyre.bool("inner-core-toroidal")
-
- event = pyre.str("event")
-
-
- #------------------------------------------------------------------------
- # minos_bran
-
- eps = pyre.float("eps")
- wgrav = pyre.dimensional("wgrav", default=1*hertz)
-
- lmin = pyre.int("lmin")
- lmax = pyre.int("lmax")
-
- wmin = pyre.dimensional("wmin", default=1*hertz)
- wmax = pyre.dimensional("wmax", default=1*hertz)
-
- nmin = pyre.int("nmin")
- nmax = pyre.int("nmax")
-
-
- #------------------------------------------------------------------------
- # eigcon
-
- max_depth = pyre.dimensional("max-depth", default=1*km)
-
-
- #------------------------------------------------------------------------
- # green
-
- fmin = pyre.dimensional("fmin", default=1*hertz)
- fmax = pyre.dimensional("fmax", default=1*hertz)
-
- nsamples = pyre.int("nsamples")
-
- stations = pyre.str("stations")
-
- #------------------------------------------------------------------------
- # syndat
-
- plane = pyre.int("plane", default=0) # use CMT tensor components as-is
- datatype = pyre.int("datatype", default=0) # accelerograms in nm/s^2
-
- #------------------------------------------------------------------------
- # misc.
-
- progressUrl = pyre.str("progress-url")
-
-
- def main(self, *args, **kwds):
-
- self._info.activate()
-
- enabledModes = []
- for mode in modes:
- if getattr(self, mode.id):
- enabledModes.append(mode)
-
- self._info.log("modes: " + ' '.join(["'%s'" % mode.desc for mode in enabledModes]))
- if not enabledModes:
- self._info.log("nothing to do")
- return
-
- jobdir = os.getcwd()
-
- archiveDirName = "mineos"
- os.mkdir(archiveDirName)
- inputFiles = [self.model, self.event] + [self.stations + suffix for suffix in [".site", ".sitechan"]]
- for inputFile in inputFiles:
- os.rename(inputFile, os.path.join(archiveDirName, inputFile))
- os.chdir(archiveDirName)
-
- self._info.log("running Mineos program suite for model '%s'" % self.model)
- self.runMineos(self.model, enabledModes)
-
- # archive the output
- os.chdir(jobdir)
- self.spawn("/bin/tar", "cvzf", archiveDirName + ".tar.gz", archiveDirName)
-
- # clean up
- self.spawn("/bin/rm", "-rf", archiveDirName)
-
- return
-
-
- def runMineos(self, model, enabledModes):
- mineos = Mineos(model, self.event, self.stations, self.progressUrl)
-
- # minos_bran
- for mode in enabledModes:
- self._info.log("running program 'minos_bran' for '%s' mode" % mode.desc)
- # convert to millihertz
- wgrav = self.wgrav / mHz
- wmin = self.wmin / mHz
- wmax = self.wmax / mHz
- mineos.minos_bran(mode, self.eps, wgrav,
- self.lmin, self.lmax,
- wmin, wmax,
- self.nmin, self.nmax)
-
- # eigcon
- for mode in enabledModes:
- self._info.log("running program 'eigcon' for '%s' mode" % mode.desc)
- # convert to kilometers
- max_depth = self.max_depth / km
- mineos.eigcon(mode, max_depth)
-
- # green
- self._info.log("running program 'green'")
- fmin = self.fmin / mHz
- fmax = self.fmax / mHz
- mineos.green(fmin, fmax, self.nsamples)
-
- # creat_origin
- self._info.log("running program 'creat_origin'")
- mineos.creat_origin()
-
- # syndat
- self._info.log("running program 'syndat'")
- mineos.syndat(self.plane, self.datatype)
-
- # cucss2sac
- os.symlink(mineos.green_out + ".origin", "seismograms.origin")
- os.symlink(mineos.stations + ".site", "seismograms.site")
- mineos.cucss2sac("seismograms", "output") # SAC
- mineos.cucss2sac("-a", "seismograms", "output") # ASCII
-
- return
-
-
- def spawn(self, *argv):
- command = ' '.join(argv)
- self._info.log("spawning: %s" % command)
- status = os.spawnvp(os.P_WAIT, argv[0], argv)
- statusMsg = "%s: exit %d" % (argv[0], status)
- self._info.log(statusMsg)
- if status != 0:
- sys.exit("%s: %s" % (argv[0], statusMsg))
- return
-
-
-def main(*args, **kwds):
- if True:
- # Redirect all output -- our journal output, and the output of
- # our children -- to a file.
- fd = os.open("output_mineos.txt", os.O_CREAT | os.O_WRONLY, mode)
- os.dup2(fd, 1)
- os.dup2(fd, 2)
- os.close(fd)
-
- mineos = MineosScript()
- mineos.run(*args, **kwds)
-
-
-if __name__ == "__main__":
- main()
-
-
-# end of file
Modified: cs/portal/trunk/northridge/MANIFEST.in
===================================================================
--- cs/portal/trunk/northridge/MANIFEST.in 2008-07-01 23:52:59 UTC (rev 12365)
+++ cs/portal/trunk/northridge/MANIFEST.in 2008-07-02 00:46:55 UTC (rev 12366)
@@ -21,3 +21,7 @@
include SeismoWebPortal/templates/registration/*.html
include SeismoWebPortal/templates/registration/*.txt
include SeismoWebPortal/templates/*.html
+include backend/daemon.py
+include backend/mineos.py
+include backend/web-portal-daemon.cfg
+include backend/specfem3D.py
Copied: cs/portal/trunk/northridge/backend/daemon.py (from rev 12359, cs/portal/trunk/daemon.py)
===================================================================
--- cs/portal/trunk/northridge/backend/daemon.py (rev 0)
+++ cs/portal/trunk/northridge/backend/daemon.py 2008-07-02 00:46:55 UTC (rev 12366)
@@ -0,0 +1,835 @@
+
+import stackless
+import os, sys, signal
+from popen2 import Popen4
+
+from pyre.applications import Script
+from pyre.components import Component
+from pyre.units.time import second
+
+
+# NYI: RSL AST
class RSLUnquoted(object):
    """Wrapper marking an RSL value that must be emitted without quotes."""

    def __init__(self, value):
        # Raw string; rslValueSequenceToString() emits it verbatim.
        self.value = value
+
+TG_CLUSTER_SCRATCH = "/work/teragrid/tg459131"
+TG_COMMUNITY = "/projects/tg"
+
+# I'm not sure what to do about this yet.
+TACC_ENVIRONMENT = """(environment=(TG_CLUSTER_SCRATCH "%s") (TG_COMMUNITY "%s") (PATH "/opt/lsf/bin:/opt/lsf/etc:/opt/MPI/intel9/mvapich-gen2/0.9.8/bin:/opt/apps/binutils/binutils-2.17/bin:/opt/intel/compiler9.1//idb/bin:/opt/intel/compiler9.1//cc/bin:/opt/intel/compiler9.1//fc/bin:/usr/local/first:/usr/local/bin:~/bin:.:/opt/apps/pki_apps:/opt/apps/gsi-openssh-3.9/bin:/opt/lsf/bin:/opt/lsf/etc:/sbin:/usr/sbin:/usr/local/sbin:/bin:/usr/bin:/usr/local/bin:/usr/X11R6/bin:/home/teragrid/tg459131/bin:/data/TG/srb-client-3.4.1-r1/bin:/data/TG/softenv-1.6.2-r3/bin:/data/TG/tg-policy/bin:/data/TG/gx-map-0.5.3.2-r1/bin:/data/TG/tgusage-2.9-r2/bin:/usr/java/j2sdk1.4.2_12/bin:/usr/java/j2sdk1.4.2_12/jre/bin:/data/TG/globus-4.0.1-r3/sbin:/data/TG/globus-4.0.1-r3/bin:/data/TG/tgcp-1.0.0-r2/bin:/data/TG/condor-6.7.18-r1/bin:/data/TG/condor-6.7.18-r1/sbin:/data/TG/hdf4-4.2r1-r1/bin:/opt/apps/hdf5/hdf5-1.6.5/bin:/data/TG/phdf5-1.6.5/bin") (MPICH_HOME "/opt/MPI/intel9/mvapich-gen2/0.9.8"))""" % (TG_CLUSTER_SCRATCH, TG_COMMUNITY)
+
+
+
class GassServer(object):
    """Wraps a 'globus-gass-server' child process serving one job directory.

    GASS (Global Access to Secondary Storage) lets the remote Globus job
    stage files to and from the local job directory over HTTPS.
    """

    # @classmethod (pre-decorator spelling; see the assignment below)
    def startUp(cls, directory, info):
        # -r/-w: allow remote read and write; -c: allow client shutdown.
        argv = ["globus-gass-server", "-r", "-w", "-c"]
        savedWd = os.getcwd()
        os.chdir(directory)
        child = Popen4(argv)
        os.chdir(savedWd)
        child.tochild.close()
        # The server prints its URL on stdout once it is listening; skip
        # any other start-up chatter until the https URL appears.
        while True:
            url = child.fromchild.readline().strip()
            if url.startswith("https:"):
                break
        child.fromchild.close()
        return GassServer(child, url, directory, info)
    startUp = classmethod(startUp)

    def __init__(self, child, url, directory, info):
        self.child = child          # Popen4 handle on the server process
        self.url = url              # base https URL the server is serving
        self.directory = directory  # local directory being exported
        self.info = info            # journal channel for logging

    def shutDown(self):
        # This doesn't work reliably...
        argv = ["globus-gass-server-shutdown", self.url]
        command = ' '.join(argv)
        self.info.log("spawning: %s" % command)
        status = os.spawnvp(os.P_WAIT, argv[0], argv)
        statusMsg = "%s: exit %d" % (argv[0], status)
        self.info.log(statusMsg)
        if status != 0:
            # ...so sometimes we kill it!
            self.info.log("kill -INT %d" % self.child.pid)
            os.kill(self.child.pid, signal.SIGINT)
            self.info.log("waitpid(%d)" % self.child.pid)
            status = self.child.wait()
        return
+
+
class JobManager(object):
    """Base class for running portal jobs; subclasses supply jobRunner()."""

    def __init__(self, clock, root, info):
        self.clock = clock  # shared Clock whose 'tick' event paces polling
        self.root = root    # local root directory for per-job subdirectories
        self.info = info    # journal channel for logging


    def runJob(self, job):
        """Create the job's working directory and launch its runner tasklet."""
        self.createSubdirectoryForJob(job)
        stackless.tasklet(self.jobRunner)(job)


    def spawn(self, *argv):
        """Run a command to completion; raise on non-zero exit status."""
        command = ' '.join(argv)
        self.info.log("spawning: %s" % command)
        status = os.spawnvp(os.P_WAIT, argv[0], argv)
        statusMsg = "%s: exit %d" % (argv[0], status)
        self.info.log(statusMsg)
        if status != 0:
            raise Exception(statusMsg)
        return


    def ospawn(self, *argv):
        """Run a command capturing stdout+stderr; return the output lines.

        Raises if the child was signaled or exited non-zero.
        """
        command = ' '.join(argv)
        self.info.log("spawning: %s" % command)

        child = Popen4(argv)

        child.tochild.close()

        output = child.fromchild.readlines()
        status = child.wait()

        # Decode the raw wait() status into a human-readable message.
        exitStatus = None
        if (os.WIFSIGNALED(status)):
            statusStr = "signal %d" % os.WTERMSIG(status)
        elif (os.WIFEXITED(status)):
            exitStatus = os.WEXITSTATUS(status)
            statusStr = "exit %d" % exitStatus
        else:
            statusStr = "status %d" % status
        statusMsg = "%s: %s" % (argv[0], statusStr)

        for line in output:
            self.info.line(" " + line.rstrip())
        self.info.log(statusMsg)

        if exitStatus != 0:
            raise Exception(statusMsg)

        return output


    def download(self, url, pathname):
        """Fetch 'url' and store its contents at 'pathname'."""
        import shutil
        import urllib2
        self.info.log("downloading %s" % url)
        infile = urllib2.urlopen(url)
        outfile = open(pathname, 'wb')
        shutil.copyfileobj(infile, outfile)
        outfile.close()
        infile.close()
        return

    def downloadInputFilesForJob(self, job):
        """Download every input file into the job's working directory."""
        for inputFile in job.inputFiles:
            url = job.urlForInputFile(inputFile)
            filename = inputFile.split('/')[-1] # strips 'mineos/'
            pathname = os.path.join(job.directory, filename)
            self.download(url, pathname)
        return

    def createSubdirectoryForJob(self, job):
        """Make a fresh, randomly-named working directory under self.root."""
        while True:
            dirName = randomName()
            dirPath = os.path.join(self.root, dirName)
            # Retry on the (unlikely) name collision.
            if not os.path.exists(dirPath):
                os.mkdir(dirPath)
                break
        job.subdir = dirName
        job.directory = dirPath
        return
+
+
class ForkJobManager(JobManager):
    """Runs jobs as local child processes, polling with exponential back-off."""

    def jobRunner(self, job):
        """Tasklet body: download input, spawn the job, wait, report status."""

        try:
            self.downloadInputFilesForJob(job)

            argv = self.argvForJob(job)
            command = ' '.join(argv)
            self.info.log("spawning: %s" % command)
            savedWd = os.getcwd()
            os.chdir(job.directory)
            pid = os.spawnvp(os.P_NOWAIT, argv[0], argv)
            os.chdir(savedWd)
            self.info.log("spawned process %d" % pid)

            # Poll for the child without blocking the scheduler, backing
            # off exponentially up to 10 clock ticks between checks.
            ticks = 2
            wpid, status = os.waitpid(pid, os.WNOHANG)
            while wpid == 0:
                for t in xrange(ticks):
                    self.clock.tick.wait()
                ticks = min(ticks * 2, 10)
                wpid, status = os.waitpid(pid, os.WNOHANG)

            self.uploadOutputFilesForJob(job)

            # Translate the wait() status into a portal job status.
            if (os.WIFSIGNALED(status)):
                statusStr = "signal %d" % os.WTERMSIG(status)
                job.setStatus(job.STATUS_FAILED)
            elif (os.WIFEXITED(status)):
                exitStatus = os.WEXITSTATUS(status)
                statusStr = "exit %d" % exitStatus
                if exitStatus == 0:
                    job.setStatus(job.STATUS_DONE)
                else:
                    job.setStatus(job.STATUS_FAILED)
            else:
                statusStr = "status %d" % status
                job.setStatus(job.STATUS_FAILED)
            statusMsg = "%s: %s" % (argv[0], statusStr)
            self.info.log(statusMsg)

        except Exception, e:
            self.info.log("error: %s: %s" % (e.__class__.__name__, e))
            job.setStatus(job.STATUS_FAILED)

        return


    def argvForJob(self, job):
        """Build the local command line for the job."""
        return ([job.resSpec['executable']] + job.resSpec['arguments'] +
                ["--progress-url=%s" % job.progressUrl])


    def uploadOutputFilesForJob(self, job):
        # Output is already local; nothing to do.
        return
+
+
class RemoteShellJobManager(ForkJobManager):
    """Runs jobs on a remote host via user-configured shell/copy commands."""

    def __init__(self, clock, root, info, config):
        ForkJobManager.__init__(self, clock, root, info)
        self.rsh = config.remoteShellCommand.split()  # e.g. ['ssh', 'host']
        self.rdown = config.remoteDownloadCommand     # template with %(source)s/%(dest)s
        self.rup = config.remoteUploadCommand         # template with %(source)s/%(dest)s
        self.remoteOutputRoot = config.remoteOutputRoot


    def downloadInputFilesForJob(self, job):
        """Stage input locally, then copy it into a fresh remote run directory."""
        # First, download to the local host.
        ForkJobManager.downloadInputFilesForJob(self, job)

        # Create a remote directory for this run.
        remoteDirectory = self.remoteDirectoryForJob(job)
        argv = self.rsh + ["mkdir -p %s" % remoteDirectory]
        self.spawn(*argv)

        # Now copy to the remote host.
        for inputFile in job.inputFiles:
            filename = inputFile.split('/')[-1] # strips 'mineos/'
            pathname = os.path.join(job.directory, filename)
            remotePathname = os.path.join(remoteDirectory, filename)
            argv = (self.rdown % {'source': pathname, 'dest': remotePathname}).split()
            self.spawn(*argv)

        return


    def uploadOutputFilesForJob(self, job):
        """Copy output files back from the remote host, best-effort per file."""
        remoteDirectory = self.remoteDirectoryForJob(job)
        for filename in job.outputFiles:
            pathname = os.path.join(job.directory, filename)
            remotePathname = os.path.join(remoteDirectory, filename)
            argv = (self.rup % {'source': remotePathname, 'dest': pathname}).split()
            try:
                self.spawn(*argv)
            except Exception:
                # Best-effort: some output files may legitimately be missing.
                pass
        return


    def argvForJob(self, job):
        """Wrap the local command line in a 'cd <dir> && ...' remote shell call."""
        remoteCommand = ("cd " + self.remoteDirectoryForJob(job) + " && " +
                         ' '.join(ForkJobManager.argvForJob(self, job)))
        return self.rsh + [remoteCommand]


    def remoteDirectoryForJob(self, job):
        """Remote working directory for this run, under the remote output root."""
        return os.path.join(self.remoteOutputRoot, "run%05d" % job.run.id)
+
+
+class GlobusJobManager(JobManager):
+
+ def jobRunner(self, job):
+
+ try:
+ self.downloadInputFilesForJob(job)
+ gassServer = GassServer.startUp(job.directory, self.info)
+
+ resSpec = self.resSpec(job, gassServer)
+ id = self.globusrun(resSpec)
+
+ oldStatus = job.status
+ ticks = 2
+ while job.isAlive():
+ for t in xrange(ticks):
+ self.clock.tick.wait()
+ ticks *= 2
+ status = self.getJobStatus(id)
+ if status != oldStatus:
+ job.setStatus(status)
+ oldStatus = status
+ ticks = 2
+
+ except Exception, e:
+ self.info.log("error: %s: %s" % (e.__class__.__name__, e))
+ job.setStatus(job.STATUS_FAILED)
+
+ finally:
+ gassServer.shutDown()
+
+ return
+
+
+ def globusrun(self, resSpec):
+ import tempfile
+ fd, rslName = tempfile.mkstemp(suffix=".rsl")
+ stream = os.fdopen(fd, "w")
+ try:
+ self.writeRsl(resSpec, stream)
+ stream.close()
+ if resSpec['jobType'] == "mpi":
+ resourceManager = "tg-login.tacc.teragrid.org/jobmanager-lsf"
+ else:
+ resourceManager = "tg-login.tacc.teragrid.org/jobmanager-fork"
+ output = self.ospawn("globusrun", "-F", "-f", rslName, "-r", resourceManager)
+ id = None
+ for line in output:
+ if line.startswith("https://"):
+ id = line.strip()
+ assert id
+ finally:
+ stream.close()
+ os.unlink(rslName)
+ return id
+
+
+ def writeRsl(self, resSpec, stream):
+ print >>stream, '&'
+ for relation in resSpec.iteritems():
+ attribute, valueSequence = relation
+ valueSequence = self.rslValueSequenceToString(valueSequence)
+ print >>stream, '(', attribute, '=', valueSequence, ')'
+ print >>stream, TACC_ENVIRONMENT
+ return
+
+
+ def rslValueSequenceToString(self, valueSequence):
+ if not isinstance(valueSequence, (tuple, list)):
+ valueSequence = [valueSequence]
+ s = []
+ for value in valueSequence:
+ if isinstance(value, RSLUnquoted):
+ s.append(value.value)
+ elif isinstance(value, (tuple, list)):
+ s.append('(' + self.rslValueSequenceToString(value) + ')')
+ else:
+ s.append('"%s"' % value)
+ return ' '.join(s)
+
+
+ def getJobStatus(self, id):
+ output = self.ospawn("globus-job-status", id)
+ status = output[0].strip()
+ return status
+
+
+ def resSpec(self, job, gassServer):
+ resSpec = {}
+ resSpec.update(job.resSpec)
+ resSpec["scratch_dir"] = TG_CLUSTER_SCRATCH
+ resSpec["directory"] = RSLUnquoted("$(SCRATCH_DIRECTORY)")
+
+ file_stage_in = []
+ for inputFile in job.inputFiles:
+ url = gassServer.url + "/./" + inputFile
+ file_stage_in.append([url, RSLUnquoted("$(SCRATCH_DIRECTORY) # " + '"/' + inputFile + '"')])
+ resSpec["file_stage_in"] = file_stage_in
+
+ file_stage_out = []
+ for outputFile in job.outputFiles:
+ url = gassServer.url + "/./" + outputFile
+ file_stage_out.append([outputFile, url])
+ if file_stage_out:
+ resSpec["file_stage_out"] = file_stage_out
+
+ resSpec["stdout"] = gassServer.url + "/./stdout.txt"
+ resSpec["stderr"] = gassServer.url + "/./stderr.txt"
+
+ job.outputFiles.extend(["stdout.txt", "stderr.txt"])
+
+ return resSpec
+
+
class Event(object):
    """Broadcast event for stackless tasklets, reusable after each signal()."""

    def __init__(self):
        self.channel = stackless.channel()

    def wait(self):
        """Block the calling tasklet until the next signal()."""
        self.channel.receive()
        return

    def signal(self):
        """Wake every tasklet currently blocked in wait()."""

        # Swap-in a new channel, so that a waiting tasklet that
        # repeatedly calls wait() will block waiting for the *next*
        # signal on its subsequent call to wait().
        channel = self.channel
        self.channel = stackless.channel()

        # Unblock all waiters.  A negative balance means that many
        # tasklets are blocked in receive().
        while channel.balance < 0:
            channel.send(None)

        return
+
+
class Job(object):
    """A single back-end job plus the status machinery the portal watches."""

    # status codes (STATUS_NEW is a pseudo-state, never reported by Globus)
    STATUS_NEW = "NEW" # pseudo
    STATUS_PENDING = "PENDING"
    STATUS_ACTIVE = "ACTIVE"
    STATUS_DONE = "DONE"
    STATUS_FAILED = "FAILED"
    statusCodes = [STATUS_PENDING, STATUS_ACTIVE, STATUS_DONE, STATUS_FAILED]
    deadCodes = [STATUS_DONE, STATUS_FAILED]

    def __init__(self, run, task, **resSpec):
        self.run = run              # owning Run
        self.task = task            # short task name reported to the portal
        self.resSpec = resSpec      # resource-specification keywords
        self.status = self.STATUS_NEW
        self.statusChanged = Event()
        self.inputFiles = []        # names of files to stage in
        self.urlForInputFile = None # callable mapping filename -> URL
        self.outputFiles = []       # names of files to stage out
        self.subdir = None          # set by JobManager.createSubdirectoryForJob
        self.directory = None

    def setStatus(self, status):
        """Record the new status and wake waiters.

        The status-code assertion is disabled: real Globus output includes
        codes like "UNKNOWN JOB STATE 64" that are not in statusCodes.
        """
        if False: # Violated by, e.g., "UNKNOWN JOB STATE 64"
            assert status in self.statusCodes, "unknown status: %s" % status
        self.status = status
        self.statusChanged.signal()

    def isAlive(self):
        """True until the job reaches DONE or FAILED."""
        return self.status not in self.deadCodes
+
+
class Run(object):
    """One portal 'run': creates a Job, tracks it, and mirrors its status."""

    # status codes
    STATUS_NEW = "ready"
    STATUS_CONNECTING = "connecting"
    STATUS_PREPARING = "preparing" # reported by build & schedule process
    STATUS_PENDING = "pending" # reported by build & schedule process
    STATUS_MESHING = "meshing" # reported by launcher process
    STATUS_SOLVING = "solving" # reported by launcher process
    STATUS_FINISHING = "finishing" # reported by launcher process
    STATUS_DONE = "done"
    STATUS_ERROR = "error"
    deadCodes = [STATUS_DONE, STATUS_ERROR]

    def __init__(self, id, urlForInputFile, config, info, jm):
        self.id = id                            # portal run id
        self.urlForInputFile = urlForInputFile  # maps filename -> download URL
        self.specfemPathname = config.specfemPathname
        self.mineosPathname = config.mineosPathname
        self.dry = config.dry
        self.info = info
        self.jm = jm                            # JobManager that executes jobs

        self.jobChannel = stackless.channel()   # feeds jobs to PortalConnection.jobSink()
        self.status = self.STATUS_NEW
        self.statusChanged = Event()

    def go(self):
        """Launch this run as its own tasklet."""
        stackless.tasklet(self)()

    def __call__(self):
        """Tasklet body: run the job to completion, mirroring its status."""
        try:
            self.setStatus(self.STATUS_CONNECTING)

            # run -- newJob() is supplied by the subclass
            job = self.newJob()
            self.jm.runJob(job)

            # send jobs to jobSink()
            self.jobChannel.send(job)
            self.jobChannel.send(None) # no more jobs

            self.setStatus(self.STATUS_PREPARING)

            while job.isAlive():
                job.statusChanged.wait()
                # while

            if job.status == job.STATUS_FAILED:
                self.setStatus(self.STATUS_ERROR)
                raise RuntimeError("run failed")

            self.setStatus(self.STATUS_DONE)

        except Exception, e:
            self.info.log("error: %s: %s" % (e.__class__.__name__, e))
            self.setStatus(self.STATUS_ERROR)

        return

    def setStatus(self, status):
        """Record the new status and wake anyone waiting on statusChanged."""
        self.status = status
        self.statusChanged.signal()

    def isAlive(self):
        """True while the run has not reached a terminal status."""
        return not self.status in self.deadCodes
+
+
class SpecfemRun(Run):
    """A SPECFEM3D GLOBE run, executed through the Globus/remote job manager."""

    def newJob(self):
        """Build the single job that runs the Specfem driver script."""
        dry = []
        if self.dry:
            dry = ["--scheduler.dry"]
        parameters = 'par_file.txt'
        event = 'event.txt'
        stations = 'stations.txt'
        job = Job(
            self,
            "run",
            jobType = "single",
            count = 1,
            executable = self.specfemPathname,
            arguments = ['--par-file=' + parameters,
                         '--cmt-solution=' + event,
                         '--stations=' + stations,
                         "--scheduler.wait=True",
                         "--job.name=run%05d" % self.id,
                         "--macros.run.id=%05d" % self.id,
                         #"--job.walltime=2*hour",
                         "--job.stdout=stdout.txt",
                         "--job.stderr=stderr.txt",
                         ] + dry,
            )
        job.urlForInputFile = self.urlForInputFile
        job.inputFiles = [parameters, event, stations]
        job.outputFiles = ["specfem3dglobe.tar.gz", "output_mesher.txt", "output_solver.txt", "output_build.txt"]
        return job
+
+
class MineosRun(Run):
    """A Mineos (normal-mode synthetic seismogram) run."""

    def newJob(self):
        """Build the single fork-managed job that executes the Mineos driver."""
        job = Job(self,
                  "run",
                  executable=self.mineosPathname,
                  arguments=["parameters.pml"])
        job.urlForInputFile = self.urlForInputFile
        job.inputFiles = ["parameters.pml", "event.txt", "stations.site",
                          "stations.sitechan", "model.txt"]
        job.outputFiles = ["output_mineos.txt", "mineos.tar.gz"]
        return job
+
+
class PortalConnection(object):
    """Talks HTTP to the web portal: polls for new runs, posts status back."""

    MULTIPART_BOUNDARY = '----------eArThQuAkE$'

    def __init__(self, portal, outputRootUrl, clock, info):
        self.portal = portal                # WebPortal component (URL map)
        self.outputRootUrl = outputRootUrl  # public URL of the output root
        self.clock = clock
        self.info = info

    def runSimulations(self, gjm, fjm, config):
        """Start the tasklet that polls the portal for new runs."""
        stackless.tasklet(self.runFactory)(gjm, fjm, config)

    def runFactory(self, gjm, fjm, config):
        """Tasklet body: poll the runs list each tick; start each new run once."""
        import urllib2

        runs = {}  # run id -> Run, so each run is started only once

        while True:
            self.clock.tick.wait()

            #self.info.log("GET %s" % self.portal.runsUrl)
            try:
                infile = urllib2.urlopen(self.portal.runsUrl)
            except Exception, e:
                # Could be transient failure -- e.g., "connection reset by peer".
                self.info.log("error: %s" % e)
                continue
            #self.info.log("OK")
            runList = infile.read()
            infile.close()

            # NOTE(review): eval() of an HTTP response executes whatever the
            # portal sends; this is safe only because the portal is trusted.
            runList = eval(runList)

            for run in runList:
                id = int(run['id'])
                status = run['status']
                code = run['code']
                if (status == Run.STATUS_NEW and not runs.has_key(id)):
                    self.info.log("new run %d" % id)

                    urlForInputFile = URLForInputFile(self.portal.inputFileUrl, id)

                    # 'code' selects the simulation back-end.
                    if code == 1:
                        newRun = SpecfemRun(id, urlForInputFile, config, self.info, gjm)
                    elif code == 2:
                        newRun = MineosRun(id, urlForInputFile, config, self.info, fjm)
                    else:
                        self.info.log("unknown code %d" % code)
                        continue

                    self.watchRun(newRun)
                    runs[id] = newRun
                    newRun.go()

        return

    def watchRun(self, run):
        """Start tasklets mirroring the run's status and its jobs to the portal."""
        stackless.tasklet(self.runWatcher)(run)
        stackless.tasklet(self.jobSink)(run)

    def runWatcher(self, run):
        """Tasklet body: POST each run status change to the portal."""
        url = self.portal.runStatusUrl % run.id
        while run.isAlive():
            run.statusChanged.wait()
            fields = {'status': run.status}
            self.postStatusChange(url, fields)
        return

    def jobSink(self, run):
        """Tasklet body: receive jobs from the run until the None sentinel."""
        newJob = run.jobChannel.receive()
        while newJob is not None:
            self.watchJob(newJob, run)
            newJob = run.jobChannel.receive()
        return

    def watchJob(self, job, run):
        """Register the job with the portal and start watching its status."""
        url = self.portal.jobCreateUrl
        fields = self.jobFields(job, run)
        response = self.postStatusChange(url, fields)
        # NOTE(review): eval() here trusts the portal's response (a job id).
        portalId = eval(response)
        job.progressUrl = self.portal.jobProgressUrl % portalId
        stackless.tasklet(self.jobWatcher)(job, portalId, run)

    def jobWatcher(self, job, portalId, run):
        """Tasklet body: POST job status changes; register output once dead."""
        url = self.portal.jobUpdateUrl % portalId
        while job.isAlive():
            job.statusChanged.wait()
            fields = self.jobFields(job, run)
            self.postStatusChange(url, fields)
        self.postJobOutput(job, portalId)
        return

    def postJobOutput(self, job, portalId):
        """Tell the portal about each output file that actually exists."""
        url = self.portal.outputCreateUrl

        for outputFile in job.outputFiles:

            pathname = os.path.join(job.directory, outputFile)
            if not os.path.exists(pathname):
                continue

            # in case the daemon and the web server are run as
            # different users
            import stat
            os.chmod(pathname, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)

            fields = {
                'job': portalId,
                'name': outputFile,
            }
            self.postStatusChange(url, fields)

        return

    def jobFields(self, job, run):
        """Form fields describing 'job' for the portal's job views."""
        fields = {
            'run': run.id,
            'task': job.task,
            'status': job.status.lower(),
            'url': self.outputRootUrl + job.subdir + "/",
        }
        return fields

    def postStatusChange(self, url, fields):
        """POST 'fields' as an urlencoded form; return the response body."""
        import urllib
        body = urllib.urlencode(fields)
        headers = {"Content-Type": "application/x-www-form-urlencoded",
                   "Accept": "text/plain"}
        return self.post(body, headers, url)

    def postStatusChangeAndUploadOutput(self, url, fields, output):
        """POST 'fields' plus a gzipped tar stream as multipart form data."""
        # Based upon a recipe by Wade Leftwich.
        files = [('output', 'output.txt', 'application/x-gtar', 'gzip', output)]
        body = self.encodeMultipartFormData(fields, files)
        headers = {"Content-Type": "multipart/form-data; boundary=%s" % self.MULTIPART_BOUNDARY,
                   "Content-Length": str(len(body)),
                   "Accept": "text/plain"}
        return self.post(body, headers, url)

    def encodeMultipartFormData(self, fields, files):
        """Encode simple fields and file streams as a multipart/form-data body."""
        import shutil
        from StringIO import StringIO
        stream = StringIO()
        def line(s=''): stream.write(s + '\r\n')
        for key, value in fields.iteritems():
            line('--' + self.MULTIPART_BOUNDARY)
            line('Content-Disposition: form-data; name="%s"' % key)
            line()
            line(value)
        for (key, filename, contentType, contentEncoding, content) in files:
            line('--' + self.MULTIPART_BOUNDARY)
            line('Content-Disposition: form-data; name="%s"; filename="%s"' % (key, filename))
            line('Content-Type: %s' % contentType)
            line('Content-Encoding: %s' % contentEncoding)
            line()
            shutil.copyfileobj(content, stream)
            line()
        line('--' + self.MULTIPART_BOUNDARY + '--')
        line()
        return stream.getvalue()

    def post(self, body, headers, url):
        """POST 'body' to the portal host; return the response body."""
        self.info.log("POST %s" % url)
        import httplib
        if self.portal.scheme == "http":
            conn = httplib.HTTPConnection(self.portal.host)
        elif self.portal.scheme == "https":
            conn = httplib.HTTPSConnection(self.portal.host)
        else:
            assert False # not scheme in ["http", "https"]
        conn.request("POST", url, body, headers)
        response = conn.getresponse()
        data = response.read()
        conn.close()
        self.info.log("response %s" % data)
        return data
+
+
class URLForInputFile(object):
    """Callable mapping an input-file name to its download URL for one run."""

    def __init__(self, inputFileUrl, id):
        # 'inputFileUrl' is a format string expecting (run id, filename).
        self.inputFileUrl = inputFileUrl
        self.id = id

    def __call__(self, inputFile):
        # Map input filenames to URLs in the context of this run.
        return self.inputFileUrl % (self.id, inputFile)
+
+
+
class Clock(object):
    """Tasklet that periodically broadcasts a 'tick' event to pace polling.

    NOTE(review): time.sleep() blocks the whole thread, so other tasklets
    only run between ticks (after stackless.schedule()) -- confirm this is
    the intended scheduling model.
    """

    def __init__(self, interval):
        self.interval = interval  # seconds between ticks
        self.tick = Event()
        stackless.tasklet(self)()

    def __call__(self):
        # Tasklet body: sleep, wake all tick waiters, then yield to them.
        from time import sleep
        while True:
            sleep(self.interval)
            self.tick.signal()
            stackless.schedule()
        return
+
+
class WebPortal(Component):
    """Configuration component holding the portal's URL map."""

    name = "web-portal"

    import pyre.inventory as pyre
    scheme = pyre.str("scheme", validator=pyre.choice(["http", "https"]), default="http")
    host = pyre.str("host", default="localhost:8000")
    urlRoot = pyre.str("url-root", default="/portals/seismo/")

    def _configure(self):
        # Absolute prefix for URLs fetched via urllib2 or passed to jobs.
        self.urlPrefix = '%s://%s%s' % (self.scheme, self.host, self.urlRoot)
        self.inputFileUrl = self.urlPrefix + 'runs/%d/%s'
        # runs
        self.runsUrl = self.urlPrefix + 'runs/list.py'
        # NOTE(review): the POST URLs below are host-relative (urlRoot only)
        # because PortalConnection.post() supplies the host itself; only the
        # URLs fetched externally use the absolute urlPrefix -- confirm.
        self.runStatusUrl = self.urlRoot + 'runs/%d/status/'
        # jobs
        self.jobCreateUrl = self.urlRoot + 'jobs/create/'
        self.jobUpdateUrl = self.urlRoot + 'jobs/%d/update/'
        self.jobProgressUrl = self.urlPrefix + 'jobs/%d/progress/'
        # output
        self.outputCreateUrl = self.urlRoot + 'output/create/'
+
+
class Daemon(Script):
    """Long-running backend daemon: polls the portal and runs simulations."""

    name = "web-portal-daemon"

    import pyre.inventory as pyre
    portal = pyre.facility("portal", factory=WebPortal)
    sleepInterval = pyre.dimensional("sleep-interval", default=60*second)

    # where job working directories live, and the URL they are served under
    outputRootPathname = pyre.str("output-root-pathname")
    outputRootUrl = pyre.str("output-root-url")

    # back-end driver scripts
    specfemPathname = pyre.str("specfem-pathname", default="specfem3D.py")
    mineosPathname = pyre.str("mineos-pathname", default="mineos.py")

    # remote (ssh/scp-style) execution of Specfem, instead of Globus
    remote = pyre.bool("remote", default=False)
    remoteShellCommand = pyre.str("remote-shell-command")
    remoteDownloadCommand = pyre.str("remote-download-command")
    remoteUploadCommand = pyre.str("remote-upload-command")
    remoteOutputRoot = pyre.str("remote-output-root")

    # pass --scheduler.dry through to Specfem runs
    dry = pyre.bool("dry", default=False)


    def main(self, *args, **kwds):
        """Wire up the job managers and portal connection; run the scheduler."""
        self._info.activate()
        self._info.log("~~~~~~~~~~ daemon started ~~~~~~~~~~")
        clock = Clock(self.sleepInterval / second)
        # 'gjm' runs Specfem (remotely or via Globus); 'fjm' runs Mineos
        # as a local child process.
        if self.remote:
            gjm = RemoteShellJobManager(clock, self.outputRootPathname, self._info, self)
        else:
            gjm = GlobusJobManager(clock, self.outputRootPathname, self._info)
        fjm = ForkJobManager(clock, self.outputRootPathname, self._info)
        connection = PortalConnection(self.portal, self.outputRootUrl, clock, self._info)
        connection.runSimulations(gjm, fjm, self)
        try:
            stackless.run()
        except KeyboardInterrupt:
            self._info.log("~~~~~~~~~~ daemon stopped ~~~~~~~~~~")
        return
+
+
def randomName():
    """Return a random 16-character name drawn from [0-9a-z] without repeats.

    Stolen from pyre.services.Pickler; perhaps this should be an
    official/"exported" Pyre function?
    """
    import random

    alphabet = list("0123456789abcdefghijklmnopqrstuvwxyz")
    random.shuffle(alphabet)

    return "".join(alphabet[:16])
+
+
def main(*args, **kwds):
    """Instantiate and run the portal daemon."""
    Daemon().run(*args, **kwds)


if __name__ == "__main__":
    main()
Copied: cs/portal/trunk/northridge/backend/mineos.py (from rev 12359, cs/portal/trunk/mineos.py)
===================================================================
--- cs/portal/trunk/northridge/backend/mineos.py (rev 0)
+++ cs/portal/trunk/northridge/backend/mineos.py 2008-07-02 00:46:55 UTC (rev 12366)
@@ -0,0 +1,404 @@
+#!/usr/bin/env python
+
+
+from pyre.applications import Script
+from pyre.units.SI import milli, hertz
+from pyre.units.length import km
+import os, sys, stat, popen2, time
+from os.path import basename, dirname, splitext
+
+mHz = milli*hertz
+
+# Assume this script is installed alongside the mineos binaries.
+prefix = dirname(dirname(__file__))
+
+
+mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
+
+
class Mode(object):
    """One Mineos oscillation-mode family."""

    def __init__(self, jcom, id, desc):
        self.jcom = jcom  # Mineos 'jcom' integer code
        self.id = id      # short identifier, used to build file names
        self.desc = desc  # human-readable description, used in log messages
+
+
# The four mode families Mineos knows about, keyed by their 'jcom' codes.
modes = [Mode(jcom, id, desc) for jcom, id, desc in [
    (1, 'radial', 'radial'),
    (2, 'toroidal', 'toroidal'),
    (3, 'spheroidal', 'spheroidal'),
    (4, 'ictoroidal', 'inner core toroidal'),
    ]]
# NOTE(review): relies on Python 2 leaking list-comprehension variables
# into module scope; under Python 3 this 'del' would raise NameError.
del jcom, id, desc
+
+
class Mineos(object):
    """Drives the Mineos programs (minos_bran, eigcon, green, creat_origin,
    syndat, cucss2sac) by writing their stdin scripts and exec'ing them."""

    def __init__(self, model, event, stations, progressUrl):
        self.bin = prefix + "/bin"      # directory holding the Mineos binaries
        self.model = model              # Earth-model filename
        self.event = event              # event (CMT) filename
        self.stations = stations        # basename of the .site/.sitechan files
        self.progressUrl = progressUrl  # portal URL for progress POSTs
        self.eigcon_out = []            # eigcon output databases, one per mode
        self.green_out = "green"        # basename of green's output database
        return


    def minos_bran(self,
                   mode,
                   eps, wgrav,
                   lmin, lmax,
                   wmin, wmax,
                   nmin, nmax):
        """Run 'minos_bran' for one mode family.

        Writes the program's stdin script, then runs it; produces the
        listing and eigenfunction files for the mode.
        """
        if mode.id == 'radial':
            # These are ignored.
            lmin = 0
            lmax = 0
            # Misha Barmin says that there are no dispersion
            # branches for radial modes; thus nmin, nmax have a
            # different sense in this context.  He says to fix them as
            # follows.
            nmin = 0
            nmax = 8000
        minos_bran_in = mode.id + "_mineos_bran.in"
        s = open(minos_bran_in, "w")
        print >>s, self.model
        print >>s, self.listing(mode)
        print >>s, self.eigenfunctions(mode)
        print >>s, eps, wgrav
        print >>s, mode.jcom
        print >>s, lmin, lmax, wmin, wmax, nmin, nmax
        s.close()
        self.run("minos_bran", stdin=minos_bran_in)
        return


    def eigcon(self, mode, max_depth):
        """Run 'eigcon' for one mode family; record its output database name."""
        eigcon_in = mode.id + "_eigcon.in"
        s = open(eigcon_in, "w")
        print >>s, mode.jcom
        print >>s, self.model
        print >>s, max_depth
        print >>s, self.listing(mode)
        print >>s, self.eigenfunctions(mode)
        dbname = mode.id #+ "_eigcon_out"
        print >>s, dbname
        s.close()
        self.run("eigcon", stdin=eigcon_in)
        self.eigcon_out.append(dbname)
        return


    def green(self, fmin, fmax, nsamples):
        """Run 'green', posting progress to the portal as stations complete."""
        nStations = self.lineCount(self.stations + ".site")
        # NOTE(review): 'db_list' is only flushed/closed when 's' is rebound
        # by the loop below (CPython refcounting) -- confirm this is safe.
        s = open("db_list", "w")
        for dbname in self.eigcon_out:
            print >>s, dbname
        # Feed the stdin script both to the child and to 'green.in', which
        # serves as a record of what was fed to the program.
        green_in = open("green.in", "w")
        argv = [os.path.join(self.bin, "green")]
        child = popen2.Popen4(argv)
        for s in [green_in, child.tochild]:
            print >>s, self.stations
            print >>s, "db_list"
            print >>s, self.event
            print >>s, fmin, fmax
            print >>s, nsamples
            print >>s, self.green_out
            s.close()
        # Scan green's output, counting "Station:" lines to estimate
        # progress; post to the portal at most once per minute.
        stationTally = 0
        self.postProgress(0.0)
        lastPostTime = time.time()
        for line in child.fromchild:
            print line,
            if line.startswith(" green: Station: "):
                stationTally += 1
                now = time.time()
                if now - lastPostTime >= 60.0:
                    self.postProgress(float(stationTally) / float(nStations))
                    lastPostTime = now
        status = child.wait()
        self.checkStatus(status, argv)
        return


    def creat_origin(self):
        """Run 'creat_origin' to build the CSS origin table for the event."""
        self.run("creat_origin", [self.event, self.green_out])


    def syndat(self, plane, datatype):
        """Run 'syndat' to compute synthetic seismograms from green's output."""
        syndat_in = "syndat.in"
        s = open(syndat_in, "w")
        print >>s, self.event
        print >>s, plane # error in documentation!
        print >>s, self.green_out
        print >>s, "seismograms" #"syndat_out"
        print >>s, datatype
        s.close()
        self.run("syndat", stdin=syndat_in)
        return


    def cucss2sac(self, *args):
        """Run 'cucss2sac' with the given arguments."""
        self.run("cucss2sac", list(args))
        return


    def listing(self, mode):
        # Name of minos_bran's listing file for this mode.
        return mode.id + "_listing"


    def eigenfunctions(self, mode):
        # Name of minos_bran's eigenfunction file for this mode.
        return mode.id + "_eigenfunctions"


    def run(self, program, args=None, stdin=None, stdout=None, stderr=None):
        """Fork and exec a Mineos binary, optionally redirecting std streams.

        Exits the whole script (via checkStatus) if the program fails.
        """
        sys.stdout.flush()
        sys.stderr.flush()
        if args is None:
            args = []
        argv = [program] + args
        pid = os.fork()
        if pid:
            # parent
            # NOTE(review): os.wait() reaps *any* child, not necessarily
            # 'pid'; fine while only one child runs at a time -- confirm.
            _, status = os.wait()
            self.checkStatus(status, argv)
        else:
            # child: wire up the requested redirections, then exec.
            if stdin is not None:
                fd = os.open(stdin, os.O_RDONLY)
                os.dup2(fd, 0)
            if stdout is not None:
                fd = os.open(stdout, os.O_WRONLY, mode)
                os.dup2(fd, 1)
            if stderr is not None:
                fd = os.open(stderr, os.O_WRONLY, mode)
                os.dup2(fd, 2)
            executable = os.path.join(self.bin, argv[0])
            try:
                os.execvp(executable, argv)
            except Exception, e:
                sys.exit("%s: %s" % (executable, e))

        return


    def checkStatus(self, status, argv):
        """Decode a wait() status; exit the script unless it is a clean exit."""
        exitStatus = None
        if (os.WIFSIGNALED(status)):
            statusStr = "signal %d" % os.WTERMSIG(status)
        elif (os.WIFEXITED(status)):
            exitStatus = os.WEXITSTATUS(status)
            statusStr = "exit %d" % exitStatus
        else:
            statusStr = "status %d" % status
        statusMsg = "%s: %s" % (argv[0], statusStr)
        if exitStatus != 0:
            sys.exit(statusMsg)
        return


    def lineCount(self, filename):
        """Return the number of lines in 'filename'."""
        tally = 0
        s = open(filename, 'r')
        for line in s:
            tally += 1
        return tally


    def postProgress(self, progress):
        """POST a fractional progress value (0..1) to the portal."""
        import urllib, urlparse
        scheme, host, path = urlparse.urlparse(self.progressUrl)[0:3]
        fields = { 'progress': "%.2f" % progress }
        body = urllib.urlencode(fields)
        headers = {"Content-Type": "application/x-www-form-urlencoded",
                   "Accept": "text/plain"}
        import httplib
        if scheme == "http":
            conn = httplib.HTTPConnection(host)
        elif scheme == "https":
            conn = httplib.HTTPSConnection(host)
        else:
            assert False # not scheme in ["http", "https"]
        conn.request("POST", path, body, headers)
        response = conn.getresponse()
        data = response.read()
        conn.close()
        return data
+
+
+
class MineosScript(Script):
    """Pyre application driving a complete Mineos synthetic-seismogram run."""

    name = "mineos"

    import pyre.inventory as pyre

    #------------------------------------------------------------------------
    # common

    model = pyre.str("model")

    # jcom -- which mode families to compute
    radial = pyre.bool("radial")
    toroidal = pyre.bool("toroidal")
    spheroidal = pyre.bool("spheroidal")
    ictoroidal = pyre.bool("inner-core-toroidal")

    event = pyre.str("event")


    #------------------------------------------------------------------------
    # minos_bran

    eps = pyre.float("eps")
    wgrav = pyre.dimensional("wgrav", default=1*hertz)

    lmin = pyre.int("lmin")
    lmax = pyre.int("lmax")

    wmin = pyre.dimensional("wmin", default=1*hertz)
    wmax = pyre.dimensional("wmax", default=1*hertz)

    nmin = pyre.int("nmin")
    nmax = pyre.int("nmax")


    #------------------------------------------------------------------------
    # eigcon

    max_depth = pyre.dimensional("max-depth", default=1*km)


    #------------------------------------------------------------------------
    # green

    fmin = pyre.dimensional("fmin", default=1*hertz)
    fmax = pyre.dimensional("fmax", default=1*hertz)

    nsamples = pyre.int("nsamples")

    stations = pyre.str("stations")

    #------------------------------------------------------------------------
    # syndat

    plane = pyre.int("plane", default=0) # use CMT tensor components as-is
    datatype = pyre.int("datatype", default=0) # accelerograms in nm/s^2

    #------------------------------------------------------------------------
    # misc.

    progressUrl = pyre.str("progress-url")


    def main(self, *args, **kwds):
        """Run the suite in an archive subdirectory, then tar up the results."""

        self._info.activate()

        # Collect the mode families enabled in the configuration.
        enabledModes = []
        for mode in modes:
            if getattr(self, mode.id):
                enabledModes.append(mode)

        self._info.log("modes: " + ' '.join(["'%s'" % mode.desc for mode in enabledModes]))
        if not enabledModes:
            self._info.log("nothing to do")
            return

        jobdir = os.getcwd()

        # Work inside a subdirectory so the whole run can be archived.
        archiveDirName = "mineos"
        os.mkdir(archiveDirName)
        inputFiles = [self.model, self.event] + [self.stations + suffix for suffix in [".site", ".sitechan"]]
        for inputFile in inputFiles:
            os.rename(inputFile, os.path.join(archiveDirName, inputFile))
        os.chdir(archiveDirName)

        self._info.log("running Mineos program suite for model '%s'" % self.model)
        self.runMineos(self.model, enabledModes)

        # archive the output
        os.chdir(jobdir)
        self.spawn("/bin/tar", "cvzf", archiveDirName + ".tar.gz", archiveDirName)

        # clean up
        self.spawn("/bin/rm", "-rf", archiveDirName)

        return


    def runMineos(self, model, enabledModes):
        """Run the Mineos programs in order: minos_bran, eigcon, green,
        creat_origin, syndat, and cucss2sac."""
        mineos = Mineos(model, self.event, self.stations, self.progressUrl)

        # minos_bran
        for mode in enabledModes:
            self._info.log("running program 'minos_bran' for '%s' mode" % mode.desc)
            # convert to millihertz
            wgrav = self.wgrav / mHz
            wmin = self.wmin / mHz
            wmax = self.wmax / mHz
            mineos.minos_bran(mode, self.eps, wgrav,
                              self.lmin, self.lmax,
                              wmin, wmax,
                              self.nmin, self.nmax)

        # eigcon
        for mode in enabledModes:
            self._info.log("running program 'eigcon' for '%s' mode" % mode.desc)
            # convert to kilometers
            max_depth = self.max_depth / km
            mineos.eigcon(mode, max_depth)

        # green
        self._info.log("running program 'green'")
        fmin = self.fmin / mHz
        fmax = self.fmax / mHz
        mineos.green(fmin, fmax, self.nsamples)

        # creat_origin
        self._info.log("running program 'creat_origin'")
        mineos.creat_origin()

        # syndat
        self._info.log("running program 'syndat'")
        mineos.syndat(self.plane, self.datatype)

        # cucss2sac -- link the inputs cucss2sac expects, then convert the
        # seismograms to both SAC and ASCII forms.
        os.symlink(mineos.green_out + ".origin", "seismograms.origin")
        os.symlink(mineos.stations + ".site", "seismograms.site")
        mineos.cucss2sac("seismograms", "output") # SAC
        mineos.cucss2sac("-a", "seismograms", "output") # ASCII

        return


    def spawn(self, *argv):
        """Run a command to completion; exit the script on failure."""
        command = ' '.join(argv)
        self._info.log("spawning: %s" % command)
        status = os.spawnvp(os.P_WAIT, argv[0], argv)
        statusMsg = "%s: exit %d" % (argv[0], status)
        self._info.log(statusMsg)
        if status != 0:
            sys.exit("%s: %s" % (argv[0], statusMsg))
        return
+
+
def main(*args, **kwds):
    """Entry point: redirect output to a log file, then run the Mineos script."""
    if True:
        # Redirect all output -- our journal output, and the output of
        # our children -- to a file.
        # NOTE(review): 'mode' is read-only (0444) and the open lacks
        # O_TRUNC/O_APPEND, so a re-run in the same directory may fail to
        # reopen the existing read-only file -- confirm intentional.
        fd = os.open("output_mineos.txt", os.O_CREAT | os.O_WRONLY, mode)
        os.dup2(fd, 1)
        os.dup2(fd, 2)
        os.close(fd)

    mineos = MineosScript()
    mineos.run(*args, **kwds)


if __name__ == "__main__":
    main()
+
+
+# end of file
Copied: cs/portal/trunk/northridge/backend/specfem3D.py (from rev 12365, seismo/3D/SPECFEM3D_GLOBE/trunk/pyspecfem3D)
===================================================================
--- cs/portal/trunk/northridge/backend/specfem3D.py (rev 0)
+++ cs/portal/trunk/northridge/backend/specfem3D.py 2008-07-02 00:46:55 UTC (rev 12366)
@@ -0,0 +1,310 @@
+#!/usr/bin/env python
+
+
+from pyre.applications import Script
+from os.path import dirname, exists, isdir, join
+import os, sys, stat, shutil, tarfile
+from time import sleep
+
+
+# Directory containing this script.  build() and prepareFiles() expect the
+# SPECFEM3D_GLOBE distribution ('configure' script, 'DATA/' directory) to
+# live alongside it.
+SPECFEM3D_GLOBE = dirname(__file__)
+
+
+class Specfem3DGlobe(Script):
+    """Pyre script driving a SPECFEM3D_GLOBE run on a cluster.
+
+    The same script is re-executed in four phases, selected by the
+    '--context' property (see main()):
+
+      login        -- build the code, schedule the batch job, monitor it
+      launcher     -- on the batch launcher node: mpirun the pre-compute
+                      phase, the mesher, the solver, and the post-compute
+                      phase, in that order
+      pre-compute  -- on each compute node: create the scratch directory
+      post-compute -- on each compute node: clean up the scratch directory
+    """
+
+
+    name = "Specfem3DGlobe"
+
+
+    import pyre.inventory
+    import pyre.schedulers
+    import pyre.launchers
+
+    # Input files supplied by the portal.
+    parFile = pyre.inventory.str("par-file")
+    cmtSolution = pyre.inventory.str("cmt-solution")
+    stations = pyre.inventory.str("stations")
+    configureArgs = pyre.inventory.str("configure-args", default="FC=mpif90")
+
+    # Per-node scratch area; used as LOCAL_PATH in the generated Par_file.
+    scratchDir = pyre.inventory.str("scratch-dir", default="/scratch")
+
+    # Scheduling/launch configuration.  'nodes' is recomputed from the
+    # Par_file in readAndCopyParFile().
+    nodes = pyre.inventory.int("nodes", default=1)
+    scheduler = pyre.schedulers.scheduler("scheduler", default="none")
+    job = pyre.schedulers.job("job")
+    launchCommand = pyre.inventory.str("launch-command", default="mpirun -np %(nodes)s")
+    context = pyre.inventory.str(
+        name="context", default="login", validator=pyre.inventory.choice(["login", "launcher", "pre-compute", "post-compute"]))
+    # URL POSTed with solver progress by postProgress().
+    progressUrl = pyre.inventory.str("progress-url")
+
+
+
+    def main(self, *args, **kwds):
+        """Dispatch to the handler for the configured execution context."""
+        context = self.context
+        if context == "login":
+            self.onLoginNode(*args, **kwds)
+        elif context == "launcher":
+            self.onLauncherNode(*args, **kwds)
+        elif context == "pre-compute":
+            self.onPreCompute(*args, **kwds)
+        elif context == "post-compute":
+            self.onPostCompute(*args, **kwds)
+        return
+
+
+    def onLoginNode(self, *args, **kwds):
+        """Login-node phase: build, schedule the batch job, monitor, archive."""
+        # Redirect all output -- our journal output, and the output of
+        # our children -- to a file.
+        fd = os.open("output_build.txt", os.O_CREAT | os.O_WRONLY, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
+        os.dup2(fd, 1)
+        os.dup2(fd, 2)
+        os.close(fd)
+
+        self.prepareFiles()
+        self.build()
+        # Fork: the parent polls for solver progress while the child
+        # submits the job to the batch scheduler.
+        pid = os.fork()
+        if pid:
+            self.monitorProgress(pid)
+        else:
+            self.schedule(*args, **kwds)
+        # NOTE(review): both the parent and the child fall through to
+        # processOutputFiles() here, so the archive is written twice --
+        # confirm whether the child was meant to exit after schedule().
+        self.processOutputFiles()
+        return
+
+
+    def onLauncherNode(self, *args, **kwds):
+        """Launcher-node phase: mpirun pre-compute, mesher, solver, post-compute."""
+
+        # E.g. "mpirun -np 64", split into an argv prefix.
+        launchCommand = self.launchCommand % {'nodes': self.nodes}
+        launchCommand = launchCommand.split()
+
+        wd = os.getcwd()
+        myArgv = self.getArgv(*args, **kwds)
+
+        # launch ourselves
+        # (re-exec this script on every node to create scratch directories)
+        argv = launchCommand + [sys.executable, __file__] + myArgv + ["--context=pre-compute", "--nodes=%d" % self.nodes]
+        status = os.spawnvp(os.P_WAIT, argv[0], argv)
+        if status != 0:
+            sys.exit("%s: exit %d" % (argv[0], status))
+
+        # launch the mesher
+        argv = launchCommand + [join(wd, "xmeshfem3D")]
+        status = os.spawnvp(os.P_WAIT, argv[0], argv)
+        if status != 0:
+            sys.exit("%s: exit %d" % (argv[0], status))
+
+        # launch the solver
+        argv = launchCommand + [join(wd, "xspecfem3D")]
+        status = os.spawnvp(os.P_WAIT, argv[0], argv)
+        if status != 0:
+            sys.exit("%s: exit %d" % (argv[0], status))
+
+        # launch ourselves
+        # (re-exec this script on every node to clean up scratch directories)
+        argv = launchCommand + [sys.executable, __file__] + myArgv + ["--context=post-compute", "--nodes=%d" % self.nodes]
+        status = os.spawnvp(os.P_WAIT, argv[0], argv)
+        if status != 0:
+            sys.exit("%s: exit %d" % (argv[0], status))
+
+        return
+
+
+    def onPreCompute(self, *args, **kwds):
+        """Compute-node pre-phase: create the scratch directory (race-safe)."""
+        makedirs(self.scratchDir)
+
+    def onPostCompute(self, *args, **kwds):
+        """Compute-node post-phase: best-effort removal of the scratch directory.
+
+        All OSErrors are deliberately ignored -- cleanup must never fail
+        the run, and several nodes may race over a shared directory.
+        """
+        try:
+            files = os.listdir(self.scratchDir)
+            for file in files:
+                try:
+                    os.remove(join(self.scratchDir, file))
+                except OSError:
+                    pass
+        except OSError:
+            pass
+
+        try:
+            os.rmdir(self.scratchDir)
+        except OSError:
+            pass
+
+        return
+
+
+    def prepareFiles(self):
+        """Populate DATA/ and OUTPUT_FILES/ with inputs for the run."""
+
+        self.mkdir("DATA")
+        self.mkdir("OUTPUT_FILES")
+
+        self.readAndCopyParFile()
+
+        shutil.copyfile(self.cmtSolution, "DATA/CMTSOLUTION")
+        shutil.copyfile(self.stations, "DATA/STATIONS")
+
+        # Symlink everything else from the distribution's DATA directory,
+        # without clobbering the files copied above.
+        dataSrc = join(SPECFEM3D_GLOBE, "DATA")
+        for name in os.listdir(dataSrc):
+            src = join(dataSrc, name)
+            dest = join("DATA", name)
+            if exists(dest):
+                continue
+            os.symlink(src, dest)
+
+        return
+
+
+    def readAndCopyParFile(self):
+        """Copy the user's Par_file to DATA/, overriding LOCAL_PATH.
+
+        Also computes self.nodes = NCHUNKS * NPROC_XI * NPROC_ETA from the
+        values found in the Par_file.
+        """
+        # NOTE(review): 's' and 'd' are never closed, and NCHUNKS/NPROC_XI/
+        # NPROC_ETA are unbound (NameError at the final line) if the
+        # Par_file omits any of them -- consider validating.
+        s = open(self.parFile, "r")
+        d = open("DATA/Par_file", "w")
+        for line in s:
+            # Par_file lines look like "NAME = VALUE"; tokens[2] is VALUE.
+            tokens = line.split()
+            if len(tokens) >= 3:
+                var = tokens[0]
+                if var == "NCHUNKS":
+                    NCHUNKS = int(tokens[2])
+                elif var == "NPROC_XI":
+                    NPROC_XI = int(tokens[2])
+                elif var == "NPROC_ETA":
+                    NPROC_ETA = int(tokens[2])
+                elif var == "LOCAL_PATH":
+                    # Replace the user's LOCAL_PATH with our scratch dir.
+                    print >>d, "LOCAL_PATH =", self.scratchDir
+                    continue
+            print >>d, line,
+        self.nodes = NCHUNKS * NPROC_XI * NPROC_ETA
+        return
+
+
+    def build(self):
+        """Configure and make SPECFEM3D_GLOBE in the current directory."""
+
+        # configure
+        configure = join(SPECFEM3D_GLOBE, "configure")
+        configureArgs = self.configureArgs.split()
+        argv = [configure] + configureArgs
+        print ' '.join(argv)
+        status = os.spawnvp(os.P_WAIT, argv[0], argv)
+        if status != 0:
+            sys.exit("%s: exit %d" % (argv[0], status))
+
+        # make
+        self.mkdir("obj")
+        argv = ['make']
+        print ' '.join(argv)
+        status = os.spawnvp(os.P_WAIT, argv[0], argv)
+        if status != 0:
+            sys.exit("%s: exit %d" % (argv[0], status))
+
+        return
+
+
+    def schedule(self, *args, **kwds):
+        """Submit a batch job that re-runs this script with --context=launcher."""
+        argv = self.getArgv(*args, **kwds)
+
+        # initialize the job
+        job = self.job
+        job.nodes = self.nodes
+        job.executable = sys.executable
+        job.arguments = [__file__] + argv + ["--context=launcher", "--nodes=%d" % self.nodes]
+
+        # schedule
+        self.scheduler.schedule(job)
+
+        return
+
+
+    def mkdir(self, name):
+        """Create directory 'name' if it does not already exist."""
+        if not isdir(name):
+            os.mkdir(name)
+        return
+
+
+    def processOutputFiles(self):
+        """Archive OUTPUT_FILES/ into specfem3dglobe.tar.gz and surface logs."""
+
+        archiveOut = open("specfem3dglobe.tar.gz", 'w')
+        tgzOut = tarfile.open(archiveOut.name, 'w:gz', archiveOut)
+
+        for name in os.listdir("OUTPUT_FILES"):
+            pathname = join("OUTPUT_FILES", name)
+            arcname = pathname
+            tgzOut.add(pathname, arcname)
+
+        tgzOut.close()
+        archiveOut.close()
+
+        # Copy the main logs up to the working directory, where the portal
+        # daemon can find them.
+        for filename in ["output_mesher.txt", "output_solver.txt"]:
+            pathname = join("OUTPUT_FILES", filename)
+            if exists(pathname):
+                shutil.copyfile(pathname, filename)
+
+        return
+
+
+    def monitorProgress(self, pid):
+        """Poll solver progress while child 'pid' is alive; POST on change.
+
+        Uses non-blocking waitpid() so the loop exits as soon as the child
+        terminates; polls roughly once a minute.
+        """
+        progress = None
+        wpid, status = os.waitpid(pid, os.WNOHANG)
+        while wpid == 0:
+            newProgress = self.checkProgress()
+            if newProgress != progress:
+                # Only POST when the value actually changed.
+                progress = newProgress
+                self.postProgress(progress)
+            sleep(60)
+            wpid, status = os.waitpid(pid, os.WNOHANG)
+        return
+
+
+    def checkProgress(self):
+        """Return solver progress in [0, 1], or None if no log file yet.
+
+        Scans output_solver.txt for lines of the form
+        " We have done <percent> ..." and returns the last percentage seen,
+        divided by 100.
+        """
+        # NOTE(review): the file handle 's' is never closed.
+        try:
+            s = open("OUTPUT_FILES/output_solver.txt", 'r')
+        except IOError:
+            return None
+        progress = 0.0
+        for line in s:
+            if line.startswith(" We have done"):
+                # tokens: ['We', 'have', 'done', '<percent>', ...]
+                progress = float(line.split()[3]) / 100.0
+        return progress
+
+
+    def postProgress(self, progress):
+        """POST the progress value (form-encoded) to self.progressUrl."""
+        import urllib, urlparse
+        # urlparse() returns (scheme, netloc, path, ...); take the first three.
+        scheme, host, path = urlparse.urlparse(self.progressUrl)[0:3]
+        fields = { 'progress': "%.2f" % progress }
+        body = urllib.urlencode(fields)
+        headers = {"Content-Type": "application/x-www-form-urlencoded",
+                   "Accept": "text/plain"}
+        import httplib
+        if scheme == "http":
+            conn = httplib.HTTPConnection(host)
+        elif scheme == "https":
+            conn = httplib.HTTPSConnection(host)
+        else:
+            assert False # not scheme in ["http", "https"]
+        conn.request("POST", path, body, headers)
+        response = conn.getresponse()
+        data = response.read()
+        conn.close()
+        return data
+
+
+
+def makedirs(name, mode=0777):
+    """Like os.makedirs(), but multi-{thread,process} safe.
+
+    Unlike os.makedirs(), a concurrent process creating the same
+    directory is tolerated: an EEXIST error from mkdir() is ignored.
+    """
+    import os.path as path
+    from os import curdir, mkdir
+    head, tail = path.split(name)
+    if not tail:
+        # Trailing slash: split again to get the real leaf component.
+        head, tail = path.split(head)
+    if head and tail and not path.exists(head):
+        # Create missing parents first, recursively.
+        makedirs(head, mode)
+    if tail == curdir: # xxx/newdir/. exists if xxx/newdir exists
+        return
+    try:
+        mkdir(name, mode)
+    except OSError, error:
+        # 17 == "File exists" (errno.EEXIST would be clearer here)
+        if hasattr(error, 'errno') and error.errno == 17:
+            pass
+        else:
+            raise
+    return
+
+
+
+if __name__ == "__main__":
+    # Script entry point: Script.run() parses the command line / .cfg
+    # properties and invokes main() above.
+    script = Specfem3DGlobe()
+    script.run()
+
+
+# end of file
Copied: cs/portal/trunk/northridge/backend/web-portal-daemon.cfg (from rev 12359, cs/portal/trunk/web-portal-daemon.cfg)
===================================================================
--- cs/portal/trunk/northridge/backend/web-portal-daemon.cfg (rev 0)
+++ cs/portal/trunk/northridge/backend/web-portal-daemon.cfg 2008-07-02 00:46:55 UTC (rev 12366)
@@ -0,0 +1,21 @@
+
+[web-portal-daemon]
+sleep-interval = 5*second
+output-root-pathname = /home/portal/public_html/output/
+output-root-url = http://crust.geodynamics.org/~portal/output/
+mineos-pathname = /home/portal/opt/mineos/bin/mineos.py
+
+remote = True
+specfem-pathname = /projects/tg/CIG/SPECFEM3D_GLOBE-4.0.1/specfem3D.py
+remote-shell-command = gsissh tg-login.tacc.teragrid.org
+remote-output-root = /work/teragrid/tgXXXXXX/output
+remote-download-command = gsiscp %(source)s tg-login.tacc.teragrid.org:%(dest)s
+remote-upload-command = gsiscp tg-login.tacc.teragrid.org:%(source)s %(dest)s
+
+journal.device = file
+
+[web-portal-daemon.portal]
+host = crust.geodynamics.org
+scheme = https
+
+
Deleted: cs/portal/trunk/web-portal-daemon.cfg
===================================================================
--- cs/portal/trunk/web-portal-daemon.cfg 2008-07-01 23:52:59 UTC (rev 12365)
+++ cs/portal/trunk/web-portal-daemon.cfg 2008-07-02 00:46:55 UTC (rev 12366)
@@ -1,12 +0,0 @@
-
-[web-portal-daemon]
-sleep-interval = 5*second
-download-input-files = True
-output-root-pathname = /home/leif/public_html/portal/
-output-root-url = http://crust.geodynamics.org/~leif/portal/
-
-[web-portal-daemon.portal]
-#host = crust.geodynamics.org
-host = localhost:8000
-
-
More information about the cig-commits
mailing list