[cig-commits] r7770 - cs/portal/trunk

leif at geodynamics.org leif at geodynamics.org
Wed Aug 1 18:10:21 PDT 2007


Author: leif
Date: 2007-08-01 18:10:21 -0700 (Wed, 01 Aug 2007)
New Revision: 7770

Modified:
   cs/portal/trunk/daemon.py
Log:
Stage-in input files.


Modified: cs/portal/trunk/daemon.py
===================================================================
--- cs/portal/trunk/daemon.py	2007-08-01 21:20:44 UTC (rev 7769)
+++ cs/portal/trunk/daemon.py	2007-08-02 01:10:21 UTC (rev 7770)
@@ -8,6 +8,12 @@
 from pyre.units.time import second
 
 
+# NYI: RSL AST
+class RSLUnquoted(object):
+    def __init__(self, value):
+        self.value = value
+
+
 class GlobusJobManager(object):
 
     def __init__(self, clock, info):
@@ -21,8 +27,8 @@
     
     def jobRunner(self, job):
         try:
-            # (file_stage_in = ($(GLOBUSRUN_GASS_URL)/./tomato $(SCRATCH_DIRECTORY)/input))
-            id = self.globusrun(job.resSpec)
+            resSpec = self.resSpec(job)
+            id = self.globusrun(resSpec)
             oldStatus = job.status
             while job.isAlive():
                 self.clock.tick.wait()
@@ -32,7 +38,7 @@
                     oldStatus = status
         except Exception, e:
             self.info.log("error: %s: %s" % (e.__class__.__name__, e))
-            job.setStatus(job.STATUS_ERROR)
+            job.setStatus(job.STATUS_FAILED)
         return
 
 
@@ -88,7 +94,7 @@
             if resSpec['jobType'] == "mpi":
                 resourceManager = "tg-login.tacc.teragrid.org/jobmanager-lsf"
             else:
-                resourceManager = "tg-login.tacc.teragrid.org"
+                resourceManager = "tg-login.tacc.teragrid.org/jobmanager-fork"
             output = self.ospawn("globusrun", "-F", "-f", rslName, "-r", resourceManager)
             id = None
             for line in output:
@@ -105,19 +111,43 @@
         print >>stream, '&'
         for relation in resSpec.iteritems():
             attribute, valueSequence = relation
-            if not isinstance(valueSequence, (tuple, list)):
-                valueSequence = [valueSequence]
-            valueSequence = ['"%s"' % value for value in valueSequence]
-            print >>stream, '(', attribute, '=', ' '.join(valueSequence), ')'
+            valueSequence = self.rslValueSequenceToString(valueSequence)
+            print >>stream, '(', attribute, '=', valueSequence, ')'
         return
 
 
+    def rslValueSequenceToString(self, valueSequence):
+        if not isinstance(valueSequence, (tuple, list)):
+            valueSequence = [valueSequence]
+        s = []
+        for value in valueSequence:
+            if isinstance(value, RSLUnquoted):
+                s.append(value.value)
+            elif isinstance(value, (tuple, list)):
+                s.append('(' + self.rslValueSequenceToString(value) + ')')
+            else:
+                s.append('"%s"' % value)
+        return ' '.join(s)
+
+
     def globus_job_status(self, id):
         output = self.ospawn("globus-job-status", id)
         status = output[0].strip()
         return status
 
 
+    def resSpec(self, job):
+        resSpec = {}
+        resSpec.update(job.resSpec)
+        resSpec["scratch_dir"] = "/work/teragrid/tg456271"
+        resSpec["directory"] = RSLUnquoted("$(SCRATCH_DIRECTORY)")
+        file_stage_in = []
+        for url, inputFile in job.inputFileURLs:
+            file_stage_in.append([url, RSLUnquoted("$(SCRATCH_DIRECTORY) # " + '"/' + inputFile + '"')])
+        resSpec["file_stage_in"] = file_stage_in
+        return resSpec
+
+
 class Event(object):
     
     def __init__(self):
@@ -156,7 +186,7 @@
         self.resSpec = resSpec
         self.status = None
         self.statusChanged = Event()
-        self.inputFiles = []
+        self.inputFileURLs = []
 
     def setStatus(self, status):
         assert status in self.statusCodes
@@ -166,10 +196,7 @@
     def isAlive(self):
         return not self.status in self.deadCodes
 
-    def stageIn(self, filename):
-        self.inputFiles.append(filename)
 
-
 class Simulation(object):
     
     # status codes
@@ -188,6 +215,12 @@
         self.status = None
         self.statusChanged = Event()
 
+        self.parameters = 'parameters.pml'
+        self.events = 'events.txt'
+        self.stations = 'stations.txt'
+        self.inputFiles = [self.parameters, self.events, self.stations]
+        self.inputFileURLs = []
+
     def run(self, jm):
         stackless.tasklet(self)(jm)
 
@@ -243,15 +276,16 @@
         return not self.status in self.deadCodes
 
     def newBuildJob(self):
-        return Job(
+        job = Job(
             jobType = "single",
             count = 1,
-            directory = "/work/teragrid/tg456271/SPECFEM3D_GLOBE/work",
             executable = "/work/teragrid/tg456271/SPECFEM3D_GLOBE/xspecfem3D",
             arguments = ["mysim.cfg", "--scheduler.dry"],
             stdout = "stdout.mysim.test",
             stderr = "stderr.mysim.test",
             )
+        job.inputFileURLs = self.inputFileURLs
+        return job
 
     def newRunJob(self):
         from os.path import join
@@ -265,18 +299,13 @@
             ])
         nodes = 24
 
-        parameters = 'parameters.pml'
-        events = 'events.txt'
-        stations = 'stations.txt'
-        inputFiles = [parameters, events, stations]
-
         simDir = "/work/teragrid/tg456271/simulations/mysim"
         
         pyreArgs = [
-            parameters,
+            self.parameters,
             '--output-dir=' + simDir,
-            '--solver.cmt-solution=' + events,
-            '--solver.stations=' + stations,
+            '--solver.cmt-solution=' + self.events,
+            '--solver.stations=' + self.stations,
             '--solver.seismogram-archive=' + join(simDir, 'output.tar.gz'),
             ]
         
@@ -285,7 +314,6 @@
             count = nodes,
             max_time = 60,
             queue = "normal",
-            directory = "/work/teragrid/tg456271/SPECFEM3D_GLOBE/work",
             executable = "/work/teragrid/tg456271/simulations/mysim/mpipyspecfem3D",
             arguments = ["--pyre-start",
                          pythonPath,
@@ -302,8 +330,7 @@
             stderr = "stderr.mysim.run",
             )
 
-        for inputFile in inputFiles:
-            job.stageIn(inputFile)
+        job.inputFileURLs = self.inputFileURLs
 
         return job
 
@@ -343,6 +370,9 @@
                     not simulations.has_key(id)):
                     self.info.log("new simulation %d" % id)
                     newSimulation = Simulation(id, self.info)
+                    for inputFile in newSimulation.inputFiles:
+                        url = self.inputFileURL(newSimulation, inputFile)
+                        newSimulation.inputFileURLs.append((url, inputFile))
                     self.watchSimulation(newSimulation)
                     simulations[id] = newSimulation
                     newSimulation.run(jm)



More information about the cig-commits mailing list