[cig-commits] r8335 - cs/portal/trunk/hello
leif at geodynamics.org
leif at geodynamics.org
Mon Nov 26 21:44:24 PST 2007
Author: leif
Date: 2007-11-26 21:44:23 -0800 (Mon, 26 Nov 2007)
New Revision: 8335
Added:
cs/portal/trunk/hello/daemon.py
Log:
Created a stripped-down version of the daemon to power the "hello
world" portal.
Copied: cs/portal/trunk/hello/daemon.py (from rev 8334, cs/portal/trunk/daemon.py)
===================================================================
--- cs/portal/trunk/daemon.py 2007-11-26 20:17:42 UTC (rev 8334)
+++ cs/portal/trunk/hello/daemon.py 2007-11-27 05:44:23 UTC (rev 8335)
@@ -0,0 +1,548 @@
+
+import stackless
+import os, sys, signal
+from popen2 import Popen4
+
+from pyre.applications import Script
+from pyre.components import Component
+from pyre.units.time import second
+
+
+# NYI: RSL AST
+class RSLUnquoted(object):
+ def __init__(self, value):
+ self.value = value
+
+# Site-specific directories.  Both values are substituted into
+# TACC_ENVIRONMENT below; TG_CLUSTER_SCRATCH is also used as the
+# "scratch_dir" RSL attribute by GlobusJobManager.resSpec().
+TG_CLUSTER_SCRATCH = "/work/teragrid/tg459131"
+TG_COMMUNITY = "/projects/tg"
+
+# Path of the program submitted by Simulation.newJob().  The commented-out
+# line is the alternative location under TG_COMMUNITY.
+#EXECUTABLE = TG_COMMUNITY + "/CIG/hello/hello"
+EXECUTABLE = "/home/teragrid/tg459131/bin/hello"
+
+# I'm not sure what to do about this yet.
+# Hard-coded RSL "environment" relation; GlobusJobManager.writeRsl()
+# appends it to every RSL file it writes.
+# NOTE(review): the PATH value looks like a snapshot of one login
+# environment -- confirm before reusing on another host or account.
+TACC_ENVIRONMENT = """(environment=(TG_CLUSTER_SCRATCH "%s") (TG_COMMUNITY "%s") (PATH "/opt/lsf/bin:/opt/lsf/etc:/opt/MPI/intel9/mvapich-gen2/0.9.8/bin:/opt/apps/binutils/binutils-2.17/bin:/opt/intel/compiler9.1//idb/bin:/opt/intel/compiler9.1//cc/bin:/opt/intel/compiler9.1//fc/bin:/usr/local/first:/usr/local/bin:~/bin:.:/opt/apps/pki_apps:/opt/apps/gsi-openssh-3.9/bin:/opt/lsf/bin:/opt/lsf/etc:/sbin:/usr/sbin:/usr/local/sbin:/bin:/usr/bin:/usr/local/bin:/usr/X11R6/bin:/home/teragrid/tg459131/bin:/data/TG/srb-client-3.4.1-r1/bin:/data/TG/softenv-1.6.2-r3/bin:/data/TG/tg-policy/bin:/data/TG/gx-map-0.5.3.2-r1/bin:/data/TG/tgusage-2.9-r2/bin:/usr/java/j2sdk1.4.2_12/bin:/usr/java/j2sdk1.4.2_12/jre/bin:/data/TG/globus-4.0.1-r3/sbin:/data/TG/globus-4.0.1-r3/bin:/data/TG/tgcp-1.0.0-r2/bin:/data/TG/condor-6.7.18-r1/bin:/data/TG/condor-6.7.18-r1/sbin:/data/TG/hdf4-4.2r1-r1/bin:/opt/apps/hdf5/hdf5-1.6.5/bin:/data/TG/phdf5-1.6.5/bin") (MPICH_HOME "/opt/MPI/intel9/mvapich-gen2/0.9.8"))""" % (TG_CLUSTER_SCRATCH, TG_COMMUNITY)
+
+
+
+class GassServer(object):
+
+ # @classmethod
+ def startUp(cls, directory):
+ argv = ["globus-gass-server", "-r", "-w", "-c"]
+ savedWd = os.getcwd()
+ os.chdir(directory)
+ child = Popen4(argv)
+ os.chdir(savedWd)
+ child.tochild.close()
+ url = child.fromchild.readline().strip()
+ return GassServer(child, url, directory)
+ startUp = classmethod(startUp)
+
+ def __init__(self, child, url, directory):
+ self.child = child
+ self.url = url
+ self.directory = directory
+
+ def shutDown(self):
+ argv = ["globus-gass-server-shutdown", self.url]
+ os.spawnvp(os.P_NOWAIT, argv[0], argv)
+ status = self.child.wait()
+ return
+
+
+class GlobusJobManager(object):
+
+ def __init__(self, clock, root, info):
+ self.clock = clock
+ self.root = root
+ self.info = info
+
+
+ def runJob(self, job):
+ self.createSubdirectoryForJob(job)
+ stackless.tasklet(self.jobRunner)(job)
+
+
+ def spawn(self, *argv):
+ command = ' '.join(argv)
+ self.info.log("spawning: %s" % command)
+ status = os.spawnvp(os.P_WAIT, argv[0], argv)
+ statusMsg = "%s: exit %d" % (argv[0], status)
+ self.info.log(statusMsg)
+ if status != 0:
+ raise Exception(statusMsg)
+ return
+
+
+ def ospawn(self, *argv):
+ command = ' '.join(argv)
+ self.info.log("spawning: %s" % command)
+
+ child = Popen4(argv)
+
+ child.tochild.close()
+
+ output = child.fromchild.readlines()
+ status = child.wait()
+
+ exitStatus = None
+ if (os.WIFSIGNALED(status)):
+ statusStr = "signal %d" % os.WTERMSIG(status)
+ elif (os.WIFEXITED(status)):
+ exitStatus = os.WEXITSTATUS(status)
+ statusStr = "exit %d" % exitStatus
+ else:
+ statusStr = "status %d" % status
+ statusMsg = "%s: %s" % (argv[0], statusStr)
+
+ for line in output:
+ self.info.line(" " + line.rstrip())
+ self.info.log(statusMsg)
+
+ if exitStatus != 0:
+ raise Exception(statusMsg)
+
+ return output
+
+
+ def download(self, url, pathname):
+ import shutil
+ import urllib2
+ self.info.log("downloading %s" % url)
+ infile = urllib2.urlopen(url)
+ outfile = open(pathname, 'wb')
+ shutil.copyfileobj(infile, outfile)
+ outfile.close()
+ infile.close()
+ return
+
+ def downloadInputFilesForJob(self, job):
+ for inputFile in job.inputFiles:
+ url = job.urlForInputFile(inputFile)
+ filename = inputFile.split('/')[-1] # strips 'mineos/'
+ pathname = os.path.join(job.directory, filename)
+ self.download(url, pathname)
+ return
+
+ def createSubdirectoryForJob(self, job):
+ dirName = "job%05d" % job.id
+ dirPath = os.path.join(self.root, dirName)
+ if not os.path.exists(dirPath):
+ os.mkdir(dirPath)
+ job.subdir = dirName
+ job.directory = dirPath
+ return
+
+
+ def jobRunner(self, job):
+
+ self.downloadInputFilesForJob(job)
+ gassServer = GassServer.startUp(job.directory)
+
+ try:
+ resSpec = self.resSpec(job, gassServer)
+ id = self.globusrun(resSpec)
+
+ oldStatus = job.status
+ ticks = 2
+ while job.isAlive():
+ for t in xrange(ticks):
+ self.clock.tick.wait()
+ ticks *= 2
+ status = self.getJobStatus(id)
+ if status != oldStatus:
+ job.setStatus(status)
+ oldStatus = status
+ ticks = 2
+
+ except Exception, e:
+ self.info.log("error: %s: %s" % (e.__class__.__name__, e))
+ job.setStatus(job.STATUS_FAILED)
+
+ finally:
+ gassServer.shutDown()
+
+ return
+
+
+ def globusrun(self, resSpec):
+ import tempfile
+ fd, rslName = tempfile.mkstemp(suffix=".rsl")
+ stream = os.fdopen(fd, "w")
+ try:
+ self.writeRsl(resSpec, stream)
+ stream.close()
+ if resSpec['jobType'] == "mpi":
+ resourceManager = "tg-login.tacc.teragrid.org/jobmanager-lsf"
+ else:
+ resourceManager = "tg-login.tacc.teragrid.org/jobmanager-fork"
+ output = self.ospawn("globusrun", "-F", "-f", rslName, "-r", resourceManager)
+ id = None
+ for line in output:
+ if line.startswith("https://"):
+ id = line.strip()
+ assert id
+ finally:
+ stream.close()
+ os.unlink(rslName)
+ return id
+
+
+ def writeRsl(self, resSpec, stream):
+ print >>stream, '&'
+ for relation in resSpec.iteritems():
+ attribute, valueSequence = relation
+ valueSequence = self.rslValueSequenceToString(valueSequence)
+ print >>stream, '(', attribute, '=', valueSequence, ')'
+ print >>stream, TACC_ENVIRONMENT
+ return
+
+
+ def rslValueSequenceToString(self, valueSequence):
+ if not isinstance(valueSequence, (tuple, list)):
+ valueSequence = [valueSequence]
+ s = []
+ for value in valueSequence:
+ if isinstance(value, RSLUnquoted):
+ s.append(value.value)
+ elif isinstance(value, (tuple, list)):
+ s.append('(' + self.rslValueSequenceToString(value) + ')')
+ else:
+ s.append('"%s"' % value)
+ return ' '.join(s)
+
+
+ def getJobStatus(self, id):
+ output = self.ospawn("globus-job-status", id)
+ status = output[0].strip()
+ return status
+
+
+ def resSpec(self, job, gassServer):
+ resSpec = {}
+ resSpec.update(job.resSpec)
+ resSpec["scratch_dir"] = TG_CLUSTER_SCRATCH
+ resSpec["directory"] = RSLUnquoted("$(SCRATCH_DIRECTORY)")
+
+ file_stage_in = []
+ for inputFile in job.inputFiles:
+ url = gassServer.url + "/./" + inputFile
+ file_stage_in.append([url, RSLUnquoted("$(SCRATCH_DIRECTORY) # " + '"/' + inputFile + '"')])
+ if file_stage_in:
+ resSpec["file_stage_in"] = file_stage_in
+
+ file_stage_out = []
+ for outputFile in job.outputFiles:
+ url = gassServer.url + "/./" + outputFile
+ file_stage_out.append([outputFile, url])
+ if file_stage_out:
+ resSpec["file_stage_out"] = file_stage_out
+
+ resSpec["stdout"] = gassServer.url + "/./stdout.txt"
+ resSpec["stderr"] = gassServer.url + "/./stderr.txt"
+
+ job.outputFiles.extend(["stdout.txt", "stderr.txt"])
+
+ return resSpec
+
+
+class Event(object):
+
+ def __init__(self):
+ self.channel = stackless.channel()
+
+ def wait(self):
+ self.channel.receive()
+ return
+
+ def signal(self):
+
+ # Swap-in a new channel, so that a waiting tasklet that
+ # repeatedly calls wait() will block waiting for the *next*
+ # signal on its subsequent call to wait().
+ channel = self.channel
+ self.channel = stackless.channel()
+
+ # Unblock all waiters.
+ while channel.balance < 0:
+ channel.send(None)
+
+ return
+
+
+class Job(object):
+
+ # status codes
+ STATUS_NEW = "NEW" # pseudo
+ STATUS_PENDING = "PENDING"
+ STATUS_ACTIVE = "ACTIVE"
+ STATUS_DONE = "DONE"
+ STATUS_FAILED = "FAILED"
+ statusCodes = [STATUS_PENDING, STATUS_ACTIVE, STATUS_DONE, STATUS_FAILED]
+ deadCodes = [STATUS_DONE, STATUS_FAILED]
+
+ def __init__(self, id, task, **resSpec):
+ self.id = id
+ self.task = task
+ self.resSpec = resSpec
+ self.status = self.STATUS_NEW
+ self.statusChanged = Event()
+ self.inputFiles = []
+ self.urlForInputFile = None
+ self.outputFiles = []
+ self.subdir = None
+ self.directory = None
+
+ def setStatus(self, status):
+ if False: # Violated by, e.g., "UNKNOWN JOB STATE 64"
+ assert status in self.statusCodes, "unknown status: %s" % status
+ self.status = status
+ self.statusChanged.signal()
+
+ def isAlive(self):
+ return not self.status in self.deadCodes
+
+
+class Simulation(object):
+
+ # status codes
+ STATUS_NEW = "new"
+ STATUS_CONNECTING = "connecting"
+ STATUS_PENDING = "pending"
+ STATUS_RUNNING = "running"
+ STATUS_DONE = "done"
+ STATUS_ERROR = "error"
+ deadCodes = [STATUS_DONE, STATUS_ERROR]
+
+ def __init__(self, id, nodes, urlForInputFile, info):
+ self.id = id
+ self.nodes = nodes
+ self.urlForInputFile = urlForInputFile
+ self.info = info
+
+ self.status = self.STATUS_NEW
+ self.statusChanged = Event()
+
+ def go(self, jm):
+ stackless.tasklet(self)(jm)
+
+ def __call__(self, jm):
+ try:
+ self.setStatus(self.STATUS_CONNECTING)
+
+ # run
+ job = self.newJob()
+ jm.runJob(job)
+
+ while job.isAlive():
+ if job.status == job.STATUS_PENDING:
+ self.setStatus(self.STATUS_PENDING)
+ elif job.status == job.STATUS_ACTIVE:
+ self.setStatus(self.STATUS_RUNNING)
+ job.statusChanged.wait()
+ # while
+
+ self.checkOutputFiles(job)
+
+ if job.status == job.STATUS_DONE:
+ self.setStatus(self.STATUS_DONE)
+ else:
+ raise RuntimeError("run failed")
+ self.setStatus(self.STATUS_ERROR)
+
+ except Exception, e:
+ self.info.log("error: %s: %s" % (e.__class__.__name__, e))
+ self.setStatus(self.STATUS_ERROR)
+
+ return
+
+ def setStatus(self, status):
+ self.status = status
+ self.statusChanged.signal()
+
+ def isAlive(self):
+ return not self.status in self.deadCodes
+
+ def newJob(self):
+ job = Job(
+ self.id,
+ "run",
+ jobType = "single",
+ count = 1,
+ executable = EXECUTABLE,
+ arguments = ["world"]
+ )
+ job.urlForInputFile = self.urlForInputFile
+ return job
+
+ def checkOutputFiles(self, job):
+ for outputFile in job.outputFiles:
+ pathname = os.path.join(job.directory, outputFile)
+ if not os.path.exists(pathname):
+ continue
+
+ # in case the daemon and the web server are run as
+ # different users
+ import stat
+ os.chmod(pathname, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
+
+ return
+
+
+class PortalConnection(object):
+
+ MULTIPART_BOUNDARY = '----------eArThQuAkE$'
+
+ def __init__(self, portal, clock, info):
+ self.portal = portal
+ self.clock = clock
+ self.info = info
+
+ def runSimulations(self, jm):
+ stackless.tasklet(self.simulationFactory)(jm)
+
+ def simulationFactory(self, jm):
+ import urllib2
+
+ simulations = {}
+
+ while True:
+ self.clock.tick.wait()
+
+ #self.info.log("GET %s" % self.portal.simulationsUrl)
+ try:
+ infile = urllib2.urlopen(self.portal.simulationsUrl)
+ except Exception, e:
+ # Could be transient failure -- e.g., "connection reset by peer".
+ self.info.log("error: %s" % e)
+ continue
+ #self.info.log("OK")
+ simulationList = infile.read()
+ infile.close()
+
+ simulationList = eval(simulationList)
+
+ for simulation in simulationList:
+ id = int(simulation['id'])
+ status = simulation['status']
+ nodes = int(simulation['nodes'])
+ if (status in [Simulation.STATUS_NEW, ""] and
+ not simulations.has_key(id)):
+ self.info.log("new simulation %d" % id)
+
+ def urlForInputFile(inputFile):
+ # Map input filenames to URLs in the context
+ # of this simulation.
+ return self.inputFileURL(simulation, inputFile)
+
+ newSimulation = Simulation(id, nodes, urlForInputFile, self.info)
+
+ self.watchSimulation(newSimulation)
+ simulations[id] = newSimulation
+ newSimulation.go(jm)
+
+ return
+
+ def watchSimulation(self, simulation):
+ stackless.tasklet(self.simulationWatcher)(simulation)
+
+ def simulationWatcher(self, simulation):
+ url = self.portal.simulationStatusUrl % simulation.id
+ while simulation.isAlive():
+ simulation.statusChanged.wait()
+ fields = {'status': simulation.status}
+ self.postStatusChange(url, fields)
+ return
+
+ def inputFileURL(self, simulation, inputFile):
+ return self.portal.inputFileUrl % (simulation.id, inputFile)
+
+ def postStatusChange(self, url, fields):
+ import urllib
+ body = urllib.urlencode(fields)
+ headers = {"Content-Type": "application/x-www-form-urlencoded",
+ "Accept": "text/plain"}
+ return self.post(body, headers, url)
+
+ def post(self, body, headers, url):
+ self.info.log("POST %s" % url)
+ import httplib
+ if self.portal.scheme == "http":
+ conn = httplib.HTTPConnection(self.portal.host)
+ elif self.portal.scheme == "https":
+ conn = httplib.HTTPSConnection(self.portal.host)
+ else:
+ assert False # not scheme in ["http", "https"]
+ conn.request("POST", url, body, headers)
+ response = conn.getresponse()
+ data = response.read()
+ conn.close()
+ self.info.log("response %s" % data)
+ return data
+
+
+class Clock(object):
+
+ def __init__(self, interval):
+ self.interval = interval
+ self.tick = Event()
+ stackless.tasklet(self)()
+
+ def __call__(self):
+ from time import sleep
+ while True:
+ sleep(self.interval)
+ self.tick.signal()
+ stackless.schedule()
+ return
+
+
+class WebPortal(Component):
+
+ name = "web-portal"
+
+ import pyre.inventory as pyre
+ scheme = pyre.str("scheme", validator=pyre.choice(["http", "https"]), default="http")
+ host = pyre.str("host", default="localhost:8000")
+ urlRoot = pyre.str("url-root", default="/hello/")
+
+ def _configure(self):
+ self.urlPrefix = '%s://%s%s' % (self.scheme, self.host, self.urlRoot)
+ self.inputFileUrl = self.urlPrefix + 'simulations/%d/%s'
+ # runs
+ self.simulationsUrl = self.urlPrefix + 'simulations/list.py'
+ self.simulationStatusUrl = self.urlRoot + 'simulations/%d/status/'
+
+
+class Daemon(Script):
+
+ name = "web-portal-daemon"
+
+ import pyre.inventory as pyre
+ portal = pyre.facility("portal", factory=WebPortal)
+ sleepInterval = pyre.dimensional("sleep-interval", default=60*second)
+
+ outputRootPathname = pyre.str("output-root-pathname")
+
+
+ def main(self, *args, **kwds):
+ self._info.activate()
+ self._info.log("~~~~~~~~~~ daemon started ~~~~~~~~~~")
+ clock = Clock(self.sleepInterval / second)
+ jm = GlobusJobManager(clock, self.outputRootPathname, self._info)
+ connection = PortalConnection(self.portal, clock, self._info)
+ connection.runSimulations(jm)
+ try:
+ stackless.run()
+ except KeyboardInterrupt:
+ self._info.log("~~~~~~~~~~ daemon stopped ~~~~~~~~~~")
+ return
+
+
+def main(*args, **kwds):
+ daemon = Daemon()
+ daemon.run(*args, **kwds)
+
+
+if __name__ == "__main__":
+ main()
More information about the cig-commits
mailing list