[cig-commits] r7853 - cs/portal/trunk
leif at geodynamics.org
leif at geodynamics.org
Mon Aug 20 19:57:20 PDT 2007
Author: leif
Date: 2007-08-20 19:57:20 -0700 (Mon, 20 Aug 2007)
New Revision: 7853
Modified:
cs/portal/trunk/daemon.py
Log:
Grab 'nodes' (# processors) from portal. Added the ability to
stage-in files via the daemon by way of the GASS server (useful for
debugging a portal running locally at 'localhost').
Modified: cs/portal/trunk/daemon.py
===================================================================
--- cs/portal/trunk/daemon.py 2007-08-21 02:09:03 UTC (rev 7852)
+++ cs/portal/trunk/daemon.py 2007-08-21 02:57:20 UTC (rev 7853)
@@ -14,6 +14,11 @@
self.value = value
+# I'm not sure what to do about this yet.
+TACC_ENVIRONMENT = """(environment=(TG_CLUSTER_SCRATCH "/work/teragrid/tg456271") (PATH "/opt/lsf/bin:/opt/lsf/etc:/opt/MPI/intel9/mvapich-gen2/0.9.8/bin:/opt/apps/binutils/binutils-2.17/bin:/opt/intel/compiler9.1//idb/bin:/opt/intel/compiler9.1//cc/bin:/opt/intel/compiler9.1//fc/bin:/usr/local/first:/usr/local/bin:~/bin:.:/opt/apps/pki_apps:/opt/apps/gsi-openssh-3.9/bin:/opt/lsf/bin:/opt/lsf/etc:/sbin:/usr/sbin:/usr/local/sbin:/bin:/usr/bin:/usr/local/bin:/usr/X11R6/bin:/home/teragrid/tg456271/bin:/data/TG/srb-client-3.4.1-r1/bin:/data/TG/softenv-1.6.2-r3/bin:/data/TG/tg-policy/bin:/data/TG/gx-map-0.5.3.2-r1/bin:/data/TG/tgusage-2.9-r2/bin:/usr/java/j2sdk1.4.2_12/bin:/usr/java/j2sdk1.4.2_12/jre/bin:/data/TG/globus-4.0.1-r3/sbin:/data/TG/globus-4.0.1-r3/bin:/data/TG/tgcp-1.0.0-r2/bin:/data/TG/condor-6.7.18-r1/bin:/data/TG/condor-6.7.18-r1/sbin:/data/TG/hdf4-4.2r1-r1/bin:/opt/apps/hdf5/hdf5-1.6.5/bin:/data/TG/phdf5-1.6.5/bin") (MPICH_HOME "/opt/MPI/intel9/mvapich-gen2/0.9.8"))"""
+
+
+
class GassServer(object):
# @classmethod
@@ -38,9 +43,10 @@
class GlobusJobManager(object):
- def __init__(self, clock, info):
+ def __init__(self, clock, info, downloadInputFiles=False):
self.clock = clock
self.info = info
+ self.downloadInputFiles = downloadInputFiles
def runJob(self, job):
@@ -60,7 +66,9 @@
self.clock.tick.wait()
status = self.getJobStatus(id)
if status != oldStatus:
- job.setStatus(status)
+ # Filter (e.g.) "UNKNOWN JOB STATE 64".
+ if not status.startswith("UNKNOWN "):
+ job.setStatus(status)
oldStatus = status
except Exception, e:
@@ -144,6 +152,7 @@
attribute, valueSequence = relation
valueSequence = self.rslValueSequenceToString(valueSequence)
print >>stream, '(', attribute, '=', valueSequence, ')'
+ print >>stream, TACC_ENVIRONMENT
return
@@ -175,6 +184,9 @@
file_stage_in = []
for url, inputFile in job.inputFileURLs:
+ if self.downloadInputFiles:
+ self.download(url, inputFile)
+ url = gassServer.url + "/./" + inputFile
file_stage_in.append([url, RSLUnquoted("$(SCRATCH_DIRECTORY) # " + '"/' + inputFile + '"')])
resSpec["file_stage_in"] = file_stage_in
@@ -191,6 +203,18 @@
return resSpec
+ def download(self, url, pathname):
+ import shutil
+ import urllib2
+ self.info.log("downloading %s" % url)
+ infile = urllib2.urlopen(url)
+ outfile = open(pathname, 'wb')
+ shutil.copyfileobj(infile, outfile)
+ outfile.close()
+ infile.close()
+ return
+
+
class Event(object):
def __init__(self):
@@ -253,8 +277,9 @@
STATUS_ERROR = "SimStatusError"
deadCodes = [STATUS_DONE, STATUS_ERROR]
- def __init__(self, id, info):
+ def __init__(self, id, nodes, info):
self.id = id
+ self.nodes = nodes
self.info = info
self.status = None
self.statusChanged = Event()
@@ -324,7 +349,7 @@
job = Job(
jobType = "single",
count = 1,
- executable = "/work/teragrid/tg456271/SPECFEM3D_GLOBE/xspecfem3D",
+ executable = "/work/teragrid/tg456271/sf3dgp/xspecfem3D",
arguments = [self.parameters, "--scheduler.dry", "--output-dir=/work/teragrid/tg456271"],
stdout = "build-stdout",
stderr = "build-stderr",
@@ -335,16 +360,16 @@
def newRunJob(self):
from os.path import join
- sf = "/work/teragrid/tg456271/SPECFEM3D_GLOBE"
+ sf = "/work/teragrid/tg456271/sf3dgp"
pythonPath = ':'.join([
sf + "/merlin-1.3.egg",
sf,
sf + "/python/pythia-0.8.1.4-py2.4.egg",
sf + "/python/Cheetah-2.0rc8-py2.4-linux-x86_64.egg",
])
- nodes = 24
simDir = "." # ???
+ simDir = "/work/teragrid/tg456271/portal/%d" % self.id
pyreArgs = [
self.parameters,
@@ -355,7 +380,7 @@
job = Job(
jobType = "mpi",
- count = nodes,
+ count = self.nodes,
max_time = 60,
queue = "normal",
executable = "/work/teragrid/tg456271/mpipyspecfem3D",
@@ -364,8 +389,8 @@
"Specfem3DGlobe==4.0",
"mpi:mpistart",
"Specfem3DGlobe.Specfem:Specfem",
- "--nodes=%d" % nodes,
- "--macros.nodes=%d" % nodes,
+ "--nodes=%d" % self.nodes,
+ "--macros.nodes=%d" % self.nodes,
"--macros.job.name=mysim",
"--macros.job.id=123456",
] + pyreArgs,
@@ -410,10 +435,11 @@
for sim in simulationList:
status = sim['status']
id = int(sim['id'])
+ nodes = int(sim['nodes'])
if (status == Simulation.STATUS_NEW and
not simulations.has_key(id)):
self.info.log("new simulation %d" % id)
- newSimulation = Simulation(id, self.info)
+ newSimulation = Simulation(id, nodes, self.info)
for inputFile in newSimulation.inputFiles:
url = self.inputFileURL(newSimulation, inputFile)
newSimulation.inputFileURLs.append((url, inputFile))
@@ -527,11 +553,20 @@
portal = pyre.facility("portal", factory=WebPortal)
sleepInterval = pyre.dimensional("sleep-interval", default=60*second)
+ downloadInputFiles = pyre.bool("download-input-files", default=False)
+ downloadInputFiles.meta['tip'] = (
+"""Download input files from the portal and serve them from the GASS
+server. This is required when debugging a portal which is running
+locally (at 'localhost'). The default is 'False' because Globus can
+stage-in input files directly from a publicly-accessible portal via
+HTTP.""")
+
+
def main(self, *args, **kwds):
self._info.activate()
self._info.log("~~~~~~~~~~ daemon started ~~~~~~~~~~")
clock = Clock(self.sleepInterval / second)
- jm = GlobusJobManager(clock, self._info)
+ jm = GlobusJobManager(clock, self._info, self.downloadInputFiles)
connection = PortalConnection(self.portal, clock, self._info)
connection.runSimulations(jm)
try:
More information about the cig-commits
mailing list