[cig-commits] r11847 - seismo/3D/SPECFEM3D_GLOBE/trunk
leif at geodynamics.org
leif at geodynamics.org
Tue Apr 22 13:25:08 PDT 2008
Author: leif
Date: 2008-04-22 13:25:07 -0700 (Tue, 22 Apr 2008)
New Revision: 11847
Modified:
seismo/3D/SPECFEM3D_GLOBE/trunk/pyspecfem3D
Log:
Archive output, report progress.
Modified: seismo/3D/SPECFEM3D_GLOBE/trunk/pyspecfem3D
===================================================================
--- seismo/3D/SPECFEM3D_GLOBE/trunk/pyspecfem3D 2008-04-22 19:03:19 UTC (rev 11846)
+++ seismo/3D/SPECFEM3D_GLOBE/trunk/pyspecfem3D 2008-04-22 20:25:07 UTC (rev 11847)
@@ -3,7 +3,8 @@
from pyre.applications import Script
from os.path import dirname, exists, isdir, join
-import os, sys, stat, shutil
+import os, sys, stat, shutil, tarfile
+from time import sleep
SPECFEM3D_GLOBE = dirname(__file__)
@@ -31,17 +32,21 @@
job = pyre.schedulers.job("job")
launchCommand = pyre.inventory.str("launch-command", default="mpirun -np %(nodes)s")
context = pyre.inventory.str(
- name="context", default="login", validator=pyre.inventory.choice(["login", "launcher", "compute"]))
+ name="context", default="login", validator=pyre.inventory.choice(["login", "launcher", "pre-compute", "post-compute"]))
+ progressUrl = pyre.inventory.str("progress-url")
+
def main(self, *args, **kwds):
context = self.context
if context == "login":
self.onLoginNode(*args, **kwds)
elif context == "launcher":
self.onLauncherNode(*args, **kwds)
- elif context == "compute":
- self.onComputeNodes(*args, **kwds)
+ elif context == "pre-compute":
+ self.onPreCompute(*args, **kwds)
+ elif context == "post-compute":
+ self.onPostCompute(*args, **kwds)
return
@@ -55,7 +60,12 @@
self.prepareFiles()
self.build()
- self.schedule(*args, **kwds)
+ pid = os.fork()
+ if pid:
+ self.monitorProgress(pid)
+ else:
+ self.schedule(*args, **kwds)
+ self.processOutputFiles()
return
@@ -65,7 +75,14 @@
launchCommand = launchCommand.split()
wd = os.getcwd()
+ myArgv = self.getArgv(*args, **kwds)
+ # launch ourselves
+ argv = launchCommand + [sys.executable, __file__] + myArgv + ["--context=pre-compute", "--nodes=%d" % self.nodes]
+ status = os.spawnvp(os.P_WAIT, argv[0], argv)
+ if status != 0:
+ sys.exit("%s: exit %d" % (argv[0], status))
+
# launch the mesher
argv = launchCommand + [join(wd, "xmeshfem3D")]
status = os.spawnvp(os.P_WAIT, argv[0], argv)
@@ -78,13 +95,37 @@
if status != 0:
sys.exit("%s: exit %d" % (argv[0], status))
+ # launch ourselves
+ argv = launchCommand + [sys.executable, __file__] + myArgv + ["--context=post-compute", "--nodes=%d" % self.nodes]
+ status = os.spawnvp(os.P_WAIT, argv[0], argv)
+ if status != 0:
+ sys.exit("%s: exit %d" % (argv[0], status))
+
return
- def onComputeNodes(self, *args, **kwds):
- assert False, "not reached"
+ def onPreCompute(self, *args, **kwds):
+ makedirs(self.scratchDir)
+ def onPostCompute(self, *args, **kwds):
+ try:
+ files = os.listdir(self.scratchDir)
+ for file in files:
+ try:
+ os.remove(join(self.scratchDir, file))
+ except OSError:
+ pass
+ except OSError:
+ pass
+ try:
+ os.rmdir(self.scratchDir)
+ except OSError:
+ pass
+
+ return
+
+
def prepareFiles(self):
self.mkdir("DATA")
@@ -170,6 +211,97 @@
return
+ def processOutputFiles(self):
+
+ archiveOut = open("specfem3dglobe.tar.gz", 'w')
+ tgzOut = tarfile.open(archiveOut.name, 'w:gz', archiveOut)
+
+ for name in os.listdir("OUTPUT_FILES"):
+ pathname = join("OUTPUT_FILES", name)
+ arcname = pathname
+ tgzOut.add(pathname, arcname)
+
+ tgzOut.close()
+ archiveOut.close()
+
+ for filename in ["output_mesher.txt", "output_solver.txt"]:
+ pathname = join("OUTPUT_FILES", filename)
+ if exists(pathname):
+ shutil.copyfile(pathname, filename)
+
+ return
+
+
+ def monitorProgress(self, pid):
+ progress = None
+ wpid, status = os.waitpid(pid, os.WNOHANG)
+ while wpid == 0:
+ newProgress = self.checkProgress()
+ if newProgress != progress:
+ progress = newProgress
+ self.postProgress(progress)
+ sleep(60)
+ wpid, status = os.waitpid(pid, os.WNOHANG)
+ return
+
+
+ def checkProgress(self):
+ try:
+ s = open("OUTPUT_FILES/output_solver.txt", 'r')
+ except IOError:
+ return None
+ progress = 0.0
+ for line in s:
+ if line.startswith(" We have done"):
+ progress = float(line.split()[3]) / 100.0
+ return progress
+
+
+ def postProgress(self, progress):
+ import urllib, urlparse
+ scheme, host, path = urlparse.urlparse(self.progressUrl)[0:3]
+ fields = { 'progress': "%.2f" % progress }
+ body = urllib.urlencode(fields)
+ headers = {"Content-Type": "application/x-www-form-urlencoded",
+ "Accept": "text/plain"}
+ import httplib
+ if scheme == "http":
+ conn = httplib.HTTPConnection(host)
+ elif scheme == "https":
+ conn = httplib.HTTPSConnection(host)
+ else:
+ assert False # not scheme in ["http", "https"]
+ conn.request("POST", path, body, headers)
+ response = conn.getresponse()
+ data = response.read()
+ conn.close()
+ return data
+
+
+
+def makedirs(name, mode=0777):
+ """Like os.makedirs(), but multi-{thread,process} safe."""
+ import os.path as path
+ from os import curdir, mkdir
+ head, tail = path.split(name)
+ if not tail:
+ head, tail = path.split(head)
+ if head and tail and not path.exists(head):
+ makedirs(head, mode)
+ if tail == curdir: # xxx/newdir/. exists if xxx/newdir exists
+ return
+ try:
+ mkdir(name, mode)
+ except OSError, error:
+ # 17 == "File exists"
+ if hasattr(error, 'errno') and error.errno == 17:
+ pass
+ else:
+ raise
+ return
+
+
+
if __name__ == "__main__":
script = Specfem3DGlobe()
script.run()
More information about the cig-commits mailing list