[cig-commits] r7770 - cs/portal/trunk
leif at geodynamics.org
leif at geodynamics.org
Wed Aug 1 18:10:21 PDT 2007
Author: leif
Date: 2007-08-01 18:10:21 -0700 (Wed, 01 Aug 2007)
New Revision: 7770
Modified:
cs/portal/trunk/daemon.py
Log:
Stage-in input files.
Modified: cs/portal/trunk/daemon.py
===================================================================
--- cs/portal/trunk/daemon.py 2007-08-01 21:20:44 UTC (rev 7769)
+++ cs/portal/trunk/daemon.py 2007-08-02 01:10:21 UTC (rev 7770)
@@ -8,6 +8,12 @@
from pyre.units.time import second
+# NYI: RSL AST
+class RSLUnquoted(object):
+ def __init__(self, value):
+ self.value = value
+
+
class GlobusJobManager(object):
def __init__(self, clock, info):
@@ -21,8 +27,8 @@
def jobRunner(self, job):
try:
- # (file_stage_in = ($(GLOBUSRUN_GASS_URL)/./tomato $(SCRATCH_DIRECTORY)/input))
- id = self.globusrun(job.resSpec)
+ resSpec = self.resSpec(job)
+ id = self.globusrun(resSpec)
oldStatus = job.status
while job.isAlive():
self.clock.tick.wait()
@@ -32,7 +38,7 @@
oldStatus = status
except Exception, e:
self.info.log("error: %s: %s" % (e.__class__.__name__, e))
- job.setStatus(job.STATUS_ERROR)
+ job.setStatus(job.STATUS_FAILED)
return
@@ -88,7 +94,7 @@
if resSpec['jobType'] == "mpi":
resourceManager = "tg-login.tacc.teragrid.org/jobmanager-lsf"
else:
- resourceManager = "tg-login.tacc.teragrid.org"
+ resourceManager = "tg-login.tacc.teragrid.org/jobmanager-fork"
output = self.ospawn("globusrun", "-F", "-f", rslName, "-r", resourceManager)
id = None
for line in output:
@@ -105,19 +111,43 @@
print >>stream, '&'
for relation in resSpec.iteritems():
attribute, valueSequence = relation
- if not isinstance(valueSequence, (tuple, list)):
- valueSequence = [valueSequence]
- valueSequence = ['"%s"' % value for value in valueSequence]
- print >>stream, '(', attribute, '=', ' '.join(valueSequence), ')'
+ valueSequence = self.rslValueSequenceToString(valueSequence)
+ print >>stream, '(', attribute, '=', valueSequence, ')'
return
+ def rslValueSequenceToString(self, valueSequence):
+ if not isinstance(valueSequence, (tuple, list)):
+ valueSequence = [valueSequence]
+ s = []
+ for value in valueSequence:
+ if isinstance(value, RSLUnquoted):
+ s.append(value.value)
+ elif isinstance(value, (tuple, list)):
+ s.append('(' + self.rslValueSequenceToString(value) + ')')
+ else:
+ s.append('"%s"' % value)
+ return ' '.join(s)
+
+
def globus_job_status(self, id):
output = self.ospawn("globus-job-status", id)
status = output[0].strip()
return status
+ def resSpec(self, job):
+ resSpec = {}
+ resSpec.update(job.resSpec)
+ resSpec["scratch_dir"] = "/work/teragrid/tg456271"
+ resSpec["directory"] = RSLUnquoted("$(SCRATCH_DIRECTORY)")
+ file_stage_in = []
+ for url, inputFile in job.inputFileURLs:
+ file_stage_in.append([url, RSLUnquoted("$(SCRATCH_DIRECTORY) # " + '"/' + inputFile + '"')])
+ resSpec["file_stage_in"] = file_stage_in
+ return resSpec
+
+
class Event(object):
def __init__(self):
@@ -156,7 +186,7 @@
self.resSpec = resSpec
self.status = None
self.statusChanged = Event()
- self.inputFiles = []
+ self.inputFileURLs = []
def setStatus(self, status):
assert status in self.statusCodes
@@ -166,10 +196,7 @@
def isAlive(self):
return not self.status in self.deadCodes
- def stageIn(self, filename):
- self.inputFiles.append(filename)
-
class Simulation(object):
# status codes
@@ -188,6 +215,12 @@
self.status = None
self.statusChanged = Event()
+ self.parameters = 'parameters.pml'
+ self.events = 'events.txt'
+ self.stations = 'stations.txt'
+ self.inputFiles = [self.parameters, self.events, self.stations]
+ self.inputFileURLs = []
+
def run(self, jm):
stackless.tasklet(self)(jm)
@@ -243,15 +276,16 @@
return not self.status in self.deadCodes
def newBuildJob(self):
- return Job(
+ job = Job(
jobType = "single",
count = 1,
- directory = "/work/teragrid/tg456271/SPECFEM3D_GLOBE/work",
executable = "/work/teragrid/tg456271/SPECFEM3D_GLOBE/xspecfem3D",
arguments = ["mysim.cfg", "--scheduler.dry"],
stdout = "stdout.mysim.test",
stderr = "stderr.mysim.test",
)
+ job.inputFileURLs = self.inputFileURLs
+ return job
def newRunJob(self):
from os.path import join
@@ -265,18 +299,13 @@
])
nodes = 24
- parameters = 'parameters.pml'
- events = 'events.txt'
- stations = 'stations.txt'
- inputFiles = [parameters, events, stations]
-
simDir = "/work/teragrid/tg456271/simulations/mysim"
pyreArgs = [
- parameters,
+ self.parameters,
'--output-dir=' + simDir,
- '--solver.cmt-solution=' + events,
- '--solver.stations=' + stations,
+ '--solver.cmt-solution=' + self.events,
+ '--solver.stations=' + self.stations,
'--solver.seismogram-archive=' + join(simDir, 'output.tar.gz'),
]
@@ -285,7 +314,6 @@
count = nodes,
max_time = 60,
queue = "normal",
- directory = "/work/teragrid/tg456271/SPECFEM3D_GLOBE/work",
executable = "/work/teragrid/tg456271/simulations/mysim/mpipyspecfem3D",
arguments = ["--pyre-start",
pythonPath,
@@ -302,8 +330,7 @@
stderr = "stderr.mysim.run",
)
- for inputFile in inputFiles:
- job.stageIn(inputFile)
+ job.inputFileURLs = self.inputFileURLs
return job
@@ -343,6 +370,9 @@
not simulations.has_key(id)):
self.info.log("new simulation %d" % id)
newSimulation = Simulation(id, self.info)
+ for inputFile in newSimulation.inputFiles:
+ url = self.inputFileURL(newSimulation, inputFile)
+ newSimulation.inputFileURLs.append((url, inputFile))
self.watchSimulation(newSimulation)
simulations[id] = newSimulation
newSimulation.run(jm)
More information about the cig-commits mailing list