[cig-commits] r8898 - cs/portal/trunk/magportal
wei at geodynamics.org
Mon Dec 17 21:05:06 PST 2007
Author: wei
Date: 2007-12-17 21:05:05 -0800 (Mon, 17 Dec 2007)
New Revision: 8898
Removed:
cs/portal/trunk/magportal/daemon.py
Log:
This is the wrong one.
Deleted: cs/portal/trunk/magportal/daemon.py
===================================================================
--- cs/portal/trunk/magportal/daemon.py 2007-12-18 02:14:43 UTC (rev 8897)
+++ cs/portal/trunk/magportal/daemon.py 2007-12-18 05:05:05 UTC (rev 8898)
@@ -1,548 +0,0 @@
-
-import stackless
-import os, sys, signal
-from popen2 import Popen4
-
-from pyre.applications import Script
-from pyre.components import Component
-from pyre.units.time import second
-
-
-# NYI: RSL AST
-class RSLUnquoted(object):
- def __init__(self, value):
- self.value = value
-
-TG_CLUSTER_SCRATCH = "/work/teragrid/tg459131"
-TG_COMMUNITY = "/projects/tg"
-
-#EXECUTABLE = TG_COMMUNITY + "/CIG/hello/hello"
-EXECUTABLE = "/home/teragrid/tg459131/bin/hello"
-
-# I'm not sure what to do about this yet.
-TACC_ENVIRONMENT = """(environment=(TG_CLUSTER_SCRATCH "%s") (TG_COMMUNITY "%s") (PATH "/opt/lsf/bin:/opt/lsf/etc:/opt/MPI/intel9/mvapich-gen2/0.9.8/bin:/opt/apps/binutils/binutils-2.17/bin:/opt/intel/compiler9.1//idb/bin:/opt/intel/compiler9.1//cc/bin:/opt/intel/compiler9.1//fc/bin:/usr/local/first:/usr/local/bin:~/bin:.:/opt/apps/pki_apps:/opt/apps/gsi-openssh-3.9/bin:/opt/lsf/bin:/opt/lsf/etc:/sbin:/usr/sbin:/usr/local/sbin:/bin:/usr/bin:/usr/local/bin:/usr/X11R6/bin:/home/teragrid/tg459131/bin:/data/TG/srb-client-3.4.1-r1/bin:/data/TG/softenv-1.6.2-r3/bin:/data/TG/tg-policy/bin:/data/TG/gx-map-0.5.3.2-r1/bin:/data/TG/tgusage-2.9-r2/bin:/usr/java/j2sdk1.4.2_12/bin:/usr/java/j2sdk1.4.2_12/jre/bin:/data/TG/globus-4.0.1-r3/sbin:/data/TG/globus-4.0.1-r3/bin:/data/TG/tgcp-1.0.0-r2/bin:/data/TG/condor-6.7.18-r1/bin:/data/TG/condor-6.7.18-r1/sbin:/data/TG/hdf4-4.2r1-r1/bin:/opt/apps/hdf5/hdf5-1.6.5/bin:/data/TG/phdf5-1.6.5/bin") (MPICH_HOME "/opt/MPI/intel9/mvapich-gen2/0.9.8"))""" % (TG_CLUSTER_SCRATCH, TG_COMMUNITY)
-
-
-
-class GassServer(object):
-
- # @classmethod
- def startUp(cls, directory):
- argv = ["globus-gass-server", "-r", "-w", "-c"]
- savedWd = os.getcwd()
- os.chdir(directory)
- child = Popen4(argv)
- os.chdir(savedWd)
- child.tochild.close()
- url = child.fromchild.readline().strip()
- return GassServer(child, url, directory)
- startUp = classmethod(startUp)
-
- def __init__(self, child, url, directory):
- self.child = child
- self.url = url
- self.directory = directory
-
- def shutDown(self):
- argv = ["globus-gass-server-shutdown", self.url]
- os.spawnvp(os.P_NOWAIT, argv[0], argv)
- status = self.child.wait()
- return
-
-
-class GlobusJobManager(object):
-
- def __init__(self, clock, root, info):
- self.clock = clock
- self.root = root
- self.info = info
-
-
- def runJob(self, job):
- self.createSubdirectoryForJob(job)
- stackless.tasklet(self.jobRunner)(job)
-
-
- def spawn(self, *argv):
- command = ' '.join(argv)
- self.info.log("spawning: %s" % command)
- status = os.spawnvp(os.P_WAIT, argv[0], argv)
- statusMsg = "%s: exit %d" % (argv[0], status)
- self.info.log(statusMsg)
- if status != 0:
- raise Exception(statusMsg)
- return
-
-
- def ospawn(self, *argv):
- command = ' '.join(argv)
- self.info.log("spawning: %s" % command)
-
- child = Popen4(argv)
-
- child.tochild.close()
-
- output = child.fromchild.readlines()
- status = child.wait()
-
- exitStatus = None
- if (os.WIFSIGNALED(status)):
- statusStr = "signal %d" % os.WTERMSIG(status)
- elif (os.WIFEXITED(status)):
- exitStatus = os.WEXITSTATUS(status)
- statusStr = "exit %d" % exitStatus
- else:
- statusStr = "status %d" % status
- statusMsg = "%s: %s" % (argv[0], statusStr)
-
- for line in output:
- self.info.line(" " + line.rstrip())
- self.info.log(statusMsg)
-
- if exitStatus != 0:
- raise Exception(statusMsg)
-
- return output
-
-
- def download(self, url, pathname):
- import shutil
- import urllib2
- self.info.log("downloading %s" % url)
- infile = urllib2.urlopen(url)
- outfile = open(pathname, 'wb')
- shutil.copyfileobj(infile, outfile)
- outfile.close()
- infile.close()
- return
-
- def downloadInputFilesForJob(self, job):
- for inputFile in job.inputFiles:
- url = job.urlForInputFile(inputFile)
- filename = inputFile.split('/')[-1] # strips 'mineos/'
- pathname = os.path.join(job.directory, filename)
- self.download(url, pathname)
- return
-
- def createSubdirectoryForJob(self, job):
- dirName = "job%05d" % job.id
- dirPath = os.path.join(self.root, dirName)
- if not os.path.exists(dirPath):
- os.mkdir(dirPath)
- job.subdir = dirName
- job.directory = dirPath
- return
-
-
- def jobRunner(self, job):
-
- self.downloadInputFilesForJob(job)
- gassServer = GassServer.startUp(job.directory)
-
- try:
- resSpec = self.resSpec(job, gassServer)
- id = self.globusrun(resSpec)
-
- oldStatus = job.status
- ticks = 2
- while job.isAlive():
- for t in xrange(ticks):
- self.clock.tick.wait()
- ticks *= 2
- status = self.getJobStatus(id)
- if status != oldStatus:
- job.setStatus(status)
- oldStatus = status
- ticks = 2
-
- except Exception, e:
- self.info.log("error: %s: %s" % (e.__class__.__name__, e))
- job.setStatus(job.STATUS_FAILED)
-
- finally:
- gassServer.shutDown()
-
- return
-
-
- def globusrun(self, resSpec):
- import tempfile
- fd, rslName = tempfile.mkstemp(suffix=".rsl")
- stream = os.fdopen(fd, "w")
- try:
- self.writeRsl(resSpec, stream)
- stream.close()
- if resSpec['jobType'] == "mpi":
- resourceManager = "tg-login.tacc.teragrid.org/jobmanager-lsf"
- else:
- resourceManager = "tg-login.tacc.teragrid.org/jobmanager-fork"
- output = self.ospawn("globusrun", "-F", "-f", rslName, "-r", resourceManager)
- id = None
- for line in output:
- if line.startswith("https://"):
- id = line.strip()
- assert id
- finally:
- stream.close()
- os.unlink(rslName)
- return id
-
-
- def writeRsl(self, resSpec, stream):
- print >>stream, '&'
- for relation in resSpec.iteritems():
- attribute, valueSequence = relation
- valueSequence = self.rslValueSequenceToString(valueSequence)
- print >>stream, '(', attribute, '=', valueSequence, ')'
- print >>stream, TACC_ENVIRONMENT
- return
-
-
- def rslValueSequenceToString(self, valueSequence):
- if not isinstance(valueSequence, (tuple, list)):
- valueSequence = [valueSequence]
- s = []
- for value in valueSequence:
- if isinstance(value, RSLUnquoted):
- s.append(value.value)
- elif isinstance(value, (tuple, list)):
- s.append('(' + self.rslValueSequenceToString(value) + ')')
- else:
- s.append('"%s"' % value)
- return ' '.join(s)
-
-
- def getJobStatus(self, id):
- output = self.ospawn("globus-job-status", id)
- status = output[0].strip()
- return status
-
-
- def resSpec(self, job, gassServer):
- resSpec = {}
- resSpec.update(job.resSpec)
- resSpec["scratch_dir"] = TG_CLUSTER_SCRATCH
- resSpec["directory"] = RSLUnquoted("$(SCRATCH_DIRECTORY)")
-
- file_stage_in = []
- for inputFile in job.inputFiles:
- url = gassServer.url + "/./" + inputFile
- file_stage_in.append([url, RSLUnquoted("$(SCRATCH_DIRECTORY) # " + '"/' + inputFile + '"')])
- if file_stage_in:
- resSpec["file_stage_in"] = file_stage_in
-
- file_stage_out = []
- for outputFile in job.outputFiles:
- url = gassServer.url + "/./" + outputFile
- file_stage_out.append([outputFile, url])
- if file_stage_out:
- resSpec["file_stage_out"] = file_stage_out
-
- resSpec["stdout"] = gassServer.url + "/./stdout.txt"
- resSpec["stderr"] = gassServer.url + "/./stderr.txt"
-
- job.outputFiles.extend(["stdout.txt", "stderr.txt"])
-
- return resSpec
-
-
-class Event(object):
-
- def __init__(self):
- self.channel = stackless.channel()
-
- def wait(self):
- self.channel.receive()
- return
-
- def signal(self):
-
- # Swap-in a new channel, so that a waiting tasklet that
- # repeatedly calls wait() will block waiting for the *next*
- # signal on its subsequent call to wait().
- channel = self.channel
- self.channel = stackless.channel()
-
- # Unblock all waiters.
- while channel.balance < 0:
- channel.send(None)
-
- return
-
-
-class Job(object):
-
- # status codes
- STATUS_NEW = "NEW" # pseudo
- STATUS_PENDING = "PENDING"
- STATUS_ACTIVE = "ACTIVE"
- STATUS_DONE = "DONE"
- STATUS_FAILED = "FAILED"
- statusCodes = [STATUS_PENDING, STATUS_ACTIVE, STATUS_DONE, STATUS_FAILED]
- deadCodes = [STATUS_DONE, STATUS_FAILED]
-
- def __init__(self, id, task, **resSpec):
- self.id = id
- self.task = task
- self.resSpec = resSpec
- self.status = self.STATUS_NEW
- self.statusChanged = Event()
- self.inputFiles = []
- self.urlForInputFile = None
- self.outputFiles = []
- self.subdir = None
- self.directory = None
-
- def setStatus(self, status):
- if False: # Violated by, e.g., "UNKNOWN JOB STATE 64"
- assert status in self.statusCodes, "unknown status: %s" % status
- self.status = status
- self.statusChanged.signal()
-
- def isAlive(self):
- return not self.status in self.deadCodes
-
-
-class Simulation(object):
-
- # status codes
- STATUS_NEW = "new"
- STATUS_CONNECTING = "connecting"
- STATUS_PENDING = "pending"
- STATUS_RUNNING = "running"
- STATUS_DONE = "done"
- STATUS_ERROR = "error"
- deadCodes = [STATUS_DONE, STATUS_ERROR]
-
- def __init__(self, id, nodes, urlForInputFile, info):
- self.id = id
- self.nodes = nodes
- self.urlForInputFile = urlForInputFile
- self.info = info
-
- self.status = self.STATUS_NEW
- self.statusChanged = Event()
-
- def go(self, jm):
- stackless.tasklet(self)(jm)
-
- def __call__(self, jm):
- try:
- self.setStatus(self.STATUS_CONNECTING)
-
- # run
- job = self.newJob()
- jm.runJob(job)
-
- while job.isAlive():
- if job.status == job.STATUS_PENDING:
- self.setStatus(self.STATUS_PENDING)
- elif job.status == job.STATUS_ACTIVE:
- self.setStatus(self.STATUS_RUNNING)
- job.statusChanged.wait()
- # while
-
- self.checkOutputFiles(job)
-
- if job.status == job.STATUS_DONE:
- self.setStatus(self.STATUS_DONE)
- else:
- raise RuntimeError("run failed")
- self.setStatus(self.STATUS_ERROR)
-
- except Exception, e:
- self.info.log("error: %s: %s" % (e.__class__.__name__, e))
- self.setStatus(self.STATUS_ERROR)
-
- return
-
- def setStatus(self, status):
- self.status = status
- self.statusChanged.signal()
-
- def isAlive(self):
- return not self.status in self.deadCodes
-
- def newJob(self):
- job = Job(
- self.id,
- "run",
- jobType = "single",
- count = 1,
- executable = EXECUTABLE,
- arguments = ["world"]
- )
- job.urlForInputFile = self.urlForInputFile
- return job
-
- def checkOutputFiles(self, job):
- for outputFile in job.outputFiles:
- pathname = os.path.join(job.directory, outputFile)
- if not os.path.exists(pathname):
- continue
-
- # in case the daemon and the web server are run as
- # different users
- import stat
- os.chmod(pathname, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
-
- return
-
-
-class PortalConnection(object):
-
- MULTIPART_BOUNDARY = '----------eArThQuAkE$'
-
- def __init__(self, portal, clock, info):
- self.portal = portal
- self.clock = clock
- self.info = info
-
- def runSimulations(self, jm):
- stackless.tasklet(self.simulationFactory)(jm)
-
- def simulationFactory(self, jm):
- import urllib2
-
- simulations = {}
-
- while True:
- self.clock.tick.wait()
-
- #self.info.log("GET %s" % self.portal.simulationsUrl)
- try:
- infile = urllib2.urlopen(self.portal.simulationsUrl)
- except Exception, e:
- # Could be transient failure -- e.g., "connection reset by peer".
- self.info.log("error: %s" % e)
- continue
- #self.info.log("OK")
- simulationList = infile.read()
- infile.close()
-
- simulationList = eval(simulationList)
-
- for simulation in simulationList:
- id = int(simulation['id'])
- status = simulation['status']
- nodes = int(simulation['nodes'])
- if (status in [Simulation.STATUS_NEW, ""] and
- not simulations.has_key(id)):
- self.info.log("new simulation %d" % id)
-
- def urlForInputFile(inputFile):
- # Map input filenames to URLs in the context
- # of this simulation.
- return self.inputFileURL(simulation, inputFile)
-
- newSimulation = Simulation(id, nodes, urlForInputFile, self.info)
-
- self.watchSimulation(newSimulation)
- simulations[id] = newSimulation
- newSimulation.go(jm)
-
- return
-
- def watchSimulation(self, simulation):
- stackless.tasklet(self.simulationWatcher)(simulation)
-
- def simulationWatcher(self, simulation):
- url = self.portal.simulationStatusUrl % simulation.id
- while simulation.isAlive():
- simulation.statusChanged.wait()
- fields = {'status': simulation.status}
- self.postStatusChange(url, fields)
- return
-
- def inputFileURL(self, simulation, inputFile):
- return self.portal.inputFileUrl % (simulation.id, inputFile)
-
- def postStatusChange(self, url, fields):
- import urllib
- body = urllib.urlencode(fields)
- headers = {"Content-Type": "application/x-www-form-urlencoded",
- "Accept": "text/plain"}
- return self.post(body, headers, url)
-
- def post(self, body, headers, url):
- self.info.log("POST %s" % url)
- import httplib
- if self.portal.scheme == "http":
- conn = httplib.HTTPConnection(self.portal.host)
- elif self.portal.scheme == "https":
- conn = httplib.HTTPSConnection(self.portal.host)
- else:
- assert False # not scheme in ["http", "https"]
- conn.request("POST", url, body, headers)
- response = conn.getresponse()
- data = response.read()
- conn.close()
- self.info.log("response %s" % data)
- return data
-
-
-class Clock(object):
-
- def __init__(self, interval):
- self.interval = interval
- self.tick = Event()
- stackless.tasklet(self)()
-
- def __call__(self):
- from time import sleep
- while True:
- sleep(self.interval)
- self.tick.signal()
- stackless.schedule()
- return
-
-
-class WebPortal(Component):
-
- name = "web-portal"
-
- import pyre.inventory as pyre
- scheme = pyre.str("scheme", validator=pyre.choice(["http", "https"]), default="http")
- host = pyre.str("host", default="localhost:8000")
- urlRoot = pyre.str("url-root", default="/hello/")
-
- def _configure(self):
- self.urlPrefix = '%s://%s%s' % (self.scheme, self.host, self.urlRoot)
- self.inputFileUrl = self.urlPrefix + 'simulations/%d/%s'
- # runs
- self.simulationsUrl = self.urlPrefix + 'simulations/list.py'
- self.simulationStatusUrl = self.urlRoot + 'simulations/%d/status/'
-
-
-class Daemon(Script):
-
- name = "web-portal-daemon"
-
- import pyre.inventory as pyre
- portal = pyre.facility("portal", factory=WebPortal)
- sleepInterval = pyre.dimensional("sleep-interval", default=60*second)
-
- outputRootPathname = pyre.str("output-root-pathname")
-
-
- def main(self, *args, **kwds):
- self._info.activate()
- self._info.log("~~~~~~~~~~ daemon started ~~~~~~~~~~")
- clock = Clock(self.sleepInterval / second)
- jm = GlobusJobManager(clock, self.outputRootPathname, self._info)
- connection = PortalConnection(self.portal, clock, self._info)
- connection.runSimulations(jm)
- try:
- stackless.run()
- except KeyboardInterrupt:
- self._info.log("~~~~~~~~~~ daemon stopped ~~~~~~~~~~")
- return
-
-
-def main(*args, **kwds):
- daemon = Daemon()
- daemon.run(*args, **kwds)
-
-
-if __name__ == "__main__":
- main()
More information about the cig-commits mailing list