[cig-commits] r11847 - seismo/3D/SPECFEM3D_GLOBE/trunk

leif at geodynamics.org leif at geodynamics.org
Tue Apr 22 13:25:08 PDT 2008


Author: leif
Date: 2008-04-22 13:25:07 -0700 (Tue, 22 Apr 2008)
New Revision: 11847

Modified:
   seismo/3D/SPECFEM3D_GLOBE/trunk/pyspecfem3D
Log:
Archive output, report progress.

Modified: seismo/3D/SPECFEM3D_GLOBE/trunk/pyspecfem3D
===================================================================
--- seismo/3D/SPECFEM3D_GLOBE/trunk/pyspecfem3D	2008-04-22 19:03:19 UTC (rev 11846)
+++ seismo/3D/SPECFEM3D_GLOBE/trunk/pyspecfem3D	2008-04-22 20:25:07 UTC (rev 11847)
@@ -3,7 +3,8 @@
 
 from pyre.applications import Script
 from os.path import dirname, exists, isdir, join
-import os, sys, stat, shutil
+import os, sys, stat, shutil, tarfile
+from time import sleep
 
 
 SPECFEM3D_GLOBE = dirname(__file__)
@@ -31,17 +32,21 @@
     job            = pyre.schedulers.job("job")
     launchCommand  = pyre.inventory.str("launch-command", default="mpirun -np %(nodes)s")
     context        = pyre.inventory.str(
-        name="context", default="login", validator=pyre.inventory.choice(["login", "launcher", "compute"]))
+        name="context", default="login", validator=pyre.inventory.choice(["login", "launcher", "pre-compute", "post-compute"]))
+    progressUrl = pyre.inventory.str("progress-url")
 
 
+
     def main(self, *args, **kwds):
         context = self.context
         if context == "login":
             self.onLoginNode(*args, **kwds)
         elif context == "launcher":
             self.onLauncherNode(*args, **kwds)
-        elif context == "compute":
-            self.onComputeNodes(*args, **kwds)
+        elif context == "pre-compute":
+            self.onPreCompute(*args, **kwds)
+        elif context == "post-compute":
+            self.onPostCompute(*args, **kwds)
         return
 
 
@@ -55,7 +60,12 @@
 
         self.prepareFiles()
         self.build()
-        self.schedule(*args, **kwds)
+        pid = os.fork()
+        if pid:
+            self.monitorProgress(pid)
+        else:
+            self.schedule(*args, **kwds)
+            self.processOutputFiles()
         return
 
 
@@ -65,7 +75,14 @@
         launchCommand = launchCommand.split()
 
         wd = os.getcwd()
+        myArgv = self.getArgv(*args, **kwds)
 
+        # launch ourselves
+        argv = launchCommand + [sys.executable, __file__] + myArgv + ["--context=pre-compute", "--nodes=%d" % self.nodes]
+        status = os.spawnvp(os.P_WAIT, argv[0], argv)
+        if status != 0:
+            sys.exit("%s: exit %d" % (argv[0], status))
+
         # launch the mesher
         argv = launchCommand + [join(wd, "xmeshfem3D")]
         status = os.spawnvp(os.P_WAIT, argv[0], argv)
@@ -78,13 +95,37 @@
         if status != 0:
             sys.exit("%s: exit %d" % (argv[0], status))
 
+        # launch ourselves
+        argv = launchCommand + [sys.executable, __file__] + myArgv + ["--context=post-compute", "--nodes=%d" % self.nodes]
+        status = os.spawnvp(os.P_WAIT, argv[0], argv)
+        if status != 0:
+            sys.exit("%s: exit %d" % (argv[0], status))
+
         return
 
 
-    def onComputeNodes(self, *args, **kwds):
-        assert False, "not reached"
+    def onPreCompute(self, *args, **kwds):
+        makedirs(self.scratchDir)
 
+    def onPostCompute(self, *args, **kwds):
+        try:
+            files = os.listdir(self.scratchDir)
+            for file in files:
+                try:
+                    os.remove(join(self.scratchDir, file))
+                except OSError:
+                    pass
+        except OSError:
+            pass
 
+        try:
+            os.rmdir(self.scratchDir)
+        except OSError:
+            pass
+
+        return
+
+
     def prepareFiles(self):
         
         self.mkdir("DATA")
@@ -170,6 +211,97 @@
         return
 
 
+    def processOutputFiles(self):
+
+        archiveOut = open("specfem3dglobe.tar.gz", 'w')
+        tgzOut = tarfile.open(archiveOut.name, 'w:gz', archiveOut)
+
+        for name in os.listdir("OUTPUT_FILES"):
+            pathname = join("OUTPUT_FILES", name)
+            arcname = pathname
+            tgzOut.add(pathname, arcname)
+
+        tgzOut.close()
+        archiveOut.close()
+
+        for filename in ["output_mesher.txt", "output_solver.txt"]:
+            pathname = join("OUTPUT_FILES", filename)
+            if exists(pathname):
+                shutil.copyfile(pathname, filename)
+
+        return
+
+
+    def monitorProgress(self, pid):
+        progress = None
+        wpid, status = os.waitpid(pid, os.WNOHANG)
+        while wpid == 0:
+            newProgress = self.checkProgress()
+            if newProgress != progress:
+                progress = newProgress
+                self.postProgress(progress)
+            sleep(60)
+            wpid, status = os.waitpid(pid, os.WNOHANG)
+        return
+
+
+    def checkProgress(self):
+        try:
+            s = open("OUTPUT_FILES/output_solver.txt", 'r')
+        except IOError:
+            return None
+        progress = 0.0
+        for line in s:
+            if line.startswith(" We have done"):
+                progress = float(line.split()[3]) / 100.0
+        return progress
+
+
+    def postProgress(self, progress):
+        import urllib, urlparse
+        scheme, host, path = urlparse.urlparse(self.progressUrl)[0:3]
+        fields = { 'progress': "%.2f" % progress }
+        body = urllib.urlencode(fields)
+        headers = {"Content-Type": "application/x-www-form-urlencoded",
+                   "Accept": "text/plain"}
+        import httplib
+        if scheme == "http":
+            conn = httplib.HTTPConnection(host)
+        elif scheme == "https":
+            conn = httplib.HTTPSConnection(host)
+        else:
+            assert False # not scheme in ["http", "https"]
+        conn.request("POST", path, body, headers)
+        response = conn.getresponse()
+        data = response.read()
+        conn.close()
+        return data
+
+
+
+def makedirs(name, mode=0777):
+    """Like os.makedirs(), but multi-{thread,process} safe."""
+    import os.path as path
+    from os import curdir, mkdir
+    head, tail = path.split(name)
+    if not tail:
+        head, tail = path.split(head)
+    if head and tail and not path.exists(head):
+        makedirs(head, mode)
+        if tail == curdir:           # xxx/newdir/. exists if xxx/newdir exists
+            return
+    try:
+        mkdir(name, mode)
+    except OSError, error:
+        # 17 == "File exists"
+        if hasattr(error, 'errno') and error.errno == 17:
+            pass
+        else:
+            raise
+    return
+
+
+
 if __name__ == "__main__":
     script = Specfem3DGlobe()
     script.run()



More information about the cig-commits mailing list