[cig-commits] r8320 - cs/portal/trunk
leif at geodynamics.org
leif at geodynamics.org
Wed Nov 21 19:08:27 PST 2007
Author: leif
Date: 2007-11-21 19:08:26 -0800 (Wed, 21 Nov 2007)
New Revision: 8320
Added:
cs/portal/trunk/mineos.py
Modified:
cs/portal/trunk/daemon.py
Log:
Extended the daemon so that it can potentially run Mineos concurrently
with SPECFEM (currently 'mineos.py' is just a stub).
Modified: cs/portal/trunk/daemon.py
===================================================================
--- cs/portal/trunk/daemon.py 2007-11-22 00:16:11 UTC (rev 8319)
+++ cs/portal/trunk/daemon.py 2007-11-22 03:08:26 UTC (rev 8320)
@@ -1,6 +1,6 @@
import stackless
-import os, sys
+import os, sys, signal
from popen2 import Popen4
from pyre.applications import Script
@@ -48,50 +48,19 @@
return
-class GlobusJobManager(object):
+class JobManager(object):
- def __init__(self, clock, root, info, downloadInputFiles=False):
+ def __init__(self, clock, root, info):
self.clock = clock
self.root = root
self.info = info
- self.downloadInputFiles = downloadInputFiles
-
+
def runJob(self, job):
self.createSubdirectoryForJob(job)
stackless.tasklet(self.jobRunner)(job)
- def jobRunner(self, job):
-
- gassServer = GassServer.startUp(job.directory)
-
- try:
- resSpec = self.resSpec(job, gassServer)
- id = self.globusrun(resSpec)
-
- oldStatus = job.status
- ticks = 2
- while job.isAlive():
- for t in xrange(ticks):
- self.clock.tick.wait()
- ticks *= 2
- status = self.getJobStatus(id)
- if status != oldStatus:
- job.setStatus(status)
- oldStatus = status
- ticks = 2
-
- except Exception, e:
- self.info.log("error: %s: %s" % (e.__class__.__name__, e))
- job.setStatus(job.STATUS_FAILED)
-
- finally:
- gassServer.shutDown()
-
- return
-
-
def spawn(self, *argv):
command = ' '.join(argv)
self.info.log("spawning: %s" % command)
@@ -134,6 +103,118 @@
return output
+ def download(self, url, pathname):
+ import shutil
+ import urllib2
+ self.info.log("downloading %s" % url)
+ infile = urllib2.urlopen(url)
+ outfile = open(pathname, 'wb')
+ shutil.copyfileobj(infile, outfile)
+ outfile.close()
+ infile.close()
+ return
+
+ def downloadInputFilesForJob(self, job):
+ for inputFile in job.inputFiles:
+ url = job.urlForInputFile(inputFile)
+ filename = inputFile.split('/')[-1] # strips 'mineos/'
+ pathname = os.path.join(job.directory, filename)
+ self.download(url, pathname)
+ return
+
+ def createSubdirectoryForJob(self, job):
+ while True:
+ dirName = randomName()
+ dirPath = os.path.join(self.root, dirName)
+ if not os.path.exists(dirPath):
+ os.mkdir(dirPath)
+ break
+ job.subdir = dirName
+ job.directory = dirPath
+ return
+
+
+class ForkJobManager(JobManager):
+
+ def jobRunner(self, job):
+
+ self.downloadInputFilesForJob(job)
+
+ try:
+ argv = [job.resSpec['executable']] + job.resSpec['arguments']
+ command = ' '.join(argv)
+ self.info.log("spawning: %s" % command)
+ savedWd = os.getcwd()
+ os.chdir(job.directory)
+ pid = os.spawnvp(os.P_NOWAIT, argv[0], argv)
+ os.chdir(savedWd)
+ self.info.log("spawned process %d" % pid)
+
+ ticks = 2
+ wpid, status = os.waitpid(pid, os.WNOHANG)
+ while wpid == 0:
+ for t in xrange(ticks):
+ self.clock.tick.wait()
+ ticks *= 2
+ wpid, status = os.waitpid(pid, os.WNOHANG)
+
+ if (os.WIFSIGNALED(status)):
+ statusStr = "signal %d" % os.WTERMSIG(status)
+ job.setStatus(job.STATUS_FAILED)
+ elif (os.WIFEXITED(status)):
+ exitStatus = os.WEXITSTATUS(status)
+ statusStr = "exit %d" % exitStatus
+ if exitStatus == 0:
+ job.setStatus(job.STATUS_DONE)
+ else:
+ job.setStatus(job.STATUS_FAILED)
+ else:
+ statusStr = "status %d" % status
+ job.setStatus(job.STATUS_FAILED)
+ statusMsg = "%s: %s" % (argv[0], statusStr)
+ self.info.log(statusMsg)
+
+ except Exception, e:
+ self.info.log("error: %s: %s" % (e.__class__.__name__, e))
+ job.setStatus(job.STATUS_FAILED)
+
+ return
+
+
+
+class GlobusJobManager(JobManager):
+
+ def jobRunner(self, job):
+
+ self.downloadInputFilesForJob(job)
+ gassServer = GassServer.startUp(job.directory)
+
+ try:
+ resSpec = self.resSpec(job, gassServer)
+ id = self.globusrun(resSpec)
+
+ oldStatus = job.status
+ ticks = 2
+ while job.isAlive():
+ for t in xrange(ticks):
+ self.clock.tick.wait()
+ ticks *= 2
+ status = self.getJobStatus(id)
+ if status != oldStatus:
+ job.setStatus(status)
+ oldStatus = status
+ ticks = 2
+
+ except Exception, e:
+ self.info.log("error: %s: %s" % (e.__class__.__name__, e))
+ job.setStatus(job.STATUS_FAILED)
+
+ finally:
+ gassServer.shutDown()
+
+ return
+
+
def globusrun(self, resSpec):
import tempfile
fd, rslName = tempfile.mkstemp(suffix=".rsl")
@@ -194,11 +275,8 @@
resSpec["directory"] = RSLUnquoted("$(SCRATCH_DIRECTORY)")
file_stage_in = []
- for url, inputFile in job.inputFileURLs:
- if self.downloadInputFiles:
- pathname = os.path.join(job.directory, inputFile)
- self.download(url, pathname)
- url = gassServer.url + "/./" + inputFile
+ for inputFile in job.inputFiles:
+ url = gassServer.url + "/./" + inputFile
file_stage_in.append([url, RSLUnquoted("$(SCRATCH_DIRECTORY) # " + '"/' + inputFile + '"')])
resSpec["file_stage_in"] = file_stage_in
@@ -217,30 +295,6 @@
return resSpec
- def download(self, url, pathname):
- import shutil
- import urllib2
- self.info.log("downloading %s" % url)
- infile = urllib2.urlopen(url)
- outfile = open(pathname, 'wb')
- shutil.copyfileobj(infile, outfile)
- outfile.close()
- infile.close()
- return
-
-
- def createSubdirectoryForJob(self, job):
- while True:
- dirName = randomName()
- dirPath = os.path.join(self.root, dirName)
- if not os.path.exists(dirPath):
- os.mkdir(dirPath)
- break
- job.subdir = dirName
- job.directory = dirPath
- return
-
-
class Event(object):
def __init__(self):
@@ -281,7 +335,8 @@
self.resSpec = resSpec
self.status = self.STATUS_NEW
self.statusChanged = Event()
- self.inputFileURLs = []
+ self.inputFiles = []
+ self.urlForInputFile = None
self.outputFiles = []
self.subdir = None
self.directory = None
@@ -310,42 +365,51 @@
STATUS_ERROR = "error"
deadCodes = [STATUS_DONE, STATUS_ERROR]
- def __init__(self, id, simulation, info):
+ def __init__(self, id, simulation, urlForInputFile, config, info):
self.id = id
self.simulation = simulation
+ self.urlForInputFile = urlForInputFile
+ self.mineosPathname = config.mineosPathname
+ self.dry = config.dry
self.info = info
- self.job = None
+ self.jobChannel = stackless.channel()
self.status = self.STATUS_NEW
self.statusChanged = Event()
- self.inputFileURLs = []
- self.outputFiles = ["seismograms.tar.gz", "output_mesher.txt", "output_solver.txt"]
+ def go(self, gjm, fjm):
+ stackless.tasklet(self)(gjm, fjm)
- def go(self, jm):
- stackless.tasklet(self)(jm)
-
- def __call__(self, jm):
+ def __call__(self, gjm, fjm):
try:
self.setStatus(self.STATUS_CONNECTING)
# run
- job = self.newJob()
- self.job = job
- jm.runJob(job)
+ mineos = self.newMineosJob()
+ specfem = self.newSpecfemJob()
+ fjm.runJob(mineos)
+ gjm.runJob(specfem)
- # also reported by job; but we need it here to tickle job watcher
- self.setStatus(self.STATUS_PREPARING)
-
- while job.isAlive():
- job.statusChanged.wait()
+ # send jobs to jobSink()
+ self.jobChannel.send(mineos)
+ self.jobChannel.send(specfem)
+ self.jobChannel.send(None) # no more jobs
+
+ while specfem.isAlive():
+ specfem.statusChanged.wait()
# while
- if job.status == job.STATUS_FAILED:
+ if specfem.status == specfem.STATUS_FAILED:
self.setStatus(self.STATUS_ERROR)
raise RuntimeError("run failed")
- self.job = None
+ # The Mineos job should have finished long ago.
+ while mineos.isAlive():
+ mineos.statusChanged.wait()
+ if mineos.status == mineos.STATUS_FAILED:
+ self.setStatus(self.STATUS_ERROR)
+ raise RuntimeError("Mineos run failed")
+
self.setStatus(self.STATUS_DONE)
except Exception, e:
@@ -361,7 +425,10 @@
def isAlive(self):
return not self.status in self.deadCodes
- def newJob(self):
+ def newSpecfemJob(self):
+ dry = []
+ if self.dry:
+ dry = ["--scheduler.dry"]
job = Job(
"run",
jobType = "single",
@@ -377,13 +444,26 @@
"--output-dir=.", # Globus scratch dir
"--job.stdout=stdout.txt",
"--job.stderr=stderr.txt",
- ],
+ ] + dry,
)
- job.inputFileURLs = self.inputFileURLs
- job.outputFiles = self.outputFiles
+ job.urlForInputFile = self.urlForInputFile
+ sim = self.simulation
+ job.inputFiles = [sim.parameters, sim.events, sim.stations]
+ job.outputFiles = ["seismograms.tar.gz", "output_mesher.txt", "output_solver.txt"]
return job
+ def newMineosJob(self):
+ job = Job(
+ "run",
+ executable = self.mineosPathname,
+ arguments = [],
+ )
+ job.urlForInputFile = self.urlForInputFile
+ job.inputFiles = ["mineos/event.txt", "mineos/site.txt", "mineos/sitechan.txt"]
+ job.outputFiles = ["output_mineos.txt", "mineos.tar.gz"]
+ return job
+
class Simulation(object):
def __init__(self, id, nodes):
self.id = id
@@ -391,7 +471,6 @@
self.parameters = 'parameters.pml'
self.events = 'events.txt'
self.stations = 'stations.txt'
- self.inputFiles = [self.parameters, self.events, self.stations]
class PortalConnection(object):
@@ -404,10 +483,10 @@
self.clock = clock
self.info = info
- def runSimulations(self, jm):
- stackless.tasklet(self.runFactory)(jm)
+ def runSimulations(self, gjm, fjm, config):
+ stackless.tasklet(self.runFactory)(gjm, fjm, config)
- def runFactory(self, jm):
+ def runFactory(self, gjm, fjm, config):
import urllib2
runs = {}
@@ -437,32 +516,39 @@
not runs.has_key(id)):
self.info.log("new run %d" % id)
simulation = Simulation(simId, nodes)
- newRun = Run(id, simulation, self.info)
- for inputFile in simulation.inputFiles:
- url = self.inputFileURL(simulation, inputFile)
- newRun.inputFileURLs.append((url, inputFile))
+
+ def urlForInputFile(inputFile):
+ # Map input filenames to URLs in the context
+ # of this run.
+ return self.inputFileURL(simulation, inputFile)
+
+ newRun = Run(id, simulation, urlForInputFile, config, self.info)
+
self.watchRun(newRun)
runs[id] = newRun
- newRun.go(jm)
+ newRun.go(gjm, fjm)
return
def watchRun(self, run):
stackless.tasklet(self.runWatcher)(run)
+ stackless.tasklet(self.jobSink)(run)
def runWatcher(self, run):
url = self.portal.runStatusUrl % run.id
- job = run.job
while run.isAlive():
run.statusChanged.wait()
fields = {'status': run.status}
self.postStatusChange(url, fields)
- if run.job != job:
- job = run.job
- if job is not None:
- self.watchJob(job, run)
return
+ def jobSink(self, run):
+ newJob = run.jobChannel.receive()
+ while newJob is not None:
+ self.watchJob(newJob, run)
+ newJob = run.jobChannel.receive()
+ return
+
def watchJob(self, job, run):
url = self.portal.jobCreateUrl
fields = self.jobFields(job, run)
@@ -491,7 +577,7 @@
# in case the daemon and the web server are run as
# different users
import stat
- os.chmod(pathname, stat.S_IRGRP | stat.S_IROTH)
+ os.chmod(pathname, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
fields = {
'job': portalId,
@@ -614,25 +700,22 @@
portal = pyre.facility("portal", factory=WebPortal)
sleepInterval = pyre.dimensional("sleep-interval", default=60*second)
- downloadInputFiles = pyre.bool("download-input-files", default=False)
- downloadInputFiles.meta['tip'] = (
-"""Download input files from the portal and serve them from the GASS
-server. This is required when debugging a portal which is running
-locally (at 'localhost'). The default is 'False' because Globus can
-stage-in input files directly from a publicly-accessible portal via
-HTTP.""")
-
outputRootPathname = pyre.str("output-root-pathname")
outputRootUrl = pyre.str("output-root-url")
+ mineosPathname = pyre.str("mineos-pathname", default="mineos.py")
+ dry = pyre.bool("dry", default=False)
+
+
def main(self, *args, **kwds):
self._info.activate()
self._info.log("~~~~~~~~~~ daemon started ~~~~~~~~~~")
clock = Clock(self.sleepInterval / second)
- jm = GlobusJobManager(clock, self.outputRootPathname, self._info, self.downloadInputFiles)
+ gjm = GlobusJobManager(clock, self.outputRootPathname, self._info)
+ fjm = ForkJobManager(clock, self.outputRootPathname, self._info)
connection = PortalConnection(self.portal, self.outputRootUrl, clock, self._info)
- connection.runSimulations(jm)
+ connection.runSimulations(gjm, fjm, self)
try:
stackless.run()
except KeyboardInterrupt:
Added: cs/portal/trunk/mineos.py
===================================================================
--- cs/portal/trunk/mineos.py 2007-11-22 00:16:11 UTC (rev 8319)
+++ cs/portal/trunk/mineos.py 2007-11-22 03:08:26 UTC (rev 8320)
@@ -0,0 +1,5 @@
+#!/usr/bin/env python
+
+f = open("output_mineos.txt", "w")
+print >>f, "Hello from Mineos!"
+f.close()
Property changes on: cs/portal/trunk/mineos.py
___________________________________________________________________
Name: svn:executable
+ *
More information about the cig-commits
mailing list