[cig-commits] r11790 - cs/portal/trunk

leif at geodynamics.org
Wed Apr 9 13:32:11 PDT 2008


Author: leif
Date: 2008-04-09 13:32:11 -0700 (Wed, 09 Apr 2008)
New Revision: 11790

Modified:
   cs/portal/trunk/daemon.py
Log:
Added RemoteShellJobManager.
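
RemoteShellJobManager is a ForkJobManager subclass that stages a run on a
remote host instead of executing it locally: it creates a per-run directory
("run%05d" under remote-output-root) on the remote side, copies the staged
input files over with a configurable download command, wraps the job's argv
in a configurable remote shell command, and copies the listed output files
back once the child process exits.

A minimal sketch of how the new command templates are expected to expand.
The ssh/scp values and the run id are illustrative assumptions, not values
set anywhere in this commit:

    # hypothetical settings for the new config properties
    remoteShellCommand    = "ssh user@cluster"
    remoteDownloadCommand = "scp %(source)s user@cluster:%(dest)s"
    remoteOutputRoot      = "/scratch/portal"

    rsh = remoteShellCommand.split()
    remoteDirectory = "/scratch/portal/run00042"   # remoteDirectoryForJob()

    # remote directory creation, as in downloadInputFilesForJob()
    argv = rsh + ["mkdir -p %s" % remoteDirectory]
    # -> ['ssh', 'user@cluster', 'mkdir -p /scratch/portal/run00042']

    # one input-file copy, using the %(source)s / %(dest)s substitution
    copy = (remoteDownloadCommand % {'source': 'run00042/par_file.txt',
                                     'dest': remoteDirectory + '/par_file.txt'}).split()
    # -> ['scp', 'run00042/par_file.txt',
    #     'user@cluster:/scratch/portal/run00042/par_file.txt']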


Modified: cs/portal/trunk/daemon.py
===================================================================
--- cs/portal/trunk/daemon.py	2008-04-09 20:30:21 UTC (rev 11789)
+++ cs/portal/trunk/daemon.py	2008-04-09 20:32:11 UTC (rev 11790)
@@ -15,7 +15,6 @@
 
 TG_CLUSTER_SCRATCH = "/work/teragrid/tg459131"
 TG_COMMUNITY = "/projects/tg"
-SPECFEM_3D_GLOBE = TG_COMMUNITY + "/CIG/sf3dgp"
 
 # I'm not sure what to do about this yet.
 TACC_ENVIRONMENT = """(environment=(TG_CLUSTER_SCRATCH "%s") (TG_COMMUNITY "%s") (PATH "/opt/lsf/bin:/opt/lsf/etc:/opt/MPI/intel9/mvapich-gen2/0.9.8/bin:/opt/apps/binutils/binutils-2.17/bin:/opt/intel/compiler9.1//idb/bin:/opt/intel/compiler9.1//cc/bin:/opt/intel/compiler9.1//fc/bin:/usr/local/first:/usr/local/bin:~/bin:.:/opt/apps/pki_apps:/opt/apps/gsi-openssh-3.9/bin:/opt/lsf/bin:/opt/lsf/etc:/sbin:/usr/sbin:/usr/local/sbin:/bin:/usr/bin:/usr/local/bin:/usr/X11R6/bin:/home/teragrid/tg459131/bin:/data/TG/srb-client-3.4.1-r1/bin:/data/TG/softenv-1.6.2-r3/bin:/data/TG/tg-policy/bin:/data/TG/gx-map-0.5.3.2-r1/bin:/data/TG/tgusage-2.9-r2/bin:/usr/java/j2sdk1.4.2_12/bin:/usr/java/j2sdk1.4.2_12/jre/bin:/data/TG/globus-4.0.1-r3/sbin:/data/TG/globus-4.0.1-r3/bin:/data/TG/tgcp-1.0.0-r2/bin:/data/TG/condor-6.7.18-r1/bin:/data/TG/condor-6.7.18-r1/sbin:/data/TG/hdf4-4.2r1-r1/bin:/opt/apps/hdf5/hdf5-1.6.5/bin:/data/TG/phdf5-1.6.5/bin") (MPICH_HOME "/opt/MPI/intel9/mvapich-gen2/0.9.8"))""" % (TG_CLUSTER_SCRATCH, TG_COMMUNITY)
@@ -156,7 +155,7 @@
         try:
             self.downloadInputFilesForJob(job)
 
-            argv = [job.resSpec['executable']] + job.resSpec['arguments']
+            argv = self.argvForJob(job)
             command = ' '.join(argv)
             self.info.log("spawning: %s" % command)
             savedWd = os.getcwd()
@@ -170,9 +169,11 @@
             while wpid == 0:
                 for t in xrange(ticks):
                     self.clock.tick.wait()
-                ticks *= 2
+                ticks = max(ticks * 2, 10)
                 wpid, status = os.waitpid(pid, os.WNOHANG)
 
+            self.uploadOutputFilesForJob(job)
+            
             if (os.WIFSIGNALED(status)):
                 statusStr = "signal %d" % os.WTERMSIG(status)
                 job.setStatus(job.STATUS_FAILED)
@@ -196,15 +197,75 @@
         return
 
 
+    def argvForJob(self, job):
+        return [job.resSpec['executable']] + job.resSpec['arguments']
 
+
+    def uploadOutputFilesForJob(self, job):
+        return
+
+
+class RemoteShellJobManager(ForkJobManager):
+
+    def __init__(self, clock, root, info, config):
+        ForkJobManager.__init__(self, clock, root, info)
+        self.rsh = config.remoteShellCommand.split()
+        self.rdown = config.remoteDownloadCommand
+        self.rup = config.remoteUploadCommand
+        self.remoteOutputRoot = config.remoteOutputRoot
+
+
+    def downloadInputFilesForJob(self, job):
+        # First, download to the local host.
+        ForkJobManager.downloadInputFilesForJob(self, job)
+
+        # Create a remote directory for this run.
+        remoteDirectory = self.remoteDirectoryForJob(job)
+        argv = self.rsh + ["mkdir -p %s" % remoteDirectory]
+        self.spawn(*argv)
+        
+        # Now copy to the remote host.
+        for inputFile in job.inputFiles:
+            filename = inputFile.split('/')[-1] # strips 'mineos/'
+            pathname = os.path.join(job.directory, filename)
+            remotePathname = os.path.join(remoteDirectory, filename)
+            argv = (self.rdown % {'source': pathname, 'dest': remotePathname}).split()
+            self.spawn(*argv)
+            
+        return
+
+
+    def uploadOutputFilesForJob(self, job):
+        remoteDirectory = self.remoteDirectoryForJob(job)
+        for filename in job.outputFiles:
+            pathname = os.path.join(job.directory, filename)
+            remotePathname = os.path.join(remoteDirectory, filename)
+            argv = (self.rup % {'source': remotePathname, 'dest': pathname}).split()
+            try:
+                self.spawn(*argv)
+            except Exception:
+                pass
+        return
+
+
+    def argvForJob(self, job):
+        remoteCommand = ("cd " + self.remoteDirectoryForJob(job) + " && " +
+                         ' '.join(ForkJobManager.argvForJob(self, job)))
+        return self.rsh + [remoteCommand]
+
+
+    def remoteDirectoryForJob(self, job):
+        return os.path.join(self.remoteOutputRoot, "run%05d" % job.run.id)
+
+
 class GlobusJobManager(JobManager):
 
     def jobRunner(self, job):
 
-        self.downloadInputFilesForJob(job)
-        gassServer = GassServer.startUp(job.directory, self.info)
+        try:
+            self.downloadInputFilesForJob(job)
+            gassServer = GassServer.startUp(job.directory, self.info)
         
-        try:
             resSpec = self.resSpec(job, gassServer)
             id = self.globusrun(resSpec)
 
@@ -345,7 +406,8 @@
     statusCodes = [STATUS_PENDING, STATUS_ACTIVE, STATUS_DONE, STATUS_FAILED]
     deadCodes = [STATUS_DONE, STATUS_FAILED]
 
-    def __init__(self, task, **resSpec):
+    def __init__(self, run, task, **resSpec):
+        self.run = run
         self.task = task
         self.resSpec = resSpec
         self.status = self.STATUS_NEW
@@ -384,6 +446,7 @@
         self.id = id
         self.simulation = simulation
         self.urlForInputFile = urlForInputFile
+        self.specfemPathname = config.specfemPathname
         self.mineosPathname = config.mineosPathname
         self.dry = config.dry
         self.info = info
@@ -441,26 +504,26 @@
         if self.dry:
             dry = ["--scheduler.dry"]
         job = Job(
+            self,
             "run",
             jobType = "single",
             count = 1,
-            executable = SPECFEM_3D_GLOBE + "/xspecfem3D",
-            arguments = [self.simulation.parameters,
-                         '--solver.cmt-solution=' + self.simulation.events,
-                         '--solver.stations=' + self.simulation.stations,
+            executable = self.specfemPathname,
+            arguments = ['--par-file=' + self.simulation.parameters,
+                         '--cmt-solution=' + self.simulation.event,
+                         '--stations=' + self.simulation.stations,
                          "--scheduler.wait=True",
                          "--job.name=run%05d" % self.id,
-                         "--portal-run-id=%d" % self.id,
+                         #"--portal-run-id=%d" % self.id,
                          #"--job.walltime=2*hour",
-                         "--output-dir=.", # Globus scratch dir
                          "--job.stdout=stdout.txt",
                          "--job.stderr=stderr.txt",
                          ] + dry,
             )
         job.urlForInputFile = self.urlForInputFile
         sim = self.simulation
-        job.inputFiles = [sim.parameters, sim.events, sim.stations]
-        job.outputFiles = ["seismograms.tar.gz", "output_mesher.txt", "output_solver.txt"]
+        job.inputFiles = [sim.parameters, sim.event, sim.stations]
+        job.outputFiles = ["seismograms.tar.gz", "output_mesher.txt", "output_solver.txt", "output_build.txt"]
         return job
 
 
@@ -468,6 +531,7 @@
 
     def newJob(self):
         job = Job(
+            self,
             "run",
             executable = self.mineosPathname,
             arguments = ["parameters.pml"],
@@ -482,7 +546,7 @@
     def __init__(self, id):
         self.id = id
         self.parameters = 'par_file.txt'
-        self.events = 'event.txt'
+        self.event = 'event.txt'
         self.stations = 'stations.txt'
 
 
@@ -722,8 +786,15 @@
     outputRootPathname = pyre.str("output-root-pathname")
     outputRootUrl = pyre.str("output-root-url")
 
+    specfemPathname = pyre.str("specfem-pathname", default="pyspecfem3D")
     mineosPathname = pyre.str("mineos-pathname", default="mineos.py")
 
+    remote = pyre.bool("remote", default=False)
+    remoteShellCommand = pyre.str("remote-shell-command")
+    remoteDownloadCommand = pyre.str("remote-download-command")
+    remoteUploadCommand = pyre.str("remote-upload-command")
+    remoteOutputRoot = pyre.str("remote-output-root")
+
     dry = pyre.bool("dry", default=False)
 
 
@@ -731,7 +802,10 @@
         self._info.activate()
         self._info.log("~~~~~~~~~~ daemon started ~~~~~~~~~~")
         clock = Clock(self.sleepInterval / second)
-        gjm = GlobusJobManager(clock, self.outputRootPathname, self._info)
+        if self.remote:
+            gjm = RemoteShellJobManager(clock, self.outputRootPathname, self._info, self)
+        else:
+            gjm = GlobusJobManager(clock, self.outputRootPathname, self._info)
         fjm = ForkJobManager(clock, self.outputRootPathname, self._info)
         connection = PortalConnection(self.portal, self.outputRootUrl, clock, self._info)
         connection.runSimulations(gjm, fjm, self)
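
For reference, a short walk-through (same illustrative assumptions as above:
run id 42, remote-output-root "/scratch/portal", remote shell
"ssh user@cluster") of what RemoteShellJobManager.argvForJob() hands to
spawn(); the executable and argument shown are placeholders, not values
fixed by this commit:

    rsh = ["ssh", "user@cluster"]
    localArgv = ["pyspecfem3D", "--par-file=par_file.txt"]   # ForkJobManager.argvForJob()
    remoteCommand = "cd /scratch/portal/run00042 && " + ' '.join(localArgv)
    argv = rsh + [remoteCommand]
    # -> ['ssh', 'user@cluster',
    #     'cd /scratch/portal/run00042 && pyspecfem3D --par-file=par_file.txt']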


