[cig-commits] r7777 - cs/portal/trunk

leif at geodynamics.org leif at geodynamics.org
Fri Aug 3 20:01:40 PDT 2007


Author: leif
Date: 2007-08-03 20:01:40 -0700 (Fri, 03 Aug 2007)
New Revision: 7777

Removed:
   cs/portal/trunk/sleep.rsl
Modified:
   cs/portal/trunk/daemon.py
Log:
Start GASS server and stage-out output files.


Modified: cs/portal/trunk/daemon.py
===================================================================
--- cs/portal/trunk/daemon.py	2007-08-03 20:47:03 UTC (rev 7776)
+++ cs/portal/trunk/daemon.py	2007-08-04 03:01:40 UTC (rev 7777)
@@ -14,6 +14,28 @@
         self.value = value
 
 
+class GassServer(object):
+
+    # @classmethod
+    def startUp(cls):
+        argv = ["globus-gass-server", "-r", "-w", "-c"]
+        child = Popen4(argv)
+        child.tochild.close()
+        url = child.fromchild.readline().strip()
+        return GassServer(child, url)
+    startUp = classmethod(startUp)
+
+    def __init__(self, child, url):
+        self.child = child
+        self.url = url
+
+    def shutDown(self):
+        argv = ["globus-gass-server-shutdown", self.url]
+        os.spawnvp(os.P_NOWAIT, argv[0], argv)
+        status = self.child.wait()
+        return
+
+
 class GlobusJobManager(object):
 
     def __init__(self, clock, info):
@@ -26,19 +48,28 @@
 
     
     def jobRunner(self, job):
+        
+        gassServer = GassServer.startUp()
+        
         try:
-            resSpec = self.resSpec(job)
+            resSpec = self.resSpec(job, gassServer)
             id = self.globusrun(resSpec)
+            
             oldStatus = job.status
             while job.isAlive():
                 self.clock.tick.wait()
-                status = self.globus_job_status(id)
+                status = self.getJobStatus(id)
                 if status != oldStatus:
                     job.setStatus(status)
                     oldStatus = status
+        
         except Exception, e:
             self.info.log("error: %s: %s" % (e.__class__.__name__, e))
             job.setStatus(job.STATUS_FAILED)
+
+        finally:
+            gassServer.shutDown()
+        
         return
 
 
@@ -130,21 +161,33 @@
         return ' '.join(s)
 
 
-    def globus_job_status(self, id):
+    def getJobStatus(self, id):
         output = self.ospawn("globus-job-status", id)
         status = output[0].strip()
         return status
 
 
-    def resSpec(self, job):
+    def resSpec(self, job, gassServer):
         resSpec = {}
         resSpec.update(job.resSpec)
         resSpec["scratch_dir"] = "/work/teragrid/tg456271"
         resSpec["directory"] = RSLUnquoted("$(SCRATCH_DIRECTORY)")
+        
         file_stage_in = []
         for url, inputFile in job.inputFileURLs:
             file_stage_in.append([url, RSLUnquoted("$(SCRATCH_DIRECTORY) # " + '"/' + inputFile + '"')])
         resSpec["file_stage_in"] = file_stage_in
+        
+        file_stage_out = []
+        for outputFile in job.outputFiles:
+            url = gassServer.url + "/./" + outputFile
+            file_stage_out.append([outputFile, url])
+        if file_stage_out:
+            resSpec["file_stage_out"] = file_stage_out
+        
+        resSpec["stdout"] = gassServer.url + "/./" + resSpec["stdout"]
+        resSpec["stderr"] = gassServer.url + "/./" + resSpec["stderr"]
+        
         return resSpec
 
 
@@ -187,9 +230,10 @@
         self.status = None
         self.statusChanged = Event()
         self.inputFileURLs = []
+        self.outputFiles = []
 
     def setStatus(self, status):
-        assert status in self.statusCodes
+        assert status in self.statusCodes, "unknown status: %s" % status
         self.status = status
         self.statusChanged.signal()
 
@@ -220,6 +264,7 @@
         self.stations = 'stations.txt'
         self.inputFiles = [self.parameters, self.events, self.stations]
         self.inputFileURLs = []
+        self.outputFiles = ["seismograms.tar.gz"]
 
     def run(self, jm):
         stackless.tasklet(self)(jm)
@@ -280,9 +325,9 @@
             jobType = "single",
             count = 1,
             executable = "/work/teragrid/tg456271/SPECFEM3D_GLOBE/xspecfem3D",
-            arguments = ["mysim.cfg", "--scheduler.dry"],
-            stdout = "stdout.mysim.test",
-            stderr = "stderr.mysim.test",
+            arguments = [self.parameters, "--scheduler.dry", "--output-dir=/work/teragrid/tg456271"],
+            stdout = "build-stdout",
+            stderr = "build-stderr",
             )
         job.inputFileURLs = self.inputFileURLs
         return job
@@ -299,14 +344,13 @@
             ])
         nodes = 24
 
-        simDir = "/work/teragrid/tg456271/simulations/mysim"
+        simDir = "." # ???
         
         pyreArgs = [
             self.parameters,
             '--output-dir=' + simDir,
             '--solver.cmt-solution=' + self.events,
             '--solver.stations=' + self.stations,
-            '--solver.seismogram-archive=' + join(simDir, 'output.tar.gz'),
             ]
         
         job = Job(
@@ -314,23 +358,23 @@
             count = nodes,
             max_time = 60,
             queue = "normal",
-            executable = "/work/teragrid/tg456271/simulations/mysim/mpipyspecfem3D",
+            executable = "/work/teragrid/tg456271/mpipyspecfem3D",
             arguments = ["--pyre-start",
                          pythonPath,
                          "Specfem3DGlobe==4.0",
                          "mpi:mpistart",
                          "Specfem3DGlobe.Specfem:Specfem",
-                         "mysim.cfg",
                          "--nodes=%d" % nodes,
                          "--macros.nodes=%d" % nodes,
                          "--macros.job.name=mysim",
                          "--macros.job.id=123456",
                          ] + pyreArgs,
-            stdout = "stdout.mysim.run",
-            stderr = "stderr.mysim.run",
+            stdout = "run-stdout",
+            stderr = "run-stderr",
             )
 
         job.inputFileURLs = self.inputFileURLs
+        job.outputFiles = self.outputFiles
 
         return job
 

Deleted: cs/portal/trunk/sleep.rsl
===================================================================
--- cs/portal/trunk/sleep.rsl	2007-08-03 20:47:03 UTC (rev 7776)
+++ cs/portal/trunk/sleep.rsl	2007-08-04 03:01:40 UTC (rev 7777)
@@ -1,6 +0,0 @@
-&(jobType="single")
-(directory="/home/teragrid/tg456271/sci-gateway")
-(executable=/bin/sleep)
-(arguments="10")
-(stdout="sleep-stdout")
-(stderr="sleep-stderr")



More information about the cig-commits mailing list