[cig-commits] r11790 - cs/portal/trunk
leif at geodynamics.org
leif at geodynamics.org
Wed Apr 9 13:32:11 PDT 2008
Author: leif
Date: 2008-04-09 13:32:11 -0700 (Wed, 09 Apr 2008)
New Revision: 11790
Modified:
cs/portal/trunk/daemon.py
Log:
Added RemoteShellJobManager.
Modified: cs/portal/trunk/daemon.py
===================================================================
--- cs/portal/trunk/daemon.py 2008-04-09 20:30:21 UTC (rev 11789)
+++ cs/portal/trunk/daemon.py 2008-04-09 20:32:11 UTC (rev 11790)
@@ -15,7 +15,6 @@
TG_CLUSTER_SCRATCH = "/work/teragrid/tg459131"
TG_COMMUNITY = "/projects/tg"
-SPECFEM_3D_GLOBE = TG_COMMUNITY + "/CIG/sf3dgp"
# I'm not sure what to do about this yet.
TACC_ENVIRONMENT = """(environment=(TG_CLUSTER_SCRATCH "%s") (TG_COMMUNITY "%s") (PATH "/opt/lsf/bin:/opt/lsf/etc:/opt/MPI/intel9/mvapich-gen2/0.9.8/bin:/opt/apps/binutils/binutils-2.17/bin:/opt/intel/compiler9.1//idb/bin:/opt/intel/compiler9.1//cc/bin:/opt/intel/compiler9.1//fc/bin:/usr/local/first:/usr/local/bin:~/bin:.:/opt/apps/pki_apps:/opt/apps/gsi-openssh-3.9/bin:/opt/lsf/bin:/opt/lsf/etc:/sbin:/usr/sbin:/usr/local/sbin:/bin:/usr/bin:/usr/local/bin:/usr/X11R6/bin:/home/teragrid/tg459131/bin:/data/TG/srb-client-3.4.1-r1/bin:/data/TG/softenv-1.6.2-r3/bin:/data/TG/tg-policy/bin:/data/TG/gx-map-0.5.3.2-r1/bin:/data/TG/tgusage-2.9-r2/bin:/usr/java/j2sdk1.4.2_12/bin:/usr/java/j2sdk1.4.2_12/jre/bin:/data/TG/globus-4.0.1-r3/sbin:/data/TG/globus-4.0.1-r3/bin:/data/TG/tgcp-1.0.0-r2/bin:/data/TG/condor-6.7.18-r1/bin:/data/TG/condor-6.7.18-r1/sbin:/data/TG/hdf4-4.2r1-r1/bin:/opt/apps/hdf5/hdf5-1.6.5/bin:/data/TG/phdf5-1.6.5/bin") (MPICH_HOME "/opt/MPI/intel9/mvapich-gen2/0.9.8"))""" % (TG_CLUSTER_SCRATCH, TG_COMMUNITY)
@@ -156,7 +155,7 @@
try:
self.downloadInputFilesForJob(job)
- argv = [job.resSpec['executable']] + job.resSpec['arguments']
+ argv = self.argvForJob(job)
command = ' '.join(argv)
self.info.log("spawning: %s" % command)
savedWd = os.getcwd()
@@ -170,9 +169,11 @@
while wpid == 0:
for t in xrange(ticks):
self.clock.tick.wait()
- ticks *= 2
+ ticks = max(ticks * 2, 10)
wpid, status = os.waitpid(pid, os.WNOHANG)
+ self.uploadOutputFilesForJob(job)
+
if (os.WIFSIGNALED(status)):
statusStr = "signal %d" % os.WTERMSIG(status)
job.setStatus(job.STATUS_FAILED)
@@ -196,15 +197,75 @@
return
+ def argvForJob(self, job):
+ return [job.resSpec['executable']] + job.resSpec['arguments']
+
+ def uploadOutputFilesForJob(self, job):
+ return
+
+
+class RemoteShellJobManager(ForkJobManager):
+
+ def __init__(self, clock, root, info, config):
+ ForkJobManager.__init__(self, clock, root, info)
+ self.rsh = config.remoteShellCommand.split()
+ self.rdown = config.remoteDownloadCommand
+ self.rup = config.remoteUploadCommand
+ self.remoteOutputRoot = config.remoteOutputRoot
+
+
+ def downloadInputFilesForJob(self, job):
+ # First, download to the local host.
+ ForkJobManager.downloadInputFilesForJob(self, job)
+
+ # Create a remote directory for this run.
+ remoteDirectory = self.remoteDirectoryForJob(job)
+ argv = self.rsh + ["mkdir -p %s" % remoteDirectory]
+ self.spawn(*argv)
+
+ # Now copy to the remote host.
+ for inputFile in job.inputFiles:
+ filename = inputFile.split('/')[-1] # strips 'mineos/'
+ pathname = os.path.join(job.directory, filename)
+ remotePathname = os.path.join(remoteDirectory, filename)
+ argv = (self.rdown % {'source': pathname, 'dest': remotePathname}).split()
+ self.spawn(*argv)
+
+ return
+
+
+ def uploadOutputFilesForJob(self, job):
+ remoteDirectory = self.remoteDirectoryForJob(job)
+ for filename in job.outputFiles:
+ pathname = os.path.join(job.directory, filename)
+ remotePathname = os.path.join(remoteDirectory, filename)
+ argv = (self.rup % {'source': remotePathname, 'dest': pathname}).split()
+ try:
+ self.spawn(*argv)
+ except Exception:
+ pass
+ return
+
+
+ def argvForJob(self, job):
+ remoteCommand = ("cd " + self.remoteDirectoryForJob(job) + " && " +
+ ' '.join(ForkJobManager.argvForJob(self, job)))
+ return self.rsh + [remoteCommand]
+
+
+ def remoteDirectoryForJob(self, job):
+ return os.path.join(self.remoteOutputRoot, "run%05d" % job.run.id)
+
+
class GlobusJobManager(JobManager):
def jobRunner(self, job):
- self.downloadInputFilesForJob(job)
- gassServer = GassServer.startUp(job.directory, self.info)
+ try:
+ self.downloadInputFilesForJob(job)
+ gassServer = GassServer.startUp(job.directory, self.info)
- try:
resSpec = self.resSpec(job, gassServer)
id = self.globusrun(resSpec)
@@ -345,7 +406,8 @@
statusCodes = [STATUS_PENDING, STATUS_ACTIVE, STATUS_DONE, STATUS_FAILED]
deadCodes = [STATUS_DONE, STATUS_FAILED]
- def __init__(self, task, **resSpec):
+ def __init__(self, run, task, **resSpec):
+ self.run = run
self.task = task
self.resSpec = resSpec
self.status = self.STATUS_NEW
@@ -384,6 +446,7 @@
self.id = id
self.simulation = simulation
self.urlForInputFile = urlForInputFile
+ self.specfemPathname = config.specfemPathname
self.mineosPathname = config.mineosPathname
self.dry = config.dry
self.info = info
@@ -441,26 +504,26 @@
if self.dry:
dry = ["--scheduler.dry"]
job = Job(
+ self,
"run",
jobType = "single",
count = 1,
- executable = SPECFEM_3D_GLOBE + "/xspecfem3D",
- arguments = [self.simulation.parameters,
- '--solver.cmt-solution=' + self.simulation.events,
- '--solver.stations=' + self.simulation.stations,
+ executable = self.specfemPathname,
+ arguments = ['--par-file=' + self.simulation.parameters,
+ '--cmt-solution=' + self.simulation.event,
+ '--stations=' + self.simulation.stations,
"--scheduler.wait=True",
"--job.name=run%05d" % self.id,
- "--portal-run-id=%d" % self.id,
+ #"--portal-run-id=%d" % self.id,
#"--job.walltime=2*hour",
- "--output-dir=.", # Globus scratch dir
"--job.stdout=stdout.txt",
"--job.stderr=stderr.txt",
] + dry,
)
job.urlForInputFile = self.urlForInputFile
sim = self.simulation
- job.inputFiles = [sim.parameters, sim.events, sim.stations]
- job.outputFiles = ["seismograms.tar.gz", "output_mesher.txt", "output_solver.txt"]
+ job.inputFiles = [sim.parameters, sim.event, sim.stations]
+ job.outputFiles = ["seismograms.tar.gz", "output_mesher.txt", "output_solver.txt", "output_build.txt"]
return job
@@ -468,6 +531,7 @@
def newJob(self):
job = Job(
+ self,
"run",
executable = self.mineosPathname,
arguments = ["parameters.pml"],
@@ -482,7 +546,7 @@
def __init__(self, id):
self.id = id
self.parameters = 'par_file.txt'
- self.events = 'event.txt'
+ self.event = 'event.txt'
self.stations = 'stations.txt'
@@ -722,8 +786,15 @@
outputRootPathname = pyre.str("output-root-pathname")
outputRootUrl = pyre.str("output-root-url")
+ specfemPathname = pyre.str("specfem-pathname", default="pyspecfem3D")
mineosPathname = pyre.str("mineos-pathname", default="mineos.py")
+ remote = pyre.bool("remote", default=False)
+ remoteShellCommand = pyre.str("remote-shell-command")
+ remoteDownloadCommand = pyre.str("remote-download-command")
+ remoteUploadCommand = pyre.str("remote-upload-command")
+ remoteOutputRoot = pyre.str("remote-output-root")
+
dry = pyre.bool("dry", default=False)
@@ -731,7 +802,10 @@
self._info.activate()
self._info.log("~~~~~~~~~~ daemon started ~~~~~~~~~~")
clock = Clock(self.sleepInterval / second)
- gjm = GlobusJobManager(clock, self.outputRootPathname, self._info)
+ if self.remote:
+ gjm = RemoteShellJobManager(clock, self.outputRootPathname, self._info, self)
+ else:
+ gjm = GlobusJobManager(clock, self.outputRootPathname, self._info)
fjm = ForkJobManager(clock, self.outputRootPathname, self._info)
connection = PortalConnection(self.portal, self.outputRootUrl, clock, self._info)
connection.runSimulations(gjm, fjm, self)
More information about the cig-commits mailing list