[cig-commits] r7853 - cs/portal/trunk

leif at geodynamics.org leif at geodynamics.org
Mon Aug 20 19:57:20 PDT 2007


Author: leif
Date: 2007-08-20 19:57:20 -0700 (Mon, 20 Aug 2007)
New Revision: 7853

Modified:
   cs/portal/trunk/daemon.py
Log:
Grab 'nodes' (the number of processors) from the portal.  Add the ability
to stage in files via the daemon by way of the GASS server (useful for
debugging a portal running locally at 'localhost').


Modified: cs/portal/trunk/daemon.py
===================================================================
--- cs/portal/trunk/daemon.py	2007-08-21 02:09:03 UTC (rev 7852)
+++ cs/portal/trunk/daemon.py	2007-08-21 02:57:20 UTC (rev 7853)
@@ -14,6 +14,11 @@
         self.value = value
 
 
+# I'm not sure what to do about this yet.
+TACC_ENVIRONMENT = """(environment=(TG_CLUSTER_SCRATCH "/work/teragrid/tg456271") (PATH "/opt/lsf/bin:/opt/lsf/etc:/opt/MPI/intel9/mvapich-gen2/0.9.8/bin:/opt/apps/binutils/binutils-2.17/bin:/opt/intel/compiler9.1//idb/bin:/opt/intel/compiler9.1//cc/bin:/opt/intel/compiler9.1//fc/bin:/usr/local/first:/usr/local/bin:~/bin:.:/opt/apps/pki_apps:/opt/apps/gsi-openssh-3.9/bin:/opt/lsf/bin:/opt/lsf/etc:/sbin:/usr/sbin:/usr/local/sbin:/bin:/usr/bin:/usr/local/bin:/usr/X11R6/bin:/home/teragrid/tg456271/bin:/data/TG/srb-client-3.4.1-r1/bin:/data/TG/softenv-1.6.2-r3/bin:/data/TG/tg-policy/bin:/data/TG/gx-map-0.5.3.2-r1/bin:/data/TG/tgusage-2.9-r2/bin:/usr/java/j2sdk1.4.2_12/bin:/usr/java/j2sdk1.4.2_12/jre/bin:/data/TG/globus-4.0.1-r3/sbin:/data/TG/globus-4.0.1-r3/bin:/data/TG/tgcp-1.0.0-r2/bin:/data/TG/condor-6.7.18-r1/bin:/data/TG/condor-6.7.18-r1/sbin:/data/TG/hdf4-4.2r1-r1/bin:/opt/apps/hdf5/hdf5-1.6.5/bin:/data/TG/phdf5-1.6.5/bin") (MPICH_HOME "/opt/MPI/intel9/mvapich-gen2/0.9.8"))"""
+
+
+
 class GassServer(object):
 
     # @classmethod
@@ -38,9 +43,10 @@
 
 class GlobusJobManager(object):
 
-    def __init__(self, clock, info):
+    def __init__(self, clock, info, downloadInputFiles=False):
         self.clock = clock
         self.info = info
+        self.downloadInputFiles = downloadInputFiles
 
     
     def runJob(self, job):
@@ -60,7 +66,9 @@
                 self.clock.tick.wait()
                 status = self.getJobStatus(id)
                 if status != oldStatus:
-                    job.setStatus(status)
+                    # Filter (e.g.) "UNKNOWN JOB STATE 64".
+                    if not status.startswith("UNKNOWN "):
+                        job.setStatus(status)
                     oldStatus = status
         
         except Exception, e:
@@ -144,6 +152,7 @@
             attribute, valueSequence = relation
             valueSequence = self.rslValueSequenceToString(valueSequence)
             print >>stream, '(', attribute, '=', valueSequence, ')'
+        print >>stream, TACC_ENVIRONMENT
         return
 
 
@@ -175,6 +184,9 @@
         
         file_stage_in = []
         for url, inputFile in job.inputFileURLs:
+            if self.downloadInputFiles:
+                self.download(url, inputFile)
+                url = gassServer.url + "/./" + inputFile
             file_stage_in.append([url, RSLUnquoted("$(SCRATCH_DIRECTORY) # " + '"/' + inputFile + '"')])
         resSpec["file_stage_in"] = file_stage_in
         
@@ -191,6 +203,18 @@
         return resSpec
 
 
+    def download(self, url, pathname):
+        import shutil
+        import urllib2
+        self.info.log("downloading %s" % url)
+        infile = urllib2.urlopen(url)
+        outfile = open(pathname, 'wb')
+        shutil.copyfileobj(infile,  outfile)
+        outfile.close()
+        infile.close()
+        return
+
+
 class Event(object):
     
     def __init__(self):
@@ -253,8 +277,9 @@
     STATUS_ERROR      = "SimStatusError"
     deadCodes = [STATUS_DONE, STATUS_ERROR]
 
-    def __init__(self, id, info):
+    def __init__(self, id, nodes, info):
         self.id = id
+        self.nodes = nodes
         self.info = info
         self.status = None
         self.statusChanged = Event()
@@ -324,7 +349,7 @@
         job = Job(
             jobType = "single",
             count = 1,
-            executable = "/work/teragrid/tg456271/SPECFEM3D_GLOBE/xspecfem3D",
+            executable = "/work/teragrid/tg456271/sf3dgp/xspecfem3D",
             arguments = [self.parameters, "--scheduler.dry", "--output-dir=/work/teragrid/tg456271"],
             stdout = "build-stdout",
             stderr = "build-stderr",
@@ -335,16 +360,16 @@
     def newRunJob(self):
         from os.path import join
         
-        sf = "/work/teragrid/tg456271/SPECFEM3D_GLOBE"
+        sf = "/work/teragrid/tg456271/sf3dgp"
         pythonPath = ':'.join([
             sf + "/merlin-1.3.egg",
             sf,
             sf + "/python/pythia-0.8.1.4-py2.4.egg",
             sf + "/python/Cheetah-2.0rc8-py2.4-linux-x86_64.egg",
             ])
-        nodes = 24
 
         simDir = "." # ???
+        simDir = "/work/teragrid/tg456271/portal/%d" % self.id
         
         pyreArgs = [
             self.parameters,
@@ -355,7 +380,7 @@
         
         job = Job(
             jobType = "mpi",
-            count = nodes,
+            count = self.nodes,
             max_time = 60,
             queue = "normal",
             executable = "/work/teragrid/tg456271/mpipyspecfem3D",
@@ -364,8 +389,8 @@
                          "Specfem3DGlobe==4.0",
                          "mpi:mpistart",
                          "Specfem3DGlobe.Specfem:Specfem",
-                         "--nodes=%d" % nodes,
-                         "--macros.nodes=%d" % nodes,
+                         "--nodes=%d" % self.nodes,
+                         "--macros.nodes=%d" % self.nodes,
                          "--macros.job.name=mysim",
                          "--macros.job.id=123456",
                          ] + pyreArgs,
@@ -410,10 +435,11 @@
             for sim in simulationList:
                 status = sim['status']
                 id = int(sim['id'])
+                nodes = int(sim['nodes'])
                 if (status == Simulation.STATUS_NEW and
                     not simulations.has_key(id)):
                     self.info.log("new simulation %d" % id)
-                    newSimulation = Simulation(id, self.info)
+                    newSimulation = Simulation(id, nodes, self.info)
                     for inputFile in newSimulation.inputFiles:
                         url = self.inputFileURL(newSimulation, inputFile)
                         newSimulation.inputFileURLs.append((url, inputFile))
@@ -527,11 +553,20 @@
     portal = pyre.facility("portal", factory=WebPortal)
     sleepInterval = pyre.dimensional("sleep-interval", default=60*second)
 
+    downloadInputFiles  = pyre.bool("download-input-files", default=False)
+    downloadInputFiles.meta['tip'] = (
+"""Download input files from the portal and serve them from the GASS
+server.  This is required when debugging a portal which is running
+locally (at 'localhost').  The default is 'False' because Globus can
+stage-in input files directly from a publicly-accessible portal via
+HTTP.""")
+
+
     def main(self, *args, **kwds):
         self._info.activate()
         self._info.log("~~~~~~~~~~ daemon started ~~~~~~~~~~")
         clock = Clock(self.sleepInterval / second)
-        jm = GlobusJobManager(clock, self._info)
+        jm = GlobusJobManager(clock, self._info, self.downloadInputFiles)
         connection = PortalConnection(self.portal, clock, self._info)
         connection.runSimulations(jm)
         try:



More information about the cig-commits mailing list