[cig-commits] r7757 - in cs: . portal portal/trunk
leif at geodynamics.org
leif at geodynamics.org
Thu Jul 26 16:42:17 PDT 2007
Author: leif
Date: 2007-07-26 16:42:17 -0700 (Thu, 26 Jul 2007)
New Revision: 7757
Added:
cs/portal/
cs/portal/trunk/
cs/portal/trunk/daemon.py
cs/portal/trunk/sleep.rsl
cs/portal/trunk/web-portal-daemon.cfg
Log:
Wrote a new daemon to power the existing SPECFEM web portal. This
daemon uses Globus commands to submit jobs to a remote TeraGrid site.
Currently, it submits dummy jobs which simply run /bin/sleep.
However, the jobs are triggered by the web portal, and the job status
changes are posted back to the portal.
Requires Stackless Python.
Added: cs/portal/trunk/daemon.py
===================================================================
--- cs/portal/trunk/daemon.py 2007-07-26 22:58:03 UTC (rev 7756)
+++ cs/portal/trunk/daemon.py 2007-07-26 23:42:17 UTC (rev 7757)
@@ -0,0 +1,321 @@
+
+import stackless
+import os, sys
+from popen2 import Popen4
+
+from pyre.applications import Script
+from pyre.components import Component
+from pyre.units.time import second
+
+
# Boundary string separating the parts of the multipart/form-data bodies
# built by Simulation.  RFC 2046 restricts boundaries to 1-70 characters
# drawn from letters, digits and '()+_,-./:=? (plus space, though not as the
# last character); a '$', for example, is not permitted.
MULTIPART_BOUNDARY = '----------eArThQuAkE'
+
+
def spawn(info, *argv):
    """Run ``argv`` as a child process and wait for it to finish.

    ``argv[0]`` is looked up on ``PATH`` (``os.spawnvp``).  The command line
    and its outcome are written to ``info.log``.

    Raises Exception unless the child exits with status 0.  The message
    distinguishes a normal non-zero exit ("exit N") from termination by a
    signal ("signal N"), in the same way ``ospawn`` does.
    """
    info.log("spawning: %s" % ' '.join(argv))
    status = os.spawnvp(os.P_WAIT, argv[0], argv)
    if status < 0:
        # With P_WAIT, os.spawnvp reports death-by-signal as -signal.
        statusMsg = "%s: signal %d" % (argv[0], -status)
    else:
        statusMsg = "%s: exit %d" % (argv[0], status)
    info.log(statusMsg)
    if status != 0:
        raise Exception(statusMsg)
    return
+
+
def ospawn(info, *argv):
    """Run ``argv`` with stdout and stderr merged and return its output.

    The child's combined output is captured through ``Popen4``, echoed line
    by line to ``info.line``, and returned as the list produced by
    ``readlines()``.  The command line and the outcome are written to
    ``info.log``.

    Raises Exception unless the child exited normally with status 0
    (termination by a signal or any other wait status also raises).
    """
    info.log("spawning: %s" % ' '.join(argv))

    child = Popen4(argv)

    # Nothing is ever written to the child; close its stdin so it sees EOF.
    child.tochild.close()

    try:
        # Drain the pipe before waiting so the child cannot block on a
        # full pipe buffer while we wait for it.
        output = child.fromchild.readlines()
    finally:
        # Release the pipe and reap the child even if reading failed.
        child.fromchild.close()
        status = child.wait()

    exitStatus = None
    if os.WIFSIGNALED(status):
        statusStr = "signal %d" % os.WTERMSIG(status)
    elif os.WIFEXITED(status):
        exitStatus = os.WEXITSTATUS(status)
        statusStr = "exit %d" % exitStatus
    else:
        statusStr = "status %d" % status
    statusMsg = "%s: %s" % (argv[0], statusStr)

    for line in output:
        info.line(" " + line.rstrip())
    info.log(statusMsg)

    # exitStatus stays None for signals/other statuses, which also fail here.
    if exitStatus != 0:
        raise Exception(statusMsg)

    return output
+
+
+
class Event(object):
    """Broadcast wake-up primitive for Stackless tasklets.

    Any number of tasklets may block in ``wait()``.  A single ``signal()``
    releases all of those that are blocked at that moment.
    """

    def __init__(self):
        # Tasklets blocked in wait() are parked on this channel.
        self.channel = stackless.channel()

    def wait(self):
        """Block the calling tasklet until the next ``signal()``."""
        self.channel.receive()

    def signal(self):
        """Release every tasklet currently blocked in ``wait()``.

        A fresh channel is installed before anyone is woken. A tasklet
        that loops straight back into ``wait()`` therefore blocks until a
        later signal rather than consuming this one again. If nobody is
        waiting, the signal is not remembered.
        """
        waiters, self.channel = self.channel, stackless.channel()
        # A negative balance is the number of receivers blocked on it.
        while waiters.balance < 0:
            waiters.send(None)
+
+
class Job(object):
    """A job submitted to a remote site through Globus, polled for status.

    ``status`` holds the most recent code read from ``globus-job-status``
    (None until the first poll).  If polling itself fails, it is set to
    STATUS_FAILED.  ``statusChanged`` is signalled whenever ``status``
    changes.  ``run()`` starts a tasklet that keeps polling until the job
    reaches one of ``deadCodes``.
    """

    # status codes
    STATUS_PENDING = "PENDING"
    STATUS_ACTIVE = "ACTIVE"
    STATUS_DONE = "DONE"
    STATUS_FAILED = "FAILED"
    statusCodes = [STATUS_PENDING, STATUS_ACTIVE, STATUS_DONE, STATUS_FAILED]
    # Terminal codes: once one is reached the job is no longer polled.
    deadCodes = [STATUS_DONE, STATUS_FAILED]

    @classmethod
    def newJob(cls, info, rsl="sleep.rsl",
               resource="tg-login.tacc.teragrid.org"):
        """Submit a job with ``globusrun`` and start polling it.

        :param info: logger passed to ``ospawn`` (needs ``log`` and ``line``).
        :param rsl: RSL file passed to ``globusrun -f``.
        :param resource: resource contact passed to ``globusrun -r``.
        :returns: the new Job, whose polling tasklet has been created.
        :raises Exception: if ``globusrun`` fails or prints no line
            starting with ``https://`` (used as the job ID).
        """
        output = ospawn(info, "globusrun", "-F", "-f", rsl, "-r", resource)
        jobId = None
        for line in output:
            # If several lines qualify, the last one wins.
            if line.startswith("https://"):
                jobId = line.strip()
        if not jobId:
            raise Exception("globusrun: no job ID found in output")
        job = cls(jobId, info)
        job.run()
        return job

    def __init__(self, id, info):
        self.id = id
        self.info = info
        self.status = None
        self.statusChanged = Event()

    def getStatus(self):
        """Poll ``globus-job-status`` once and return the status code.

        Signals ``statusChanged`` when the code differs from the previous one.

        :raises Exception: if the command fails, prints nothing, or prints
            a code that is not in ``statusCodes``.
        """
        output = ospawn(self.info, "globus-job-status", self.id)
        if not output:
            raise Exception("globus-job-status: no output for %s" % self.id)
        newStatus = output[0].strip()
        # Explicit check rather than assert, which -O would strip.
        if newStatus not in Job.statusCodes:
            raise Exception("globus-job-status: unknown status %r" % newStatus)
        oldStatus = self.status
        self.status = newStatus
        if newStatus != oldStatus:
            self.statusChanged.signal()
        return newStatus

    def isAlive(self):
        """Return False once the job has reached a dead status.

        A job already known to be dead is not polled again.
        """
        if self.status in Job.deadCodes:
            return False
        return self.getStatus() not in Job.deadCodes

    def run(self):
        """Start the polling tasklet (see ``__call__``)."""
        stackless.tasklet(self)()

    def __call__(self):
        """Tasklet body: poll, yielding between polls, until the job is dead.

        If polling raises, the error is logged and the job is marked FAILED,
        and waiters are woken. The exception does not propagate out of this
        tasklet.
        """
        try:
            while self.isAlive():
                stackless.schedule()
        except Exception:
            e = sys.exc_info()[1]
            self.info.log("error: %s: %s" % (e.__class__.__name__, e))
            oldStatus = self.status
            self.status = Job.STATUS_FAILED
            if oldStatus != Job.STATUS_FAILED:
                self.statusChanged.signal()
        return
+
+
class Simulation(object):
    """One portal simulation, driven through a Globus Job by a tasklet.

    Status transitions of the underlying Job are translated into the
    portal's SimStatus* codes and POSTed back to the portal.
    """

    # status codes
    STATUS_NEW = "SimStatusNew"
    STATUS_PREPARING = "SimStatusPreparing"
    STATUS_PENDING = "SimStatusPending"
    STATUS_RUNNING = "SimStatusRunning"
    STATUS_FINISHING = "SimStatusFinishing"
    STATUS_DONE = "SimStatusDone"
    STATUS_ERROR = "SimStatusError"

    def __init__(self, id, daemon, portal):
        self.id = id
        self.daemon = daemon
        self.portal = portal

    def postStatusChange(self, newStatus):
        """POST ``newStatus`` to the portal as a URL-encoded form."""
        import urllib
        body = urllib.urlencode({'status': newStatus})
        headers = {"Content-Type": "application/x-www-form-urlencoded",
                   "Accept": "text/plain"}
        self.post(body, headers)
        return

    def postStatusChangeAndUploadOutput(self, newStatus, output):
        """POST ``newStatus`` together with ``output`` as a file upload.

        ``output`` must be a readable file-like object.  Its contents are sent
        in the ``output`` field of a multipart/form-data body.
        """
        # Based upon a recipe by Wade Leftwich.
        fields = {'status': newStatus}
        # NOTE(review): the filename 'output.txt' does not match the
        # gtar/gzip content type -- confirm what the portal expects.
        files = [('output', 'output.txt', 'application/x-gtar', 'gzip', output)]
        body = self.encodeMultipartFormData(fields, files)
        headers = {"Content-Type": "multipart/form-data; boundary=%s" % MULTIPART_BOUNDARY,
                   "Content-Length": str(len(body)),
                   "Accept": "text/plain"}
        self.post(body, headers)
        return

    def encodeMultipartFormData(self, fields, files):
        """Build a multipart/form-data body delimited by MULTIPART_BOUNDARY.

        :param fields: mapping of plain form-field names to string values.
        :param files: sequence of (name, filename, contentType,
            contentEncoding, fileobj) tuples; each fileobj is copied in full.
        :returns: the encoded body as a string.
        """
        import shutil
        from StringIO import StringIO
        stream = StringIO()

        def line(s=''):
            # MIME requires CRLF line endings.
            stream.write(s + '\r\n')

        for key, value in fields.items():
            line('--' + MULTIPART_BOUNDARY)
            line('Content-Disposition: form-data; name="%s"' % key)
            line()
            line(value)
        for (key, filename, contentType, contentEncoding, content) in files:
            line('--' + MULTIPART_BOUNDARY)
            line('Content-Disposition: form-data; name="%s"; filename="%s"' % (key, filename))
            line('Content-Type: %s' % contentType)
            line('Content-Encoding: %s' % contentEncoding)
            line()
            shutil.copyfileobj(content, stream)
            line()
        line('--' + MULTIPART_BOUNDARY + '--')
        line()
        return stream.getvalue()

    def post(self, body, headers):
        """POST ``body`` to this simulation's status URL and log the reply."""
        self.daemon._info.log("POST %d" % self.id)
        import httplib
        conn = httplib.HTTPConnection(self.portal.host)
        try:
            conn.request("POST", (self.portal.simStatusUrl % self.id), body, headers)
            response = conn.getresponse()
            data = response.read()
        finally:
            # Close the connection even if the request or read fails.
            conn.close()
        self.daemon._info.log("response %s" % data)
        return

    def run(self):
        """Start this simulation's tasklet (see ``__call__``)."""
        stackless.tasklet(self)()

    def __call__(self):
        """Tasklet body: submit the job and mirror its states to the portal.

        Any exception is logged and reported to the portal as STATUS_ERROR.
        """
        try:
            self.postStatusChange(Simulation.STATUS_PREPARING)
            job = Job.newJob(self.daemon._info)
            job.statusChanged.wait()

            if job.status == Job.STATUS_PENDING:
                self.postStatusChange(Simulation.STATUS_PENDING)
                job.statusChanged.wait()

            if job.status == Job.STATUS_ACTIVE:
                self.postStatusChange(Simulation.STATUS_RUNNING)
                job.statusChanged.wait()

            if job.status == Job.STATUS_DONE:
                #self.postStatusChange(Simulation.STATUS_FINISHING)
                self.postStatusChange(Simulation.STATUS_DONE)
            else:
                # Explicit check rather than assert, which -O would strip.
                if job.status != Job.STATUS_FAILED:
                    self.daemon._info.log("error: unexpected job status %s" % job.status)
                self.postStatusChange(Simulation.STATUS_ERROR)

        except Exception:
            e = sys.exc_info()[1]
            self.daemon._info.log("error: %s: %s" % (e.__class__.__name__, e))
            self._reportError()

        return

    def _reportError(self):
        """Best-effort POST of STATUS_ERROR; failures are logged, not raised."""
        # The failure being reported may itself be the portal being
        # unreachable; do not let a second failure escape the tasklet.
        try:
            self.postStatusChange(Simulation.STATUS_ERROR)
        except Exception:
            e = sys.exc_info()[1]
            self.daemon._info.log("error: could not post error status: %s: %s"
                                  % (e.__class__.__name__, e))
        return
+
+
class Sleeper(object):
    """Tasklet that paces the scheduler by sleeping between rounds.

    Each pass blocks in ``time.sleep`` for ``interval`` seconds, during which
    no tasklet runs. It then yields, so the other tasklets run roughly once
    per interval.
    """

    def __init__(self, interval):
        # Seconds to sleep per round.
        self.interval = interval

    def __call__(self):
        import time
        while True:
            time.sleep(self.interval)
            stackless.schedule()
+
+
class SimulationFactory(object):
    """Tasklet that polls the portal and starts a Simulation for each new one."""

    def __init__(self, daemon):
        self.daemon = daemon
        self.portal = daemon.portal

    def __call__(self):
        """Tasklet body: yield, poll the portal once, and repeat forever.

        A failed poll is logged and retried on the next round. It does not
        propagate out of this tasklet.
        """
        # Simulations already started, keyed by id, so each runs only once.
        simulations = {}

        while True:
            stackless.schedule()
            try:
                self._poll(simulations)
            except Exception:
                e = sys.exc_info()[1]
                self.daemon._info.log("error: %s: %s" % (e.__class__.__name__, e))

    def _poll(self, simulations):
        """Fetch the portal's simulation list once and start the new ones."""
        import urllib2

        url = self.portal.simulationsUrl
        self.daemon._info.log("GET %s" % url)
        infile = urllib2.urlopen(url)
        self.daemon._info.log("OK")
        try:
            simulationList = infile.read()
        finally:
            infile.close()

        # SECURITY: eval() executes whatever the portal returns, and the list
        # is fetched over plain HTTP.  Consider a data-only parser (e.g.
        # ast.literal_eval, Python 2.6+) if the portal's format allows it.
        simulationList = eval(simulationList)

        for sim in simulationList:
            status = sim['status']
            simId = int(sim['id'])
            if status == Simulation.STATUS_NEW and simId not in simulations:
                self.daemon._info.log("new simulation %d" % simId)
                newSimulation = Simulation(simId, self.daemon, self.portal)
                simulations[simId] = newSimulation
                newSimulation.run()
        return
+
+
class WebPortal(Component):
    """Pyre component describing where the SPECFEM web portal lives.

    After configuration it exposes ready-made URLs built from the ``host``
    and ``url-root`` properties.
    """

    name = "web-portal"

    import pyre.inventory as pyre
    host = pyre.str("host", default="localhost:8000")
    urlRoot = pyre.str("url-root", default="/specfem3dglobe/")

    def _configure(self):
        root = self.urlRoot
        prefix = 'http://%s%s' % (self.host, root)
        self.urlPrefix = prefix
        # Absolute URL of the simulation list fetched with urllib2.
        self.simulationsUrl = prefix + 'simulations/list.py'
        # Absolute URL template; presumably (simulation id, file name).
        self.inputFileUrl = prefix + 'simulations/%d/%s'
        # Path only (no scheme or host): it is sent through an
        # HTTPConnection that is already bound to ``host``.
        self.simStatusUrl = root + 'simulations/%d/status/'
+
+
class Daemon(Script):
    """Pyre script that runs the web-portal daemon's tasklets.

    Starts a Sleeper, which paces the scheduler, and a SimulationFactory,
    which polls the portal. It then runs the Stackless scheduler until that
    returns or a KeyboardInterrupt arrives.
    """

    name = "web-portal-daemon"

    import pyre.inventory as pyre
    portal = pyre.facility("portal", factory=WebPortal)
    sleepInterval = pyre.dimensional("sleep-interval", default=60*second)

    def main(self, *args, **kwds):
        self._info.activate()
        log = self._info.log
        log("~~~~~~~~~~ daemon started ~~~~~~~~~~")

        # sleepInterval is dimensional; dividing by `second` gives a number.
        for body in (Sleeper(self.sleepInterval / second),
                     SimulationFactory(self)):
            stackless.tasklet(body)()

        try:
            stackless.run()
        except KeyboardInterrupt:
            log("~~~~~~~~~~ daemon stopped ~~~~~~~~~~")
+
+
def main(*args, **kwds):
    """Entry point: create a Daemon and run it with the given arguments."""
    Daemon().run(*args, **kwds)


if __name__ == "__main__":
    main()
Added: cs/portal/trunk/sleep.rsl
===================================================================
--- cs/portal/trunk/sleep.rsl 2007-07-26 22:58:03 UTC (rev 7756)
+++ cs/portal/trunk/sleep.rsl 2007-07-26 23:42:17 UTC (rev 7757)
@@ -0,0 +1,6 @@
+&(jobType="single")
+(directory="/home/teragrid/tg456271/sci-gateway")
+(executable=/bin/sleep)
+(arguments="10")
+(stdout="sleep-stdout")
+(stderr="sleep-stderr")
Added: cs/portal/trunk/web-portal-daemon.cfg
===================================================================
--- cs/portal/trunk/web-portal-daemon.cfg 2007-07-26 22:58:03 UTC (rev 7756)
+++ cs/portal/trunk/web-portal-daemon.cfg 2007-07-26 23:42:17 UTC (rev 7757)
@@ -0,0 +1,7 @@
+
+[web-portal-daemon]
+sleep-interval = 5*second
+
+[web-portal-daemon.portal]
+host = crust.geodynamics.org
+
More information about the cig-commits
mailing list