[cig-commits] r7777 - cs/portal/trunk
leif at geodynamics.org
leif at geodynamics.org
Fri Aug 3 20:01:40 PDT 2007
Author: leif
Date: 2007-08-03 20:01:40 -0700 (Fri, 03 Aug 2007)
New Revision: 7777
Removed:
cs/portal/trunk/sleep.rsl
Modified:
cs/portal/trunk/daemon.py
Log:
Start GASS server and stage-out output files.
Modified: cs/portal/trunk/daemon.py
===================================================================
--- cs/portal/trunk/daemon.py 2007-08-03 20:47:03 UTC (rev 7776)
+++ cs/portal/trunk/daemon.py 2007-08-04 03:01:40 UTC (rev 7777)
@@ -14,6 +14,28 @@
self.value = value
+class GassServer(object):
+
+ # @classmethod
+ def startUp(cls):
+ argv = ["globus-gass-server", "-r", "-w", "-c"]
+ child = Popen4(argv)
+ child.tochild.close()
+ url = child.fromchild.readline().strip()
+ return GassServer(child, url)
+ startUp = classmethod(startUp)
+
+ def __init__(self, child, url):
+ self.child = child
+ self.url = url
+
+ def shutDown(self):
+ argv = ["globus-gass-server-shutdown", self.url]
+ os.spawnvp(os.P_NOWAIT, argv[0], argv)
+ status = self.child.wait()
+ return
+
+
class GlobusJobManager(object):
def __init__(self, clock, info):
@@ -26,19 +48,28 @@
def jobRunner(self, job):
+
+ gassServer = GassServer.startUp()
+
try:
- resSpec = self.resSpec(job)
+ resSpec = self.resSpec(job, gassServer)
id = self.globusrun(resSpec)
+
oldStatus = job.status
while job.isAlive():
self.clock.tick.wait()
- status = self.globus_job_status(id)
+ status = self.getJobStatus(id)
if status != oldStatus:
job.setStatus(status)
oldStatus = status
+
except Exception, e:
self.info.log("error: %s: %s" % (e.__class__.__name__, e))
job.setStatus(job.STATUS_FAILED)
+
+ finally:
+ gassServer.shutDown()
+
return
@@ -130,21 +161,33 @@
return ' '.join(s)
- def globus_job_status(self, id):
+ def getJobStatus(self, id):
output = self.ospawn("globus-job-status", id)
status = output[0].strip()
return status
- def resSpec(self, job):
+ def resSpec(self, job, gassServer):
resSpec = {}
resSpec.update(job.resSpec)
resSpec["scratch_dir"] = "/work/teragrid/tg456271"
resSpec["directory"] = RSLUnquoted("$(SCRATCH_DIRECTORY)")
+
file_stage_in = []
for url, inputFile in job.inputFileURLs:
file_stage_in.append([url, RSLUnquoted("$(SCRATCH_DIRECTORY) # " + '"/' + inputFile + '"')])
resSpec["file_stage_in"] = file_stage_in
+
+ file_stage_out = []
+ for outputFile in job.outputFiles:
+ url = gassServer.url + "/./" + outputFile
+ file_stage_out.append([outputFile, url])
+ if file_stage_out:
+ resSpec["file_stage_out"] = file_stage_out
+
+ resSpec["stdout"] = gassServer.url + "/./" + resSpec["stdout"]
+ resSpec["stderr"] = gassServer.url + "/./" + resSpec["stderr"]
+
return resSpec
@@ -187,9 +230,10 @@
self.status = None
self.statusChanged = Event()
self.inputFileURLs = []
+ self.outputFiles = []
def setStatus(self, status):
- assert status in self.statusCodes
+ assert status in self.statusCodes, "unknown status: %s" % status
self.status = status
self.statusChanged.signal()
@@ -220,6 +264,7 @@
self.stations = 'stations.txt'
self.inputFiles = [self.parameters, self.events, self.stations]
self.inputFileURLs = []
+ self.outputFiles = ["seismograms.tar.gz"]
def run(self, jm):
stackless.tasklet(self)(jm)
@@ -280,9 +325,9 @@
jobType = "single",
count = 1,
executable = "/work/teragrid/tg456271/SPECFEM3D_GLOBE/xspecfem3D",
- arguments = ["mysim.cfg", "--scheduler.dry"],
- stdout = "stdout.mysim.test",
- stderr = "stderr.mysim.test",
+ arguments = [self.parameters, "--scheduler.dry", "--output-dir=/work/teragrid/tg456271"],
+ stdout = "build-stdout",
+ stderr = "build-stderr",
)
job.inputFileURLs = self.inputFileURLs
return job
@@ -299,14 +344,13 @@
])
nodes = 24
- simDir = "/work/teragrid/tg456271/simulations/mysim"
+ simDir = "." # ???
pyreArgs = [
self.parameters,
'--output-dir=' + simDir,
'--solver.cmt-solution=' + self.events,
'--solver.stations=' + self.stations,
- '--solver.seismogram-archive=' + join(simDir, 'output.tar.gz'),
]
job = Job(
@@ -314,23 +358,23 @@
count = nodes,
max_time = 60,
queue = "normal",
- executable = "/work/teragrid/tg456271/simulations/mysim/mpipyspecfem3D",
+ executable = "/work/teragrid/tg456271/mpipyspecfem3D",
arguments = ["--pyre-start",
pythonPath,
"Specfem3DGlobe==4.0",
"mpi:mpistart",
"Specfem3DGlobe.Specfem:Specfem",
- "mysim.cfg",
"--nodes=%d" % nodes,
"--macros.nodes=%d" % nodes,
"--macros.job.name=mysim",
"--macros.job.id=123456",
] + pyreArgs,
- stdout = "stdout.mysim.run",
- stderr = "stderr.mysim.run",
+ stdout = "run-stdout",
+ stderr = "run-stderr",
)
job.inputFileURLs = self.inputFileURLs
+ job.outputFiles = self.outputFiles
return job
Deleted: cs/portal/trunk/sleep.rsl
===================================================================
--- cs/portal/trunk/sleep.rsl 2007-08-03 20:47:03 UTC (rev 7776)
+++ cs/portal/trunk/sleep.rsl 2007-08-04 03:01:40 UTC (rev 7777)
@@ -1,6 +0,0 @@
-&(jobType="single")
-(directory="/home/teragrid/tg456271/sci-gateway")
-(executable=/bin/sleep)
-(arguments="10")
-(stdout="sleep-stdout")
-(stderr="sleep-stderr")
More information about the cig-commits
mailing list