[cig-commits] r12815 - cs/portal/trunk/northridge/backend
leif at geodynamics.org
leif at geodynamics.org
Thu Sep 4 19:59:25 PDT 2008
Author: leif
Date: 2008-09-04 19:59:25 -0700 (Thu, 04 Sep 2008)
New Revision: 12815
Modified:
cs/portal/trunk/northridge/backend/daemon.py
cs/portal/trunk/northridge/backend/specfem3D.py
Log:
SSH connections sometimes break, and login nodes sometimes reboot.
Reworked simulation monitoring to be robust in the face of these
calamities.
Modified: cs/portal/trunk/northridge/backend/daemon.py
===================================================================
--- cs/portal/trunk/northridge/backend/daemon.py 2008-09-05 02:10:32 UTC (rev 12814)
+++ cs/portal/trunk/northridge/backend/daemon.py 2008-09-05 02:59:25 UTC (rev 12815)
@@ -163,15 +163,9 @@
pid = os.spawnvp(os.P_NOWAIT, argv[0], argv)
os.chdir(savedWd)
self.info.log("spawned process %d" % pid)
+
+ status = self.waitForJobToFinish(job, pid)
- ticks = 2
- wpid, status = os.waitpid(pid, os.WNOHANG)
- while wpid == 0:
- for t in xrange(ticks):
- self.clock.tick.wait()
- ticks = min(ticks * 2, 10)
- wpid, status = os.waitpid(pid, os.WNOHANG)
-
self.uploadOutputFilesForJob(job)
if (os.WIFSIGNALED(status)):
@@ -197,15 +191,26 @@
return
- def argvForJob(self, job):
+ def argvForJob(self, job, extraArgs=[]):
return ([job.resSpec['executable']] + job.resSpec['arguments'] +
- ["--progress-url=%s" % job.progressUrl])
+ ["--progress-url=%s" % job.progressUrl] + extraArgs)
def uploadOutputFilesForJob(self, job):
return
+ def waitForJobToFinish(self, job, pid):
+ ticks = 2
+ wpid, status = os.waitpid(pid, os.WNOHANG)
+ while wpid == 0:
+ for t in xrange(ticks):
+ self.clock.tick.wait()
+ ticks = min(ticks * 2, 10)
+ wpid, status = os.waitpid(pid, os.WNOHANG)
+ return status
+
+
class RemoteShellJobManager(ForkJobManager):
def __init__(self, clock, root, info, config):
@@ -249,9 +254,9 @@
return
- def argvForJob(self, job):
+ def argvForJob(self, job, extraArgs=[]):
remoteCommand = ("cd " + self.remoteDirectoryForJob(job) + " && " +
- ' '.join(ForkJobManager.argvForJob(self, job)))
+ ' '.join(ForkJobManager.argvForJob(self, job, extraArgs)))
return self.rsh + [remoteCommand]
@@ -259,6 +264,27 @@
return os.path.join(self.remoteOutputRoot, "run%05d" % job.run.id)
+ def waitForJobToFinish(self, job, pid):
+ status = ForkJobManager.waitForJobToFinish(self, job, pid)
+ if not os.WIFEXITED(status):
+ return status
+ if os.WEXITSTATUS(status) != 0:
+ return status
+
+ # Periodically connect to the remote host and check to see
+ # whether the job has finished.
+ argv = self.argvForJob(job, extraArgs=["--context=monitor"])
+ ticks = 60 # 10 minutes
+ running = os.spawnvp(os.P_WAIT, argv[0], argv)
+ while running:
+ for t in xrange(ticks):
+ self.clock.tick.wait()
+ running = os.spawnvp(os.P_WAIT, argv[0], argv)
+
+ return status
+
+
+
class GlobusJobManager(JobManager):
def jobRunner(self, job):
@@ -735,15 +761,14 @@
class Clock(object):
- def __init__(self, interval):
- self.interval = interval
+ def __init__(self):
self.tick = Event()
stackless.tasklet(self)()
def __call__(self):
from time import sleep
while True:
- sleep(self.interval)
+ sleep(10)
self.tick.signal()
stackless.schedule()
return
@@ -778,7 +803,6 @@
import pyre.inventory as pyre
portal = pyre.facility("portal", factory=WebPortal)
- sleepInterval = pyre.dimensional("sleep-interval", default=60*second)
outputRootPathname = pyre.str("output-root-pathname")
outputRootUrl = pyre.str("output-root-url")
@@ -798,7 +822,7 @@
def main(self, *args, **kwds):
self._info.activate()
self._info.log("~~~~~~~~~~ daemon started ~~~~~~~~~~")
- clock = Clock(self.sleepInterval / second)
+ clock = Clock()
if self.remote:
gjm = RemoteShellJobManager(clock, self.outputRootPathname, self._info, self)
else:
Modified: cs/portal/trunk/northridge/backend/specfem3D.py
===================================================================
--- cs/portal/trunk/northridge/backend/specfem3D.py 2008-09-05 02:10:32 UTC (rev 12814)
+++ cs/portal/trunk/northridge/backend/specfem3D.py 2008-09-05 02:59:25 UTC (rev 12815)
@@ -8,6 +8,7 @@
SPECFEM3D_GLOBE = dirname(__file__)
+archive = "specfem3dglobe.tar.gz"
class Specfem3DGlobe(Script):
@@ -32,7 +33,7 @@
job = pyre.schedulers.job("job")
launchCommand = pyre.inventory.str("launch-command", default="mpirun -np %(nodes)s")
context = pyre.inventory.str(
- name="context", default="login", validator=pyre.inventory.choice(["login", "launcher", "pre-compute", "post-compute"]))
+ name="context", default="login", validator=pyre.inventory.choice(["login", "launcher", "pre-compute", "post-compute", "monitor"]))
progressUrl = pyre.inventory.str("progress-url")
@@ -47,6 +48,8 @@
self.onPreCompute(*args, **kwds)
elif context == "post-compute":
self.onPostCompute(*args, **kwds)
+ elif context == "monitor":
+ self.onMonitor(*args, **kwds)
return
@@ -58,17 +61,33 @@
os.dup2(fd, 2)
os.close(fd)
+ assert not exists(archive)
+
self.prepareFiles()
self.build()
+
+ # This fork() allows the (gsi)ssh command to complete.
pid = os.fork()
- if pid:
- self.monitorProgress(pid)
- else:
- self.schedule(*args, **kwds)
- self.processOutputFiles()
+ if not pid:
+ # This pair of child processes merely updates the
+ # thermometer on the web site. If the login node shuts
+ # down, the worst that will happen is that the thermometer
+ # will freeze.
+ os.close(0) # original 1 & 2 closed by dup2 above
+ pid = os.fork()
+ if pid:
+ self.monitorProgress(pid)
+ else:
+ self.schedule(*args, **kwds)
return
+ def onMonitor(self, *args, **kwds):
+ if exists(archive):
+ sys.exit(0)
+ sys.exit(1)
+
+
def onLauncherNode(self, *args, **kwds):
launchCommand = self.launchCommand % {'nodes': self.nodes}
@@ -101,6 +120,8 @@
if status != 0:
sys.exit("%s: exit %d" % (argv[0], status))
+ self.processOutputFiles()
+
return
@@ -213,7 +234,7 @@
def processOutputFiles(self):
- archiveOut = open("specfem3dglobe.tar.gz", 'w')
+ archiveOut = open("_" + archive, 'w')
tgzOut = tarfile.open(archiveOut.name, 'w:gz', archiveOut)
for name in os.listdir("OUTPUT_FILES"):
@@ -229,6 +250,10 @@
if exists(pathname):
shutil.copyfile(pathname, filename)
+ # The existence of the latter signals that we are done. See
+ # onMonitor().
+ os.rename("_" + archive, archive)
+
return
More information about the cig-commits
mailing list