[cig-commits] r12815 - cs/portal/trunk/northridge/backend

leif at geodynamics.org leif at geodynamics.org
Thu Sep 4 19:59:25 PDT 2008


Author: leif
Date: 2008-09-04 19:59:25 -0700 (Thu, 04 Sep 2008)
New Revision: 12815

Modified:
   cs/portal/trunk/northridge/backend/daemon.py
   cs/portal/trunk/northridge/backend/specfem3D.py
Log:
SSH connections sometimes break, and login nodes sometimes reboot.
Reworked simulation monitoring to be robust in the face of these
calamities.


Modified: cs/portal/trunk/northridge/backend/daemon.py
===================================================================
--- cs/portal/trunk/northridge/backend/daemon.py	2008-09-05 02:10:32 UTC (rev 12814)
+++ cs/portal/trunk/northridge/backend/daemon.py	2008-09-05 02:59:25 UTC (rev 12815)
@@ -163,15 +163,9 @@
             pid = os.spawnvp(os.P_NOWAIT, argv[0], argv)
             os.chdir(savedWd)
             self.info.log("spawned process %d" % pid)
+
+            status = self.waitForJobToFinish(job, pid)
             
-            ticks = 2
-            wpid, status = os.waitpid(pid, os.WNOHANG)
-            while wpid == 0:
-                for t in xrange(ticks):
-                    self.clock.tick.wait()
-                ticks = min(ticks * 2, 10)
-                wpid, status = os.waitpid(pid, os.WNOHANG)
-
             self.uploadOutputFilesForJob(job)
             
             if (os.WIFSIGNALED(status)):
@@ -197,15 +191,26 @@
         return
 
 
-    def argvForJob(self, job):
+    def argvForJob(self, job, extraArgs=[]):
         return ([job.resSpec['executable']] + job.resSpec['arguments'] +
-                ["--progress-url=%s" % job.progressUrl])
+                ["--progress-url=%s" % job.progressUrl] + extraArgs)
 
 
     def uploadOutputFilesForJob(self, job):
         return
 
 
+    def waitForJobToFinish(self, job, pid):
+        ticks = 2
+        wpid, status = os.waitpid(pid, os.WNOHANG)
+        while wpid == 0:
+            for t in xrange(ticks):
+                self.clock.tick.wait()
+            ticks = min(ticks * 2, 10)
+            wpid, status = os.waitpid(pid, os.WNOHANG)
+        return status
+
+
 class RemoteShellJobManager(ForkJobManager):
 
     def __init__(self, clock, root, info, config):
@@ -249,9 +254,9 @@
         return
 
 
-    def argvForJob(self, job):
+    def argvForJob(self, job, extraArgs=[]):
         remoteCommand = ("cd " + self.remoteDirectoryForJob(job) + " && " +
-                         ' '.join(ForkJobManager.argvForJob(self, job)))
+                         ' '.join(ForkJobManager.argvForJob(self, job, extraArgs)))
         return self.rsh + [remoteCommand]
 
 
@@ -259,6 +264,27 @@
         return os.path.join(self.remoteOutputRoot, "run%05d" % job.run.id)
 
 
+    def waitForJobToFinish(self, job, pid):
+        status = ForkJobManager.waitForJobToFinish(self, job, pid)
+        if not os.WIFEXITED(status):
+            return status
+        if os.WEXITSTATUS(status) != 0:
+            return status
+
+        # Periodically connect to the remote host and check to see
+        # whether the job has finished.
+        argv = self.argvForJob(job, extraArgs=["--context=monitor"])
+        ticks = 60 # 10 minutes
+        running = os.spawnvp(os.P_WAIT, argv[0], argv)
+        while running:
+            for t in xrange(ticks):
+                self.clock.tick.wait()
+            running = os.spawnvp(os.P_WAIT, argv[0], argv)
+        
+        return status
+
+
+
 class GlobusJobManager(JobManager):
 
     def jobRunner(self, job):
@@ -735,15 +761,14 @@
 
 class Clock(object):
 
-    def __init__(self, interval):
-        self.interval = interval
+    def __init__(self):
         self.tick = Event()
         stackless.tasklet(self)()
 
     def __call__(self):
         from time import sleep
         while True:
-            sleep(self.interval)
+            sleep(10)
             self.tick.signal()
             stackless.schedule()
         return
@@ -778,7 +803,6 @@
 
     import pyre.inventory as pyre
     portal = pyre.facility("portal", factory=WebPortal)
-    sleepInterval = pyre.dimensional("sleep-interval", default=60*second)
 
     outputRootPathname = pyre.str("output-root-pathname")
     outputRootUrl = pyre.str("output-root-url")
@@ -798,7 +822,7 @@
     def main(self, *args, **kwds):
         self._info.activate()
         self._info.log("~~~~~~~~~~ daemon started ~~~~~~~~~~")
-        clock = Clock(self.sleepInterval / second)
+        clock = Clock()
         if self.remote:
             gjm = RemoteShellJobManager(clock, self.outputRootPathname, self._info, self)
         else:

Modified: cs/portal/trunk/northridge/backend/specfem3D.py
===================================================================
--- cs/portal/trunk/northridge/backend/specfem3D.py	2008-09-05 02:10:32 UTC (rev 12814)
+++ cs/portal/trunk/northridge/backend/specfem3D.py	2008-09-05 02:59:25 UTC (rev 12815)
@@ -8,6 +8,7 @@
 
 
 SPECFEM3D_GLOBE = dirname(__file__)
+archive = "specfem3dglobe.tar.gz"
 
 
 class Specfem3DGlobe(Script):
@@ -32,7 +33,7 @@
     job            = pyre.schedulers.job("job")
     launchCommand  = pyre.inventory.str("launch-command", default="mpirun -np %(nodes)s")
     context        = pyre.inventory.str(
-        name="context", default="login", validator=pyre.inventory.choice(["login", "launcher", "pre-compute", "post-compute"]))
+        name="context", default="login", validator=pyre.inventory.choice(["login", "launcher", "pre-compute", "post-compute", "monitor"]))
     progressUrl = pyre.inventory.str("progress-url")
 
 
@@ -47,6 +48,8 @@
             self.onPreCompute(*args, **kwds)
         elif context == "post-compute":
             self.onPostCompute(*args, **kwds)
+        elif context == "monitor":
+            self.onMonitor(*args, **kwds)
         return
 
 
@@ -58,17 +61,33 @@
         os.dup2(fd, 2)
         os.close(fd)
 
+        assert not exists(archive)
+
         self.prepareFiles()
         self.build()
+
+        # This fork() allows the (gsi)ssh command to complete.
         pid = os.fork()
-        if pid:
-            self.monitorProgress(pid)
-        else:
-            self.schedule(*args, **kwds)
-            self.processOutputFiles()
+        if not pid:
+            # This pair of child processes merely updates the
+            # thermometer on the web site.  If the login node shuts
+            # down, the worst that will happen is that the thermometer
+            # will freeze.
+            os.close(0) # original 1 & 2 closed by dup2 above
+            pid = os.fork()
+            if pid:
+                self.monitorProgress(pid)
+            else:
+                self.schedule(*args, **kwds)
         return
 
 
+    def onMonitor(self, *args, **kwds):
+        if exists(archive):
+            sys.exit(0)
+        sys.exit(1)
+
+
     def onLauncherNode(self, *args, **kwds):
         
         launchCommand = self.launchCommand % {'nodes': self.nodes}
@@ -101,6 +120,8 @@
         if status != 0:
             sys.exit("%s: exit %d" % (argv[0], status))
 
+        self.processOutputFiles()
+
         return
 
 
@@ -213,7 +234,7 @@
 
     def processOutputFiles(self):
 
-        archiveOut = open("specfem3dglobe.tar.gz", 'w')
+        archiveOut = open("_" + archive, 'w')
         tgzOut = tarfile.open(archiveOut.name, 'w:gz', archiveOut)
 
         for name in os.listdir("OUTPUT_FILES"):
@@ -229,6 +250,10 @@
             if exists(pathname):
                 shutil.copyfile(pathname, filename)
 
+        # The existence of the latter signals that we are done.  See
+        # onMonitor().
+        os.rename("_" + archive, archive)
+
         return
 
 



More information about the cig-commits mailing list