[cig-commits] r8320 - cs/portal/trunk

leif at geodynamics.org
Wed Nov 21 19:08:27 PST 2007


Author: leif
Date: 2007-11-21 19:08:26 -0800 (Wed, 21 Nov 2007)
New Revision: 8320

Added:
   cs/portal/trunk/mineos.py
Modified:
   cs/portal/trunk/daemon.py
Log:
Extended the daemon so that it can potentially run Mineos concurrently
with SPECFEM (currently 'mineos.py' is just a stub).
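For readers following the diff below: the new ForkJobManager launches Mineos
locally with os.spawnvp and then polls the child with os.waitpid(WNOHANG),
doubling the wait between checks so the daemon stays responsive. A minimal,
standalone sketch of that polling pattern (standard library only; the daemon
waits on a shared clock tasklet, which this sketch replaces with time.sleep;
run_and_poll is a hypothetical helper, not part of this commit):

import os
import time

def run_and_poll(argv, tick_seconds=1.0):
    # Start the child without blocking the caller.
    pid = os.spawnvp(os.P_NOWAIT, argv[0], argv)
    ticks = 2
    wpid, status = os.waitpid(pid, os.WNOHANG)
    while wpid == 0:                       # 0 means the child is still running
        time.sleep(ticks * tick_seconds)   # exponential back-off between polls
        ticks *= 2
        wpid, status = os.waitpid(pid, os.WNOHANG)
    if os.WIFEXITED(status):
        return os.WEXITSTATUS(status)      # normal termination: exit code
    return -os.WTERMSIG(status)            # killed by a signal

if __name__ == "__main__":
    print run_and_poll(["/bin/echo", "hello"])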


Modified: cs/portal/trunk/daemon.py
===================================================================
--- cs/portal/trunk/daemon.py	2007-11-22 00:16:11 UTC (rev 8319)
+++ cs/portal/trunk/daemon.py	2007-11-22 03:08:26 UTC (rev 8320)
@@ -1,6 +1,6 @@
 
 import stackless
-import os, sys
+import os, sys, signal
 from popen2 import Popen4
 
 from pyre.applications import Script
@@ -48,50 +48,19 @@
         return
 
 
-class GlobusJobManager(object):
+class JobManager(object):
 
-    def __init__(self, clock, root, info, downloadInputFiles=False):
+    def __init__(self, clock, root, info):
         self.clock = clock
         self.root = root
         self.info = info
-        self.downloadInputFiles = downloadInputFiles
 
-    
+
     def runJob(self, job):
         self.createSubdirectoryForJob(job)
         stackless.tasklet(self.jobRunner)(job)
 
     
-    def jobRunner(self, job):
-
-        gassServer = GassServer.startUp(job.directory)
-        
-        try:
-            resSpec = self.resSpec(job, gassServer)
-            id = self.globusrun(resSpec)
-            
-            oldStatus = job.status
-            ticks = 2
-            while job.isAlive():
-                for t in xrange(ticks):
-                    self.clock.tick.wait()
-                ticks *= 2
-                status = self.getJobStatus(id)
-                if status != oldStatus:
-                    job.setStatus(status)
-                    oldStatus = status
-                    ticks = 2
-        
-        except Exception, e:
-            self.info.log("error: %s: %s" % (e.__class__.__name__, e))
-            job.setStatus(job.STATUS_FAILED)
-
-        finally:
-            gassServer.shutDown()
-        
-        return
-
-
     def spawn(self, *argv):
         command = ' '.join(argv)
         self.info.log("spawning: %s" % command)
@@ -134,6 +103,118 @@
         return output
 
 
+    def download(self, url, pathname):
+        import shutil
+        import urllib2
+        self.info.log("downloading %s" % url)
+        infile = urllib2.urlopen(url)
+        outfile = open(pathname, 'wb')
+        shutil.copyfileobj(infile,  outfile)
+        outfile.close()
+        infile.close()
+        return
+
+    def downloadInputFilesForJob(self, job):
+        for inputFile in job.inputFiles:
+            url = job.urlForInputFile(inputFile)
+            filename = inputFile.split('/')[-1] # strips 'mineos/'
+            pathname = os.path.join(job.directory, filename)
+            self.download(url, pathname)
+        return
+
+    def createSubdirectoryForJob(self, job):
+        while True:
+            dirName = randomName()
+            dirPath = os.path.join(self.root, dirName)
+            if not os.path.exists(dirPath):
+                os.mkdir(dirPath)
+                break
+        job.subdir = dirName
+        job.directory = dirPath
+        return
+
+
+class ForkJobManager(JobManager):
+
+    def jobRunner(self, job):
+
+        self.downloadInputFilesForJob(job)
+
+        try:
+            argv = [job.resSpec['executable']] + job.resSpec['arguments']
+            command = ' '.join(argv)
+            self.info.log("spawning: %s" % command)
+            savedWd = os.getcwd()
+            os.chdir(job.directory)
+            pid = os.spawnvp(os.P_NOWAIT, argv[0], argv)
+            os.chdir(savedWd)
+            self.info.log("spawned process %d" % pid)
+            
+            ticks = 2
+            wpid, status = os.waitpid(pid, os.WNOHANG)
+            while wpid == 0:
+                for t in xrange(ticks):
+                    self.clock.tick.wait()
+                ticks *= 2
+                wpid, status = os.waitpid(pid, os.WNOHANG)
+
+            if (os.WIFSIGNALED(status)):
+                statusStr = "signal %d" % os.WTERMSIG(status)
+                job.setStatus(job.STATUS_FAILED)
+            elif (os.WIFEXITED(status)):
+                exitStatus = os.WEXITSTATUS(status)
+                statusStr = "exit %d" % exitStatus
+                if exitStatus == 0:
+                    job.setStatus(job.STATUS_DONE)
+                else:
+                    job.setStatus(job.STATUS_FAILED)
+            else:
+                statusStr = "status %d" % status
+                job.setStatus(job.STATUS_FAILED)
+            statusMsg = "%s: %s" % (argv[0], statusStr)
+            self.info.log(statusMsg)
+        
+        except Exception, e:
+            self.info.log("error: %s: %s" % (e.__class__.__name__, e))
+            job.setStatus(job.STATUS_FAILED)
+
+        return
+
+
+
+class GlobusJobManager(JobManager):
+
+    def jobRunner(self, job):
+
+        self.downloadInputFilesForJob(job)
+        gassServer = GassServer.startUp(job.directory)
+        
+        try:
+            resSpec = self.resSpec(job, gassServer)
+            id = self.globusrun(resSpec)
+
+            oldStatus = job.status
+            ticks = 2
+            while job.isAlive():
+                for t in xrange(ticks):
+                    self.clock.tick.wait()
+                ticks *= 2
+                status = self.getJobStatus(id)
+                if status != oldStatus:
+                    job.setStatus(status)
+                    oldStatus = status
+                    ticks = 2
+        
+        except Exception, e:
+            self.info.log("error: %s: %s" % (e.__class__.__name__, e))
+            job.setStatus(job.STATUS_FAILED)
+
+        finally:
+            gassServer.shutDown()
+        
+        return
+
+
     def globusrun(self, resSpec):
         import tempfile
         fd, rslName = tempfile.mkstemp(suffix=".rsl")
@@ -194,11 +275,8 @@
         resSpec["directory"] = RSLUnquoted("$(SCRATCH_DIRECTORY)")
         
         file_stage_in = []
-        for url, inputFile in job.inputFileURLs:
-            if self.downloadInputFiles:
-                pathname = os.path.join(job.directory, inputFile)
-                self.download(url, pathname)
-                url = gassServer.url + "/./" + inputFile
+        for inputFile in job.inputFiles:
+            url = gassServer.url + "/./" + inputFile
             file_stage_in.append([url, RSLUnquoted("$(SCRATCH_DIRECTORY) # " + '"/' + inputFile + '"')])
         resSpec["file_stage_in"] = file_stage_in
         
@@ -217,30 +295,6 @@
         return resSpec
 
 
-    def download(self, url, pathname):
-        import shutil
-        import urllib2
-        self.info.log("downloading %s" % url)
-        infile = urllib2.urlopen(url)
-        outfile = open(pathname, 'wb')
-        shutil.copyfileobj(infile,  outfile)
-        outfile.close()
-        infile.close()
-        return
-
-
-    def createSubdirectoryForJob(self, job):
-        while True:
-            dirName = randomName()
-            dirPath = os.path.join(self.root, dirName)
-            if not os.path.exists(dirPath):
-                os.mkdir(dirPath)
-                break
-        job.subdir = dirName
-        job.directory = dirPath
-        return
-
-
 class Event(object):
     
     def __init__(self):
@@ -281,7 +335,8 @@
         self.resSpec = resSpec
         self.status = self.STATUS_NEW
         self.statusChanged = Event()
-        self.inputFileURLs = []
+        self.inputFiles = []
+        self.urlForInputFile = None
         self.outputFiles = []
         self.subdir = None
         self.directory = None
@@ -310,42 +365,51 @@
     STATUS_ERROR      = "error"
     deadCodes = [STATUS_DONE, STATUS_ERROR]
 
-    def __init__(self, id, simulation, info):
+    def __init__(self, id, simulation, urlForInputFile, config, info):
         self.id = id
         self.simulation = simulation
+        self.urlForInputFile = urlForInputFile
+        self.mineosPathname = config.mineosPathname
+        self.dry = config.dry
         self.info = info
 
-        self.job = None
+        self.jobChannel = stackless.channel()
         self.status = self.STATUS_NEW
         self.statusChanged = Event()
 
-        self.inputFileURLs = []
-        self.outputFiles = ["seismograms.tar.gz", "output_mesher.txt", "output_solver.txt"]
+    def go(self, gjm, fjm):
+        stackless.tasklet(self)(gjm, fjm)
 
-    def go(self, jm):
-        stackless.tasklet(self)(jm)
-
-    def __call__(self, jm):
+    def __call__(self, gjm, fjm):
         try:
             self.setStatus(self.STATUS_CONNECTING)
             
             # run
-            job = self.newJob()
-            self.job = job
-            jm.runJob(job)
+            mineos = self.newMineosJob()
+            specfem = self.newSpecfemJob()
+            fjm.runJob(mineos)
+            gjm.runJob(specfem)
 
-            # also reported by job; but we need it here to tickle job watcher
-            self.setStatus(self.STATUS_PREPARING)
-            
-            while job.isAlive():
-                job.statusChanged.wait()
+            # send jobs to jobSink()
+            self.jobChannel.send(mineos)
+            self.jobChannel.send(specfem)
+            self.jobChannel.send(None) # no more jobs
+
+            while specfem.isAlive():
+                specfem.statusChanged.wait()
             # while
 
-            if job.status == job.STATUS_FAILED:
+            if specfem.status == specfem.STATUS_FAILED:
                 self.setStatus(self.STATUS_ERROR)
                 raise RuntimeError("run failed")
 
-            self.job = None
+            # The Mineos job should have finished long ago.
+            while mineos.isAlive():
+                mineos.statusChanged.wait()
+            if mineos.status == mineos.STATUS_FAILED:
+                self.setStatus(self.STATUS_ERROR)
+                raise RuntimeError("Mineos run failed")
+
             self.setStatus(self.STATUS_DONE)
 
         except Exception, e:
@@ -361,7 +425,10 @@
     def isAlive(self):
         return not self.status in self.deadCodes
 
-    def newJob(self):
+    def newSpecfemJob(self):
+        dry = []
+        if self.dry:
+            dry = ["--scheduler.dry"]
         job = Job(
             "run",
             jobType = "single",
@@ -377,13 +444,26 @@
                          "--output-dir=.", # Globus scratch dir
                          "--job.stdout=stdout.txt",
                          "--job.stderr=stderr.txt",
-                         ],
+                         ] + dry,
             )
-        job.inputFileURLs = self.inputFileURLs
-        job.outputFiles = self.outputFiles
+        job.urlForInputFile = self.urlForInputFile
+        sim = self.simulation
+        job.inputFiles = [sim.parameters, sim.events, sim.stations]
+        job.outputFiles = ["seismograms.tar.gz", "output_mesher.txt", "output_solver.txt"]
         return job
 
+    def newMineosJob(self):
+        job = Job(
+            "run",
+            executable = self.mineosPathname,
+            arguments = [],
+            )
+        job.urlForInputFile = self.urlForInputFile
+        job.inputFiles = ["mineos/event.txt", "mineos/site.txt", "mineos/sitechan.txt"]
+        job.outputFiles = ["output_mineos.txt", "mineos.tar.gz"]
+        return job
 
+
 class Simulation(object):
     def __init__(self, id, nodes):
         self.id = id
@@ -391,7 +471,6 @@
         self.parameters = 'parameters.pml'
         self.events = 'events.txt'
         self.stations = 'stations.txt'
-        self.inputFiles = [self.parameters, self.events, self.stations]
 
 
 class PortalConnection(object):
@@ -404,10 +483,10 @@
         self.clock = clock
         self.info = info
 
-    def runSimulations(self, jm):
-        stackless.tasklet(self.runFactory)(jm)
+    def runSimulations(self, gjm, fjm, config):
+        stackless.tasklet(self.runFactory)(gjm, fjm, config)
 
-    def runFactory(self, jm):
+    def runFactory(self, gjm, fjm, config):
         import urllib2
         
         runs = {}
@@ -437,32 +516,39 @@
                     not runs.has_key(id)):
                     self.info.log("new run %d" % id)
                     simulation = Simulation(simId, nodes)
-                    newRun = Run(id, simulation, self.info)
-                    for inputFile in simulation.inputFiles:
-                        url = self.inputFileURL(simulation, inputFile)
-                        newRun.inputFileURLs.append((url, inputFile))
+
+                    def urlForInputFile(inputFile):
+                        # Map input filenames to URLs in the context
+                        # of this run.
+                        return self.inputFileURL(simulation, inputFile)
+                    
+                    newRun = Run(id, simulation, urlForInputFile, config, self.info)
+                    
                     self.watchRun(newRun)
                     runs[id] = newRun
-                    newRun.go(jm)
+                    newRun.go(gjm, fjm)
         
         return
 
     def watchRun(self, run):
         stackless.tasklet(self.runWatcher)(run)
+        stackless.tasklet(self.jobSink)(run)
         
     def runWatcher(self, run):
         url = self.portal.runStatusUrl % run.id
-        job = run.job
         while run.isAlive():
             run.statusChanged.wait()
             fields = {'status': run.status}
             self.postStatusChange(url, fields)
-            if run.job != job:
-                job = run.job
-                if job is not None:
-                    self.watchJob(job, run)
         return
 
+    def jobSink(self, run):
+        newJob = run.jobChannel.receive()
+        while newJob is not None:
+            self.watchJob(newJob, run)
+            newJob = run.jobChannel.receive()
+        return
+
     def watchJob(self, job, run):
         url = self.portal.jobCreateUrl
         fields = self.jobFields(job, run)
@@ -491,7 +577,7 @@
             # in case the daemon and the web server are run as
             # different users
             import stat
-            os.chmod(pathname, stat.S_IRGRP | stat.S_IROTH)
+            os.chmod(pathname, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
             
             fields = {
                 'job': portalId,
@@ -614,25 +700,22 @@
     portal = pyre.facility("portal", factory=WebPortal)
     sleepInterval = pyre.dimensional("sleep-interval", default=60*second)
 
-    downloadInputFiles  = pyre.bool("download-input-files", default=False)
-    downloadInputFiles.meta['tip'] = (
-"""Download input files from the portal and serve them from the GASS
-server.  This is required when debugging a portal which is running
-locally (at 'localhost').  The default is 'False' because Globus can
-stage-in input files directly from a publicly-accessible portal via
-HTTP.""")
-
     outputRootPathname = pyre.str("output-root-pathname")
     outputRootUrl = pyre.str("output-root-url")
 
+    mineosPathname = pyre.str("mineos-pathname", default="mineos.py")
 
+    dry = pyre.bool("dry", default=False)
+
+
     def main(self, *args, **kwds):
         self._info.activate()
         self._info.log("~~~~~~~~~~ daemon started ~~~~~~~~~~")
         clock = Clock(self.sleepInterval / second)
-        jm = GlobusJobManager(clock, self.outputRootPathname, self._info, self.downloadInputFiles)
+        gjm = GlobusJobManager(clock, self.outputRootPathname, self._info)
+        fjm = ForkJobManager(clock, self.outputRootPathname, self._info)
         connection = PortalConnection(self.portal, self.outputRootUrl, clock, self._info)
-        connection.runSimulations(jm)
+        connection.runSimulations(gjm, fjm, self)
         try:
             stackless.run()
         except KeyboardInterrupt:

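One other pattern worth noting before the new stub: a Run now hands its jobs
to the portal watcher over a Stackless channel, sending None as a sentinel
once both jobs have been submitted (see jobChannel and jobSink in the diff
above). A minimal sketch of that hand-off (requires Stackless Python; the
producer/sink names and the job strings are illustrative, not from the
commit):

import stackless

def producer(channel):
    for job in ("mineos", "specfem"):
        channel.send(job)      # hand each new job to the watcher
    channel.send(None)         # sentinel: no more jobs for this run

def sink(channel):
    job = channel.receive()
    while job is not None:
        print "watching job:", job
        job = channel.receive()
    return

channel = stackless.channel()
stackless.tasklet(producer)(channel)
stackless.tasklet(sink)(channel)
stackless.run()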
Added: cs/portal/trunk/mineos.py
===================================================================
--- cs/portal/trunk/mineos.py	2007-11-22 00:16:11 UTC (rev 8319)
+++ cs/portal/trunk/mineos.py	2007-11-22 03:08:26 UTC (rev 8320)
@@ -0,0 +1,5 @@
+#!/usr/bin/env python
+
+f = open("output_mineos.txt", "w")
+print >>f, "Hello from Mineos!"
+f.close()


Property changes on: cs/portal/trunk/mineos.py
___________________________________________________________________
Name: svn:executable
   + *


