[cig-commits] r7757 - in cs: . portal portal/trunk

leif at geodynamics.org leif at geodynamics.org
Thu Jul 26 16:42:17 PDT 2007


Author: leif
Date: 2007-07-26 16:42:17 -0700 (Thu, 26 Jul 2007)
New Revision: 7757

Added:
   cs/portal/
   cs/portal/trunk/
   cs/portal/trunk/daemon.py
   cs/portal/trunk/sleep.rsl
   cs/portal/trunk/web-portal-daemon.cfg
Log:
Wrote a new daemon to power the existing SPECFEM web portal.  This
daemon uses Globus commands to submit jobs to a remote TeraGrid site.

Currently, it submits dummy jobs which simply run /bin/sleep.
However, the jobs are triggered by the web portal, and the job status
changes are posted back to the portal.

Requires Stackless Python.


Added: cs/portal/trunk/daemon.py
===================================================================
--- cs/portal/trunk/daemon.py	2007-07-26 22:58:03 UTC (rev 7756)
+++ cs/portal/trunk/daemon.py	2007-07-26 23:42:17 UTC (rev 7757)
@@ -0,0 +1,321 @@
+
+import stackless
+import os, sys
+from popen2 import Popen4
+
+from pyre.applications import Script
+from pyre.components import Component
+from pyre.units.time import second
+
+# Boundary string separating the parts of multipart/form-data POST bodies.
+MULTIPART_BOUNDARY = '----------eArThQuAkE$'
+
+
+def spawn(info, *argv):
+    command = ' '.join(argv)
+    info.log("spawning: %s" % command)
+    status = os.spawnvp(os.P_WAIT, argv[0], argv)
+    statusMsg = "%s: exit %d" % (argv[0], status)
+    info.log(statusMsg)
+    if status != 0:
+        raise Exception(statusMsg)
+    return
+
+
+def ospawn(info, *argv):
+    command = ' '.join(argv)
+    info.log("spawning: %s" % command)
+
+    child = Popen4(argv)
+
+    child.tochild.close()
+
+    output = child.fromchild.readlines()
+    status = child.wait()
+
+    exitStatus = None
+    if (os.WIFSIGNALED(status)):
+        statusStr = "signal %d" % os.WTERMSIG(status)
+    elif (os.WIFEXITED(status)):
+        exitStatus = os.WEXITSTATUS(status)
+        statusStr = "exit %d" % exitStatus
+    else:
+        statusStr = "status %d" % status
+    statusMsg = "%s: %s" % (argv[0], statusStr)
+    
+    for line in output:
+        info.line("    " + line.rstrip())
+    info.log(statusMsg)
+    
+    if exitStatus != 0:
+        raise Exception(statusMsg)
+
+    return output
+
+
+
+class Event(object):
+    
+    def __init__(self):
+        self.channel = stackless.channel()
+
+    def wait(self):
+        self.channel.receive()
+        return
+
+    def signal(self):
+
+        # Swap-in a new channel, so that a waiting tasklet that
+        # repeatedly calls wait() will block waiting for the *next*
+        # signal on its subsequent call to wait().
+        channel = self.channel
+        self.channel = stackless.channel()
+
+        # Unblock all waiters.
+        while channel.balance < 0:
+            channel.send(None)
+        
+        return
+
+
+class Job(object):
+
+    # status codes
+    STATUS_PENDING  = "PENDING"
+    STATUS_ACTIVE   = "ACTIVE"
+    STATUS_DONE     = "DONE"
+    STATUS_FAILED   = "FAILED"
+    statusCodes = [STATUS_PENDING, STATUS_ACTIVE, STATUS_DONE, STATUS_FAILED]
+    deadCodes = [STATUS_DONE, STATUS_FAILED]
+
+    #@classmethod
+    def newJob(cls, info):
+        output = ospawn(info, "globusrun", "-F", "-f", "sleep.rsl", "-r", "tg-login.tacc.teragrid.org")
+        id = None
+        for line in output:
+            if line.startswith("https://"):
+                id = line.strip()
+        assert id
+        job = Job(id, info)
+        job.run()
+        return job
+    newJob = classmethod(newJob)
+        
+    def __init__(self, id, info):
+        self.id = id
+        self.info = info
+        self.status = None
+        self.statusChanged = Event()
+
+    def getStatus(self):
+        output = ospawn(self.info, "globus-job-status", self.id)
+        oldStatus = self.status
+        self.status = output[0].strip()
+        assert self.status in Job.statusCodes
+        if self.status != oldStatus:
+            self.statusChanged.signal()
+        return self.status
+
+    def isAlive(self):
+        if self.status in Job.deadCodes:
+            return False
+        return not self.getStatus() in Job.deadCodes
+
+    def run(self):
+        stackless.tasklet(self)()
+
+    def __call__(self):
+        while self.isAlive():
+            stackless.schedule()
+        return
+
+
+class Simulation(object):
+    
+    # status codes
+    STATUS_NEW        = "SimStatusNew"
+    STATUS_PREPARING  = "SimStatusPreparing"
+    STATUS_PENDING    = "SimStatusPending"
+    STATUS_RUNNING    = "SimStatusRunning"
+    STATUS_FINISHING  = "SimStatusFinishing"
+    STATUS_DONE       = "SimStatusDone"
+    STATUS_ERROR      = "SimStatusError"
+
+    def __init__(self, id, daemon, portal):
+        self.id = id
+        self.daemon = daemon
+        self.portal = portal
+
+    def postStatusChange(self, newStatus):
+        import urllib
+        body = urllib.urlencode({'status': newStatus})
+        headers = {"Content-Type": "application/x-www-form-urlencoded",
+                   "Accept": "text/plain"}
+        self.post(body, headers)
+        return
+
+    def postStatusChangeAndUploadOutput(self, newStatus, output):
+        # Based upon a recipe by Wade Leftwich.
+        fields = {'status': newStatus}
+        files = [('output', 'output.txt', 'application/x-gtar', 'gzip', output)]
+        body = self.encodeMultipartFormData(fields, files)
+        headers = {"Content-Type": "multipart/form-data; boundary=%s" % MULTIPART_BOUNDARY,
+                   "Content-Length": str(len(body)),
+                   "Accept": "text/plain"}
+        self.post(body, headers)
+        return
+
+    def encodeMultipartFormData(self, fields, files):
+        import shutil
+        from StringIO import StringIO
+        stream = StringIO()
+        def line(s=''): stream.write(s + '\r\n')
+        for key, value in fields.iteritems():
+            line('--' + MULTIPART_BOUNDARY)
+            line('Content-Disposition: form-data; name="%s"' % key)
+            line()
+            line(value)
+        for (key, filename, contentType, contentEncoding, content) in files:
+            line('--' + MULTIPART_BOUNDARY)
+            line('Content-Disposition: form-data; name="%s"; filename="%s"' % (key, filename))
+            line('Content-Type: %s' % contentType)
+            line('Content-Encoding: %s' % contentEncoding)
+            line()
+            shutil.copyfileobj(content, stream)
+            line()
+        line('--' + MULTIPART_BOUNDARY + '--')
+        line()
+        return stream.getvalue()
+
+    def post(self, body, headers):
+        self.daemon._info.log("POST %d" % self.id)
+        import httplib
+        conn = httplib.HTTPConnection(self.portal.host)
+        conn.request("POST", (self.portal.simStatusUrl % self.id), body, headers)
+        response = conn.getresponse()
+        data = response.read()
+        conn.close()
+        self.daemon._info.log("response %s" % data)
+        return
+
+    def run(self):
+        stackless.tasklet(self)()
+
+    def __call__(self):
+        try:
+            self.postStatusChange(Simulation.STATUS_PREPARING)
+            job = Job.newJob(self.daemon._info)
+            job.statusChanged.wait()
+            
+            if job.status == Job.STATUS_PENDING:
+                self.postStatusChange(Simulation.STATUS_PENDING)
+                job.statusChanged.wait()
+            
+            if job.status == Job.STATUS_ACTIVE:
+                self.postStatusChange(Simulation.STATUS_RUNNING)
+                job.statusChanged.wait()
+            
+            if job.status == Job.STATUS_DONE:
+                #self.postStatusChange(Simulation.STATUS_FINISHING)
+                self.postStatusChange(Simulation.STATUS_DONE)
+            else:
+                assert job.status == Job.STATUS_FAILED, "%s != %s" % (job.status, Job.STATUS_FAILED)
+                self.postStatusChange(Simulation.STATUS_ERROR)
+                
+        except Exception, e:
+            self.daemon._info.log("error: %s: %s" % (e.__class__.__name__, e))
+            self.postStatusChange(Simulation.STATUS_ERROR)
+        
+        return
+
+
+class Sleeper(object):
+
+    def __init__(self, interval):
+        self.interval = interval
+
+    def __call__(self):
+        from time import sleep
+        while True:
+            sleep(self.interval)
+            stackless.schedule()
+        return
+
+
+class SimulationFactory(object):
+
+    def __init__(self, daemon):
+        self.daemon = daemon
+        self.portal = daemon.portal
+
+    def __call__(self):
+        import urllib2
+        
+        simulations = {}
+        
+        while True:
+            stackless.schedule()
+            
+            self.daemon._info.log("GET %s" % self.portal.simulationsUrl)
+            infile = urllib2.urlopen(self.portal.simulationsUrl)
+            self.daemon._info.log("OK")
+            simulationList = infile.read()
+            infile.close()
+            
+            simulationList = eval(simulationList)
+            
+            for sim in simulationList:
+                status = sim['status']
+                id = int(sim['id'])
+                if (status == Simulation.STATUS_NEW and
+                    not simulations.has_key(id)):
+                    self.daemon._info.log("new simulation %d" % id)
+                    newSimulation = Simulation(id, self.daemon, self.portal)
+                    simulations[id] = newSimulation
+                    newSimulation.run()
+        
+        return
+
+
+class WebPortal(Component):
+    """Portal address plus derived URLs; simStatusUrl is a bare path for httplib."""
+
+    name = "web-portal"
+    import pyre.inventory as pyre
+    host     = pyre.str("host", default="localhost:8000")
+    urlRoot  = pyre.str("url-root", default="/specfem3dglobe/")
+
+    def _configure(self):
+        self.urlPrefix           = 'http://%s%s' % (self.host, self.urlRoot)
+        self.simulationsUrl      = self.urlPrefix + 'simulations/list.py'
+        self.inputFileUrl        = self.urlPrefix + 'simulations/%d/%s'
+        self.simStatusUrl        = self.urlRoot + 'simulations/%d/status/'
+
+
+class Daemon(Script):
+    """Pyre script that runs the Sleeper and SimulationFactory tasklets."""
+
+    name = "web-portal-daemon"
+    import pyre.inventory as pyre
+    portal = pyre.facility("portal", factory=WebPortal)
+    sleepInterval = pyre.dimensional("sleep-interval", default=60*second)
+
+    def main(self, *args, **kwds):
+        self._info.activate()
+        self._info.log("~~~~~~~~~~ daemon started ~~~~~~~~~~")
+        stackless.tasklet(Sleeper(self.sleepInterval / second))()
+        stackless.tasklet(SimulationFactory(self))()
+        try:
+            stackless.run()
+        except KeyboardInterrupt:  # Ctrl-C ends the scheduler loop
+            self._info.log("~~~~~~~~~~ daemon stopped ~~~~~~~~~~")
+        return
+
+
+def main(*args, **kwds):
+    daemon = Daemon()
+    daemon.run(*args, **kwds)
+
+# Script entry point.
+if __name__ == "__main__":
+    main()

Added: cs/portal/trunk/sleep.rsl
===================================================================
--- cs/portal/trunk/sleep.rsl	2007-07-26 22:58:03 UTC (rev 7756)
+++ cs/portal/trunk/sleep.rsl	2007-07-26 23:42:17 UTC (rev 7757)
@@ -0,0 +1,6 @@
+&(jobType="single")
+(directory="/home/teragrid/tg456271/sci-gateway")
+(executable=/bin/sleep)
+(arguments="10")
+(stdout="sleep-stdout")
+(stderr="sleep-stderr")

Added: cs/portal/trunk/web-portal-daemon.cfg
===================================================================
--- cs/portal/trunk/web-portal-daemon.cfg	2007-07-26 22:58:03 UTC (rev 7756)
+++ cs/portal/trunk/web-portal-daemon.cfg	2007-07-26 23:42:17 UTC (rev 7757)
@@ -0,0 +1,7 @@
+
+[web-portal-daemon]
+sleep-interval = 5*second
+
+[web-portal-daemon.portal]
+host = crust.geodynamics.org
+



More information about the cig-commits mailing list