[cig-commits] r14243 - cs/portal/trunk/northridge/backend

leif at geodynamics.org leif at geodynamics.org
Fri Mar 6 13:12:23 PST 2009


Author: leif
Date: 2009-03-06 13:12:23 -0800 (Fri, 06 Mar 2009)
New Revision: 14243

Modified:
   cs/portal/trunk/northridge/backend/daemon.py
   cs/portal/trunk/northridge/backend/mineos.py
Log:
Split Mineos runs into two Jobs (class SeismoWebPortal.models.Job).
This reflects what is actually happening under the hood in the new
parallel runs.  A 1-4 processor LSF job is run for minos_bran/eigcon;
then, a second N=100 processor job is scheduled for green/syndat.
Independent Jobs in the database will allow each stage to have its own
progress bar.


Modified: cs/portal/trunk/northridge/backend/daemon.py
===================================================================
--- cs/portal/trunk/northridge/backend/daemon.py	2009-03-06 20:41:35 UTC (rev 14242)
+++ cs/portal/trunk/northridge/backend/daemon.py	2009-03-06 21:12:23 UTC (rev 14243)
@@ -273,7 +273,7 @@
 
         # Periodically connect to the remote host and check to see
         # whether the job has finished.
-        argv = self.argvForJob(job, extraArgs=["--context=monitor"])
+        argv = self.argvForJob(job, extraArgs=job.monitorArgs)
         ticks = 60 # 10 minutes
         running = os.spawnvp(os.P_WAIT, argv[0], argv)
         while running:
@@ -444,6 +444,7 @@
         self.outputFiles = []
         self.subdir = None
         self.directory = None
+        self.monitorArgs = None
 
     def setStatus(self, status):
         if False: # Violated by, e.g., "UNKNOWN JOB STATE 64"
@@ -488,27 +489,27 @@
     def __call__(self):
         try:
             self.setStatus(self.STATUS_CONNECTING)
-            
-            # run
-            job = self.newJob()
-            self.jm.runJob(job)
 
-            # send jobs to jobSink()
-            self.jobChannel.send(job)
-            self.jobChannel.send(None) # no more jobs
+            for job in self.iterJobs():
+                # run
+                self.jm.runJob(job)
 
-            self.setStatus(self.STATUS_PREPARING)
+                # send jobs to jobSink()
+                self.jobChannel.send(job)
+                self.jobChannel.send(None) # no more jobs
 
-            while job.isAlive():
-                job.statusChanged.wait()
-            # while
+                self.setStatus(self.STATUS_PREPARING)
 
-            if job.status == job.STATUS_FAILED:
-                self.setStatus(self.STATUS_ERROR)
-                raise RuntimeError("run failed")
+                while job.isAlive():
+                    job.statusChanged.wait()
+                # while
 
+                if job.status == job.STATUS_FAILED:
+                    self.setStatus(self.STATUS_ERROR)
+                    raise RuntimeError("run failed")
+
             self.setStatus(self.STATUS_DONE)
-
+            
         except Exception, e:
             self.info.log("error: %s: %s" % (e.__class__.__name__, e))
             self.setStatus(self.STATUS_ERROR)
@@ -525,7 +526,7 @@
 
 class SpecfemRun(Run):
 
-    def newJob(self):
+    def iterJobs(self):
         dry = []
         if self.dry:
             dry = ["--scheduler.dry"]
@@ -554,24 +555,39 @@
         job.urlForInputFile = self.urlForInputFile
         job.inputFiles = [parameters, event, stations, model]
         job.outputFiles = ["specfem3dglobe.tar.gz", "output_mesher.txt", "output_solver.txt", "output_build.txt"]
-        return job
+        job.monitorArgs = ["--context=monitor"]
+        yield job
+        return
 
 
 class MineosRun(Run):
 
-    def newJob(self):
+    def iterJobs(self):
         job = Job(
             self,
-            "run",
+            "eigen",
             executable = self.mineosPathname,
-            arguments = ["parameters.pml"],
+            arguments = ["parameters.pml", "--context=loginEigen"],
             )
         job.urlForInputFile = self.urlForInputFile
         job.inputFiles = ["parameters.pml", "event.txt", "stations.site", "stations.sitechan", "model.txt"]
+        job.monitorArgs = ["--context=monitorEigen"]
+        yield job
+        
+        job = Job(
+            self,
+            "seismograms",
+            executable = self.mineosPathname,
+            arguments = ["parameters.pml", "--context=loginSeismograms"],
+            )
+        job.urlForInputFile = self.urlForInputFile
         job.outputFiles = ["output_mineos.txt", "mineos.tar.gz"]
-        return job
+        job.monitorArgs = ["--context=monitorSeismograms"]
+        yield job
 
+        return
 
+
 class PortalConnection(object):
 
     MULTIPART_BOUNDARY = '----------eArThQuAkE$'

Modified: cs/portal/trunk/northridge/backend/mineos.py
===================================================================
--- cs/portal/trunk/northridge/backend/mineos.py	2009-03-06 20:41:35 UTC (rev 14242)
+++ cs/portal/trunk/northridge/backend/mineos.py	2009-03-06 21:12:23 UTC (rev 14243)
@@ -17,11 +17,12 @@
 prefix = dirname(dirname(__file__))
 
 
-mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
+mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH
 
 
 archiveDirName = "mineos"
 archive = archiveDirName + ".tar.gz"
+eigenDone = "eigenDone"
 
 
 class Mode(object):
@@ -268,16 +269,16 @@
     launchCommand  = pyre.str("launch-command")
 
     context        = pyre.str("context", default="login", validator=pyre.choice(["login",
-                                                                                 "launchEigen", "computeEigen",
-                                                                                 "launchSeismograms", "computeSeismograms",
-                                                                                 "monitor"]))
+                                                                                 "loginEigen", "launchEigen", "computeEigen",
+                                                                                 "loginSeismograms", "launchSeismograms", "computeSeismograms",
+                                                                                 "monitorEigen", "monitorSeismograms"]))
     progressUrl    = pyre.str("progress-url")
 
 
     #------------------------------------------------------------------------
     # running
 
-    def runMineosPrograms(self, *args, **kwds):
+    def loginEigen(self, *args, **kwds):
 
         enabledModes = self.enabledModes()
         self._info.log("modes: " + ' '.join(["'%s'" % mode.desc for mode in enabledModes]))
@@ -302,7 +303,22 @@
         job.nodes = len(modes)
         job.arguments.extend(["--context=launchEigen", "--nodes=%d" % job.nodes])
         self.scheduleJob(job)
+
+        # The existence of this file signals that we are done.  See
+        # onMonitor().
+        self.spawn("touch", eigenDone)
+
+        return
+
+
+    def loginSeismograms(self, *args, **kwds):
         
+        enabledModes = self.enabledModes()
+        self._info.log("modes: " + ' '.join(["'%s'" % mode.desc for mode in enabledModes]))
+        if not enabledModes:
+            self._info.log("nothing to do")
+            return
+        
         jobDir = os.getcwd()
         os.chdir(archiveDirName)
         
@@ -487,13 +503,13 @@
     def main(self, *args, **kwds):
         self._info.activate()
         context = self.context
-        if context == "login":
+        if context.startswith("login"):
             self.onLoginNode(*args, **kwds)
         elif context.startswith("launch"):
             self.onLauncherNode(*args, **kwds)
         elif context.startswith("compute"):
             self.onCompute(*args, **kwds)
-        elif context == "monitor":
+        elif context.startswith("monitor"):
             self.onMonitor(*args, **kwds)
         return
 
@@ -509,12 +525,21 @@
             if pid:
                 self.monitorProgress(pid)
             else:
-                self.runMineosPrograms(*args, **kwds)
+                context = self.context
+                if context == "login":
+                    self.loginEigen(*args, **kwds)
+                    self.loginSeismograms(*args, **kwds)
+                elif context == "loginEigen":
+                    self.loginEigen(*args, **kwds)
+                elif context == "loginSeismograms":
+                    self.loginSeismograms(*args, **kwds)
         return
 
 
     def onMonitor(self, *args, **kwds):
-        if exists(archive):
+        filename = {"monitorEigen": eigenDone,
+                    "monitorSeismograms": archive}[self.context]
+        if exists(filename):
             sys.exit(0)
         sys.exit(1)
 



More information about the CIG-COMMITS mailing list