[cig-commits] r14243 - cs/portal/trunk/northridge/backend
leif at geodynamics.org
leif at geodynamics.org
Fri Mar 6 13:12:23 PST 2009
Author: leif
Date: 2009-03-06 13:12:23 -0800 (Fri, 06 Mar 2009)
New Revision: 14243
Modified:
cs/portal/trunk/northridge/backend/daemon.py
cs/portal/trunk/northridge/backend/mineos.py
Log:
Split Mineos runs into two Jobs (class SeismoWebPortal.models.Job).
This reflects what is actually happening under the hood in the new
parallel runs. A 1-4 processor LSF job is run for minos_bran/eigcon;
then, a second N=100 processor job is scheduled for green/syndat.
Independent Jobs in the database will allow each stage to have its own
progress bar.
Modified: cs/portal/trunk/northridge/backend/daemon.py
===================================================================
--- cs/portal/trunk/northridge/backend/daemon.py 2009-03-06 20:41:35 UTC (rev 14242)
+++ cs/portal/trunk/northridge/backend/daemon.py 2009-03-06 21:12:23 UTC (rev 14243)
@@ -273,7 +273,7 @@
# Periodically connect to the remote host and check to see
# whether the job has finished.
- argv = self.argvForJob(job, extraArgs=["--context=monitor"])
+ argv = self.argvForJob(job, extraArgs=job.monitorArgs)
ticks = 60 # 10 minutes
running = os.spawnvp(os.P_WAIT, argv[0], argv)
while running:
@@ -444,6 +444,7 @@
self.outputFiles = []
self.subdir = None
self.directory = None
+ self.monitorArgs = None
def setStatus(self, status):
if False: # Violated by, e.g., "UNKNOWN JOB STATE 64"
@@ -488,27 +489,27 @@
def __call__(self):
try:
self.setStatus(self.STATUS_CONNECTING)
-
- # run
- job = self.newJob()
- self.jm.runJob(job)
- # send jobs to jobSink()
- self.jobChannel.send(job)
- self.jobChannel.send(None) # no more jobs
+ for job in self.iterJobs():
+ # run
+ self.jm.runJob(job)
- self.setStatus(self.STATUS_PREPARING)
+ # send jobs to jobSink()
+ self.jobChannel.send(job)
+ self.jobChannel.send(None) # no more jobs
- while job.isAlive():
- job.statusChanged.wait()
- # while
+ self.setStatus(self.STATUS_PREPARING)
- if job.status == job.STATUS_FAILED:
- self.setStatus(self.STATUS_ERROR)
- raise RuntimeError("run failed")
+ while job.isAlive():
+ job.statusChanged.wait()
+ # while
+ if job.status == job.STATUS_FAILED:
+ self.setStatus(self.STATUS_ERROR)
+ raise RuntimeError("run failed")
+
self.setStatus(self.STATUS_DONE)
-
+
except Exception, e:
self.info.log("error: %s: %s" % (e.__class__.__name__, e))
self.setStatus(self.STATUS_ERROR)
@@ -525,7 +526,7 @@
class SpecfemRun(Run):
- def newJob(self):
+ def iterJobs(self):
dry = []
if self.dry:
dry = ["--scheduler.dry"]
@@ -554,24 +555,39 @@
job.urlForInputFile = self.urlForInputFile
job.inputFiles = [parameters, event, stations, model]
job.outputFiles = ["specfem3dglobe.tar.gz", "output_mesher.txt", "output_solver.txt", "output_build.txt"]
- return job
+ job.monitorArgs = ["--context=monitor"]
+ yield job
+ return
class MineosRun(Run):
- def newJob(self):
+ def iterJobs(self):
job = Job(
self,
- "run",
+ "eigen",
executable = self.mineosPathname,
- arguments = ["parameters.pml"],
+ arguments = ["parameters.pml", "--context=loginEigen"],
)
job.urlForInputFile = self.urlForInputFile
job.inputFiles = ["parameters.pml", "event.txt", "stations.site", "stations.sitechan", "model.txt"]
+ job.monitorArgs = ["--context=monitorEigen"]
+ yield job
+
+ job = Job(
+ self,
+ "seismograms",
+ executable = self.mineosPathname,
+ arguments = ["parameters.pml", "--context=loginSeismograms"],
+ )
+ job.urlForInputFile = self.urlForInputFile
job.outputFiles = ["output_mineos.txt", "mineos.tar.gz"]
- return job
+ job.monitorArgs = ["--context=monitorSeismograms"]
+ yield job
+ return
+
class PortalConnection(object):
MULTIPART_BOUNDARY = '----------eArThQuAkE$'
Modified: cs/portal/trunk/northridge/backend/mineos.py
===================================================================
--- cs/portal/trunk/northridge/backend/mineos.py 2009-03-06 20:41:35 UTC (rev 14242)
+++ cs/portal/trunk/northridge/backend/mineos.py 2009-03-06 21:12:23 UTC (rev 14243)
@@ -17,11 +17,12 @@
prefix = dirname(dirname(__file__))
-mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
+mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH
archiveDirName = "mineos"
archive = archiveDirName + ".tar.gz"
+eigenDone = "eigenDone"
class Mode(object):
@@ -268,16 +269,16 @@
launchCommand = pyre.str("launch-command")
context = pyre.str("context", default="login", validator=pyre.choice(["login",
- "launchEigen", "computeEigen",
- "launchSeismograms", "computeSeismograms",
- "monitor"]))
+ "loginEigen", "launchEigen", "computeEigen",
+ "loginSeismograms", "launchSeismograms", "computeSeismograms",
+ "monitorEigen", "monitorSeismograms"]))
progressUrl = pyre.str("progress-url")
#------------------------------------------------------------------------
# running
- def runMineosPrograms(self, *args, **kwds):
+ def loginEigen(self, *args, **kwds):
enabledModes = self.enabledModes()
self._info.log("modes: " + ' '.join(["'%s'" % mode.desc for mode in enabledModes]))
@@ -302,7 +303,22 @@
job.nodes = len(modes)
job.arguments.extend(["--context=launchEigen", "--nodes=%d" % job.nodes])
self.scheduleJob(job)
+
+ # The existence of this file signals that we are done. See
+ # onMonitor().
+ self.spawn("touch", eigenDone)
+
+ return
+
+
+ def loginSeismograms(self, *args, **kwds):
+ enabledModes = self.enabledModes()
+ self._info.log("modes: " + ' '.join(["'%s'" % mode.desc for mode in enabledModes]))
+ if not enabledModes:
+ self._info.log("nothing to do")
+ return
+
jobDir = os.getcwd()
os.chdir(archiveDirName)
@@ -487,13 +503,13 @@
def main(self, *args, **kwds):
self._info.activate()
context = self.context
- if context == "login":
+ if context.startswith("login"):
self.onLoginNode(*args, **kwds)
elif context.startswith("launch"):
self.onLauncherNode(*args, **kwds)
elif context.startswith("compute"):
self.onCompute(*args, **kwds)
- elif context == "monitor":
+ elif context.startswith("monitor"):
self.onMonitor(*args, **kwds)
return
@@ -509,12 +525,21 @@
if pid:
self.monitorProgress(pid)
else:
- self.runMineosPrograms(*args, **kwds)
+ context = self.context
+ if context == "login":
+ self.loginEigen(*args, **kwds)
+ self.loginSeismograms(*args, **kwds)
+ elif context == "loginEigen":
+ self.loginEigen(*args, **kwds)
+ elif context == "loginSeismograms":
+ self.loginSeismograms(*args, **kwds)
return
def onMonitor(self, *args, **kwds):
- if exists(archive):
+ filename = {"monitorEigen": eigenDone,
+ "monitorSeismograms": archive}[self.context]
+ if exists(filename):
sys.exit(0)
sys.exit(1)
More information about the CIG-COMMITS
mailing list