[cig-commits] r7914 - cs/portal/trunk
leif at geodynamics.org
leif at geodynamics.org
Wed Aug 29 19:50:16 PDT 2007
Author: leif
Date: 2007-08-29 19:50:15 -0700 (Wed, 29 Aug 2007)
New Revision: 7914
Modified:
cs/portal/trunk/daemon.py
cs/portal/trunk/web-portal-daemon.cfg
Log:
Enhancements corresponding to r1742 & r1749 in the other repository:
Introduced 'Run', 'Job', and 'OutputFile'. 'Run' status takes the
place of 'Simulation' status; there can be multiple runs per
simulation. 'Job' provides web users with detailed job info; there
can be multiple jobs per run: 'build', 'run', and -- maybe someday --
'postprocessing'. With the addition of the generic 'OutputFile'
mechanism, 'stdout' and 'stderr' are now accessible via the web portal
(plus whatever else the daemon decides to produce in addition to the
seismogram tarball -- perhaps output_{mesher,solver}.txt).
The daemon now creates an output directory for each job and puts the
job output files there. The job directories are created under a root
directory which is presumed to be served by a web server at the URL
indicated in the .cfg file. The daemon randomly generates the
filenames of job directories, creating a modicum of security/privacy
through obscurity (it is assumed that directory-indexes are not
allowed on the root directory/URL).
Updated the example .cfg file with the settings I've been using for
debugging.
Modified: cs/portal/trunk/daemon.py
===================================================================
--- cs/portal/trunk/daemon.py 2007-08-29 23:50:51 UTC (rev 7913)
+++ cs/portal/trunk/daemon.py 2007-08-30 02:50:15 UTC (rev 7914)
@@ -22,17 +22,21 @@
class GassServer(object):
# @classmethod
- def startUp(cls):
+ def startUp(cls, directory):
argv = ["globus-gass-server", "-r", "-w", "-c"]
+ savedWd = os.getcwd()
+ os.chdir(directory)
child = Popen4(argv)
+ os.chdir(savedWd)
child.tochild.close()
url = child.fromchild.readline().strip()
- return GassServer(child, url)
+ return GassServer(child, url, directory)
startUp = classmethod(startUp)
- def __init__(self, child, url):
+ def __init__(self, child, url, directory):
self.child = child
self.url = url
+ self.directory = directory
def shutDown(self):
argv = ["globus-gass-server-shutdown", self.url]
@@ -43,20 +47,22 @@
class GlobusJobManager(object):
- def __init__(self, clock, info, downloadInputFiles=False):
+ def __init__(self, clock, root, info, downloadInputFiles=False):
self.clock = clock
+ self.root = root
self.info = info
self.downloadInputFiles = downloadInputFiles
def runJob(self, job):
+ self.createSubdirectoryForJob(job)
stackless.tasklet(self.jobRunner)(job)
def jobRunner(self, job):
+
+ gassServer = GassServer.startUp(job.directory)
- gassServer = GassServer.startUp()
-
try:
resSpec = self.resSpec(job, gassServer)
id = self.globusrun(resSpec)
@@ -66,9 +72,7 @@
self.clock.tick.wait()
status = self.getJobStatus(id)
if status != oldStatus:
- # Filter (e.g.) "UNKNOWN JOB STATE 64".
- if not status.startswith("UNKNOWN "):
- job.setStatus(status)
+ job.setStatus(status)
oldStatus = status
except Exception, e:
@@ -185,7 +189,8 @@
file_stage_in = []
for url, inputFile in job.inputFileURLs:
if self.downloadInputFiles:
- self.download(url, inputFile)
+ pathname = os.path.join(job.directory, inputFile)
+ self.download(url, pathname)
url = gassServer.url + "/./" + inputFile
file_stage_in.append([url, RSLUnquoted("$(SCRATCH_DIRECTORY) # " + '"/' + inputFile + '"')])
resSpec["file_stage_in"] = file_stage_in
@@ -197,8 +202,10 @@
if file_stage_out:
resSpec["file_stage_out"] = file_stage_out
- resSpec["stdout"] = gassServer.url + "/./" + resSpec["stdout"]
- resSpec["stderr"] = gassServer.url + "/./" + resSpec["stderr"]
+ resSpec["stdout"] = gassServer.url + "/./stdout.txt"
+ resSpec["stderr"] = gassServer.url + "/./stderr.txt"
+
+ job.outputFiles.extend(["stdout.txt", "stderr.txt"])
return resSpec
@@ -215,6 +222,18 @@
return
+ def createSubdirectoryForJob(self, job):
+ while True:
+ dirName = randomName()
+ dirPath = os.path.join(self.root, dirName)
+ if not os.path.exists(dirPath):
+ os.mkdir(dirPath)
+ break
+ job.subdir = dirName
+ job.directory = dirPath
+ return
+
+
class Event(object):
def __init__(self):
@@ -242,6 +261,7 @@
class Job(object):
# status codes
+ STATUS_NEW = "NEW" # pseudo
STATUS_PENDING = "PENDING"
STATUS_ACTIVE = "ACTIVE"
STATUS_DONE = "DONE"
@@ -249,15 +269,19 @@
statusCodes = [STATUS_PENDING, STATUS_ACTIVE, STATUS_DONE, STATUS_FAILED]
deadCodes = [STATUS_DONE, STATUS_FAILED]
- def __init__(self, **resSpec):
+ def __init__(self, task, **resSpec):
+ self.task = task
self.resSpec = resSpec
- self.status = None
+ self.status = self.STATUS_NEW
self.statusChanged = Event()
self.inputFileURLs = []
self.outputFiles = []
+ self.subdir = None
+ self.directory = None
def setStatus(self, status):
- assert status in self.statusCodes, "unknown status: %s" % status
+ if False: # Violated by, e.g., "UNKNOWN JOB STATE 64"
+ assert status in self.statusCodes, "unknown status: %s" % status
self.status = status
self.statusChanged.signal()
@@ -265,42 +289,41 @@
return not self.status in self.deadCodes
-class Simulation(object):
+class Run(object):
# status codes
- STATUS_NEW = "SimStatusNew"
- STATUS_PREPARING = "SimStatusPreparing"
- STATUS_PENDING = "SimStatusPending"
- STATUS_RUNNING = "SimStatusRunning"
- STATUS_FINISHING = "SimStatusFinishing"
- STATUS_DONE = "SimStatusDone"
- STATUS_ERROR = "SimStatusError"
+ STATUS_NEW = "new"
+ STATUS_PREPARING = "preparing"
+ STATUS_PENDING = "pending"
+ STATUS_RUNNING = "running"
+ #STATUS_FINISHING = "finishing"
+ STATUS_DONE = "done"
+ STATUS_ERROR = "error"
deadCodes = [STATUS_DONE, STATUS_ERROR]
- def __init__(self, id, nodes, info):
+ def __init__(self, id, simulation, info):
self.id = id
- self.nodes = nodes
+ self.simulation = simulation
self.info = info
- self.status = None
+
+ self.job = None
+ self.status = self.STATUS_NEW
self.statusChanged = Event()
- self.parameters = 'parameters.pml'
- self.events = 'events.txt'
- self.stations = 'stations.txt'
- self.inputFiles = [self.parameters, self.events, self.stations]
self.inputFileURLs = []
self.outputFiles = ["seismograms.tar.gz"]
- def run(self, jm):
+ def go(self, jm):
stackless.tasklet(self)(jm)
def __call__(self, jm):
try:
- self.setStatus(self.STATUS_PREPARING)
-
# build
job = self.newBuildJob()
+ self.job = job
jm.runJob(job)
+
+ self.setStatus(self.STATUS_PREPARING)
while job.isAlive():
job.statusChanged.wait()
@@ -310,26 +333,21 @@
# run
job = self.newRunJob()
+ self.job = job
jm.runJob(job)
- job.statusChanged.wait()
- if job.status == job.STATUS_PENDING:
- self.setStatus(self.STATUS_PENDING)
+ self.setStatus(self.STATUS_PENDING)
+ while job.isAlive():
job.statusChanged.wait()
-
- if job.status == job.STATUS_ACTIVE:
- self.setStatus(self.STATUS_RUNNING)
- job.statusChanged.wait()
-
- if job.status == job.STATUS_DONE:
- self.setStatus(self.STATUS_FINISHING)
- else:
- assert job.status == job.STATUS_FAILED, "%s != %s" % (job.status, job.STATUS_FAILED)
+ if job.status == job.STATUS_ACTIVE:
+ self.setStatus(self.STATUS_RUNNING)
+ # while
+
+ if job.status == job.STATUS_FAILED:
self.setStatus(self.STATUS_ERROR)
raise RuntimeError("run failed")
- # transfer files
-
+ self.job = None
self.setStatus(self.STATUS_DONE)
except Exception, e:
@@ -348,12 +366,11 @@
def newBuildJob(self):
self.buildDir = "/work/teragrid/tg456271/portal/%d" % self.id
job = Job(
+ "build",
jobType = "single",
count = 1,
executable = "/work/teragrid/tg456271/sf3dgp/xspecfem3D",
- arguments = [self.parameters, "--scheduler.dry", "--output-dir=" + self.buildDir],
- stdout = "build-stdout",
- stderr = "build-stderr",
+ arguments = [self.simulation.parameters, "--scheduler.dry", "--output-dir=" + self.buildDir],
)
job.inputFileURLs = self.inputFileURLs
return job
@@ -373,16 +390,17 @@
#simDir = "/work/teragrid/tg456271/portal/%d" % self.id
pyreArgs = [
- self.parameters,
+ self.simulation.parameters,
'--output-dir=' + simDir,
- '--solver.cmt-solution=' + self.events,
- '--solver.stations=' + self.stations,
+ '--solver.cmt-solution=' + self.simulation.events,
+ '--solver.stations=' + self.simulation.stations,
]
job = Job(
+ "run",
jobType = "mpi",
- count = self.nodes,
- max_time = 60,
+ count = self.simulation.nodes,
+ maxTime = 12*60,
queue = "normal",
executable = self.buildDir + "/mpipyspecfem3D",
arguments = ["--pyre-start",
@@ -390,13 +408,11 @@
"Specfem3DGlobe==4.0",
"mpi:mpistart",
"Specfem3DGlobe.Specfem:Specfem",
- "--nodes=%d" % self.nodes,
- "--macros.nodes=%d" % self.nodes,
+ "--nodes=%d" % self.simulation.nodes,
+ "--macros.nodes=%d" % self.simulation.nodes,
"--macros.job.name=mysim",
- "--macros.job.id=123456",
+ "--macros.job.id=%d" % self.id,
] + pyreArgs,
- stdout = "run-stdout",
- stderr = "run-stderr",
)
job.inputFileURLs = self.inputFileURLs
@@ -405,81 +421,145 @@
return job
+class Simulation(object):
+ def __init__(self, id, nodes):
+ self.id = id
+ self.nodes = nodes
+ self.parameters = 'parameters.pml'
+ self.events = 'events.txt'
+ self.stations = 'stations.txt'
+ self.inputFiles = [self.parameters, self.events, self.stations]
+
+
class PortalConnection(object):
MULTIPART_BOUNDARY = '----------eArThQuAkE$'
- def __init__(self, portal, clock, info):
+ def __init__(self, portal, outputRootUrl, clock, info):
self.portal = portal
+ self.outputRootUrl = outputRootUrl
self.clock = clock
self.info = info
def runSimulations(self, jm):
- stackless.tasklet(self.simulationFactory)(jm)
+ stackless.tasklet(self.runFactory)(jm)
- def simulationFactory(self, jm):
+ def runFactory(self, jm):
import urllib2
- simulations = {}
+ runs = {}
while True:
self.clock.tick.wait()
- self.info.log("GET %s" % self.portal.simulationsUrl)
- infile = urllib2.urlopen(self.portal.simulationsUrl)
+ self.info.log("GET %s" % self.portal.runsUrl)
+ infile = urllib2.urlopen(self.portal.runsUrl)
self.info.log("OK")
- simulationList = infile.read()
+ runList = infile.read()
infile.close()
- simulationList = eval(simulationList)
+ runList = eval(runList)
- for sim in simulationList:
- status = sim['status']
- id = int(sim['id'])
- nodes = int(sim['nodes'])
- if (status == Simulation.STATUS_NEW and
- not simulations.has_key(id)):
- self.info.log("new simulation %d" % id)
- newSimulation = Simulation(id, nodes, self.info)
- for inputFile in newSimulation.inputFiles:
- url = self.inputFileURL(newSimulation, inputFile)
- newSimulation.inputFileURLs.append((url, inputFile))
- self.watchSimulation(newSimulation)
- simulations[id] = newSimulation
- newSimulation.run(jm)
+ for run in runList:
+ id = int(run['id'])
+ status = run['status']
+ simId = run['simulation']
+ nodes = int(run['nodes'])
+ if (status in [Run.STATUS_NEW, ""] and
+ not runs.has_key(id)):
+ self.info.log("new run %d" % id)
+ simulation = Simulation(simId, nodes)
+ newRun = Run(id, simulation, self.info)
+ for inputFile in simulation.inputFiles:
+ url = self.inputFileURL(simulation, inputFile)
+ newRun.inputFileURLs.append((url, inputFile))
+ self.watchRun(newRun)
+ runs[id] = newRun
+ newRun.go(jm)
return
- def watchSimulation(self, simulation):
- stackless.tasklet(self.simulationWatcher)(simulation)
+ def watchRun(self, run):
+ stackless.tasklet(self.runWatcher)(run)
- def simulationWatcher(self, simulation):
- while simulation.isAlive():
- simulation.statusChanged.wait()
- self.postStatusChange(simulation)
+ def runWatcher(self, run):
+ url = self.portal.runStatusUrl % run.id
+ job = run.job
+ while run.isAlive():
+ run.statusChanged.wait()
+ fields = {'status': run.status}
+ self.postStatusChange(url, fields)
+ if run.job != job:
+ job = run.job
+ if job is not None:
+ self.watchJob(job, run)
return
+ def watchJob(self, job, run):
+ url = self.portal.jobCreateUrl
+ fields = self.jobFields(job, run)
+ response = self.postStatusChange(url, fields)
+ portalId = eval(response)
+ stackless.tasklet(self.jobWatcher)(job, portalId, run)
+
+ def jobWatcher(self, job, portalId, run):
+ url = self.portal.jobUpdateUrl % portalId
+ while job.isAlive():
+ job.statusChanged.wait()
+ fields = self.jobFields(job, run)
+ self.postStatusChange(url, fields)
+ self.postJobOutput(job, portalId)
+ return
+
+ def postJobOutput(self, job, portalId):
+ url = self.portal.outputCreateUrl
+
+ for outputFile in job.outputFiles:
+
+ pathname = os.path.join(job.directory, outputFile)
+ if not os.path.exists(pathname):
+ continue
+
+ # in case the daemon and the web server are run as
+ # different users
+ import stat
+ os.chmod(pathname, stat.S_IRGRP | stat.S_IROTH)
+
+ fields = {
+ 'job': portalId,
+ 'name': outputFile,
+ }
+ self.postStatusChange(url, fields)
+
+ return
+
+ def jobFields(self, job, run):
+ fields = {
+ 'run': run.id,
+ 'task': job.task,
+ 'status': job.status.lower(),
+ 'url': self.outputRootUrl + job.subdir + "/",
+ }
+ return fields
+
def inputFileURL(self, simulation, inputFile):
return self.portal.inputFileUrl % (simulation.id, inputFile)
- def postStatusChange(self, simulation):
+ def postStatusChange(self, url, fields):
import urllib
- body = urllib.urlencode({'status': simulation.status})
+ body = urllib.urlencode(fields)
headers = {"Content-Type": "application/x-www-form-urlencoded",
"Accept": "text/plain"}
- self.post(body, headers, simulation)
- return
+ return self.post(body, headers, url)
- def postStatusChangeAndUploadOutput(self, simulation, output):
+ def postStatusChangeAndUploadOutput(self, url, fields, output):
# Based upon a recipe by Wade Leftwich.
- fields = {'status': simulation.status}
files = [('output', 'output.txt', 'application/x-gtar', 'gzip', output)]
body = self.encodeMultipartFormData(fields, files)
headers = {"Content-Type": "multipart/form-data; boundary=%s" % self.MULTIPART_BOUNDARY,
"Content-Length": str(len(body)),
"Accept": "text/plain"}
- self.post(body, headers, simulation)
- return
+ return self.post(body, headers, url)
def encodeMultipartFormData(self, fields, files):
import shutil
@@ -503,16 +583,16 @@
line()
return stream.getvalue()
- def post(self, body, headers, simulation):
- self.info.log("POST %d" % simulation.id)
+ def post(self, body, headers, url):
+ self.info.log("POST %s" % url)
import httplib
conn = httplib.HTTPConnection(self.portal.host)
- conn.request("POST", (self.portal.simStatusUrl % simulation.id), body, headers)
+ conn.request("POST", url, body, headers)
response = conn.getresponse()
data = response.read()
conn.close()
self.info.log("response %s" % data)
- return
+ return data
class Clock(object):
@@ -541,9 +621,15 @@
def _configure(self):
self.urlPrefix = 'http://%s%s' % (self.host, self.urlRoot)
- self.simulationsUrl = self.urlPrefix + 'simulations/list.py'
self.inputFileUrl = self.urlPrefix + 'simulations/%d/%s'
- self.simStatusUrl = self.urlRoot + 'simulations/%d/status/'
+ # runs
+ self.runsUrl = self.urlPrefix + 'runs/list.py'
+ self.runStatusUrl = self.urlRoot + 'runs/%d/status/'
+ # jobs
+ self.jobCreateUrl = self.urlRoot + 'jobs/create/'
+ self.jobUpdateUrl = self.urlRoot + 'jobs/%d/update/'
+ # output
+ self.outputCreateUrl = self.urlRoot + 'output/create/'
class Daemon(Script):
@@ -562,13 +648,16 @@
stage-in input files directly from a publicly-accessible portal via
HTTP.""")
+ outputRootPathname = pyre.str("output-root-pathname")
+ outputRootUrl = pyre.str("output-root-url")
+
def main(self, *args, **kwds):
self._info.activate()
self._info.log("~~~~~~~~~~ daemon started ~~~~~~~~~~")
clock = Clock(self.sleepInterval / second)
- jm = GlobusJobManager(clock, self._info, self.downloadInputFiles)
- connection = PortalConnection(self.portal, clock, self._info)
+ jm = GlobusJobManager(clock, self.outputRootPathname, self._info, self.downloadInputFiles)
+ connection = PortalConnection(self.portal, self.outputRootUrl, clock, self._info)
connection.runSimulations(jm)
try:
stackless.run()
@@ -577,6 +666,19 @@
return
+def randomName():
+ # Stolen from pyre.services.Pickler; perhaps this should be an
+ # official/"exported" Pyre function?
+
+ alphabet = list("0123456789abcdefghijklmnopqrstuvwxyz")
+
+ import random
+ random.shuffle(alphabet)
+ key = "".join(alphabet)[0:16]
+
+ return key
+
+
def main(*args, **kwds):
daemon = Daemon()
daemon.run(*args, **kwds)
Modified: cs/portal/trunk/web-portal-daemon.cfg
===================================================================
--- cs/portal/trunk/web-portal-daemon.cfg 2007-08-29 23:50:51 UTC (rev 7913)
+++ cs/portal/trunk/web-portal-daemon.cfg 2007-08-30 02:50:15 UTC (rev 7914)
@@ -1,7 +1,12 @@
[web-portal-daemon]
sleep-interval = 5*second
+download-input-files = True
+output-root-pathname = /home/leif/public_html/portal/
+output-root-url = http://crust.geodynamics.org/~leif/portal/
[web-portal-daemon.portal]
-host = crust.geodynamics.org
+#host = crust.geodynamics.org
+host = localhost:8000
+
More information about the cig-commits
mailing list