[cig-commits] r9022 - in cs/portal/trunk/magportal: . MagWebPortal MagWebPortal/templates/MagWebPortal

wei at geodynamics.org wei at geodynamics.org
Mon Jan 14 15:59:16 PST 2008


Author: wei
Date: 2008-01-14 15:59:16 -0800 (Mon, 14 Jan 2008)
New Revision: 9022

Removed:
   cs/portal/trunk/magportal/daemon.py
Modified:
   cs/portal/trunk/magportal/MagWebPortal/notifications.py
   cs/portal/trunk/magportal/MagWebPortal/templates/MagWebPortal/inputdatafile_form.html
   cs/portal/trunk/magportal/MagWebPortal/templates/MagWebPortal/parameters.txt
   cs/portal/trunk/magportal/MagWebPortal/views.py
Log:
Remove the old daemon. Made a few changes in the templates.


Modified: cs/portal/trunk/magportal/MagWebPortal/notifications.py
===================================================================
--- cs/portal/trunk/magportal/MagWebPortal/notifications.py	2008-01-14 23:17:42 UTC (rev 9021)
+++ cs/portal/trunk/magportal/MagWebPortal/notifications.py	2008-01-14 23:59:16 UTC (rev 9022)
@@ -12,7 +12,7 @@
                "",
                "To download the output, visit the following web page:",
                "",
-               "http://dynamo.geodynamics.org%s/simulations/%d/" % (URL_ROOT, sim.id),
+               "http://dynamo.geodynamics.org:8000%s/simulations/%d/" % (URL_ROOT, sim.id),
                "",
                "Sincerely,",
                "The Web Portal",

Modified: cs/portal/trunk/magportal/MagWebPortal/templates/MagWebPortal/inputdatafile_form.html
===================================================================
--- cs/portal/trunk/magportal/MagWebPortal/templates/MagWebPortal/inputdatafile_form.html	2008-01-14 23:17:42 UTC (rev 9021)
+++ cs/portal/trunk/magportal/MagWebPortal/templates/MagWebPortal/inputdatafile_form.html	2008-01-14 23:59:16 UTC (rev 9022)
@@ -75,7 +75,7 @@
         <label for="id_treset" class=before>treset</label>
         {{ form.treset }}
         {% if form.treset.errors %}<span class=error>{{ form.treset.errors|join:", " }}</span>{% endif %}
-        {% if help_visible %}<span class=help>If ture(checked), reset time and step counter to zero when starting from a storedd dataset. The default is: False.</span>{% endif %}
+        {% if help_visible %}<span class=help>If ture(checked), reset time and step counter to zero when starting from a stored dataset. The default is: False.</span>{% endif %}
     </div>
     <div>
         <label for="id_runid" class=before>runid</label>

Modified: cs/portal/trunk/magportal/MagWebPortal/templates/MagWebPortal/parameters.txt
===================================================================
--- cs/portal/trunk/magportal/MagWebPortal/templates/MagWebPortal/parameters.txt	2008-01-14 23:17:42 UTC (rev 9021)
+++ cs/portal/trunk/magportal/MagWebPortal/templates/MagWebPortal/parameters.txt	2008-01-14 23:59:16 UTC (rev 9022)
@@ -12,7 +12,7 @@
  ngform={{ object.ngform }},
  ngrad={{ object.ngrad }},
  ngcolat={{ object.ngcolat }},
- nglon={{ object.nlog }},
+ nglon={{ object.nglon }},
  dtmax={{ object.dtmax }},
  courfac={{ object.courfac }},
  alffac={{ object.alffac }},
@@ -25,7 +25,7 @@
  ifbfrz=.{{ object.ifbfrz }}.,
  imagcon={{ object.imagcon }},
  ktops={{ object.ktops }},
- kbots={{ object.tbots }},
+ kbots={{ object.kbots }},
  epsc0={{ object.epsc0 }},
  ktopv={{ object.ktopv }},
  kbotv={{ object.kbotv }},

Modified: cs/portal/trunk/magportal/MagWebPortal/views.py
===================================================================
--- cs/portal/trunk/magportal/MagWebPortal/views.py	2008-01-14 23:17:42 UTC (rev 9021)
+++ cs/portal/trunk/magportal/MagWebPortal/views.py	2008-01-14 23:59:16 UTC (rev 9022)
@@ -403,7 +403,7 @@
                               {'form': form, 'help_visible': help_visible},
                               RequestContext(request, {}))
 
-# not sure what to do here
+
 def input_parameters_txt(request, object_id):
     # This illustrates how to generate an input file from a template.
     
@@ -444,7 +444,7 @@
 
     for i in xrange(0, simulation.steps):
         response.write("%s %r %8.4f\n" %
-                       (simulation.friend, simulation.happy, simulation.speed))
+                       (simulation.truncorder, simulation.logisym, simulation.speed))
 
     return response
 

Deleted: cs/portal/trunk/magportal/daemon.py
===================================================================
--- cs/portal/trunk/magportal/daemon.py	2008-01-14 23:17:42 UTC (rev 9021)
+++ cs/portal/trunk/magportal/daemon.py	2008-01-14 23:59:16 UTC (rev 9022)
@@ -1,723 +0,0 @@
-
-import stackless
-import os, sys, signal
-from popen2 import Popen4
-
-from pyre.applications import Script
-from pyre.components import Component
-from pyre.units.time import second
-
-
-# NYI: RSL AST
-class RSLUnquoted(object):
-    def __init__(self, value):
-        self.value = value
-
-TG_CLUSTER_SCRATCH = "/work/teragrid/tg459131"
-TG_COMMUNITY = "/projects/tg"
-MAG_DIR = TG_COMMUNITY + "/CIG/MAG/"
-
-# I'm not sure what to do about this yet.
-TACC_ENVIRONMENT = """(environment=(TG_CLUSTER_SCRATCH "%s") (TG_COMMUNITY "%s") (PATH "/opt/lsf/bin:/opt/lsf/etc:/opt/MPI/intel9/mvapich-gen2/0.9.8/bin:/opt/apps/binutils/binutils-2.17/bin:/opt/intel/compiler9.1//idb/bin:/opt/intel/compiler9.1//cc/bin:/opt/intel/compiler9.1//fc/bin:/usr/local/first:/usr/local/bin:~/bin:.:/opt/apps/pki_apps:/opt/apps/gsi-openssh-3.9/bin:/opt/lsf/bin:/opt/lsf/etc:/sbin:/usr/sbin:/usr/local/sbin:/bin:/usr/bin:/usr/local/bin:/usr/X11R6/bin:/home/teragrid/tg459131/bin:/data/TG/srb-client-3.4.1-r1/bin:/data/TG/softenv-1.6.2-r3/bin:/data/TG/tg-policy/bin:/data/TG/gx-map-0.5.3.2-r1/bin:/data/TG/tgusage-2.9-r2/bin:/usr/java/j2sdk1.4.2_12/bin:/usr/java/j2sdk1.4.2_12/jre/bin:/data/TG/globus-4.0.1-r3/sbin:/data/TG/globus-4.0.1-r3/bin:/data/TG/tgcp-1.0.0-r2/bin:/data/TG/condor-6.7.18-r1/bin:/data/TG/condor-6.7.18-r1/sbin:/data/TG/hdf4-4.2r1-r1/bin:/opt/apps/hdf5/hdf5-1.6.5/bin:/data/TG/phdf5-1.6.5/bin") (MPICH_HOME "/opt/MPI/intel9/mvapich-gen2/0.9.8"))""" % (TG_CLUSTER_SCRATCH, TG_COMMUNITY)
-
-
-
-class GassServer(object):
-
-    # @classmethod
-    def startUp(cls, directory):
-        argv = ["globus-gass-server", "-r", "-w", "-c"]
-        savedWd = os.getcwd()
-        os.chdir(directory)
-        child = Popen4(argv)
-        os.chdir(savedWd)
-        child.tochild.close()
-        url = child.fromchild.readline().strip()
-        return GassServer(child, url, directory)
-    startUp = classmethod(startUp)
-
-    def __init__(self, child, url, directory):
-        self.child = child
-        self.url = url
-        self.directory = directory
-
-    def shutDown(self):
-        argv = ["globus-gass-server-shutdown", self.url]
-        os.spawnvp(os.P_NOWAIT, argv[0], argv)
-        status = self.child.wait()
-        return
-
-
-class JobManager(object):
-
-    def __init__(self, clock, root, info):
-        self.clock = clock
-        self.root = root
-        self.info = info
-
-
-    def runJob(self, job):
-        self.createSubdirectoryForJob(job)
-        stackless.tasklet(self.jobRunner)(job)
-
-    
-    def spawn(self, *argv):
-        command = ' '.join(argv)
-        self.info.log("spawning: %s" % command)
-        status = os.spawnvp(os.P_WAIT, argv[0], argv)
-        statusMsg = "%s: exit %d" % (argv[0], status)
-        self.info.log(statusMsg)
-        if status != 0:
-            raise Exception(statusMsg)
-        return
-
-
-    def ospawn(self, *argv):
-        command = ' '.join(argv)
-        self.info.log("spawning: %s" % command)
-
-        child = Popen4(argv)
-
-        child.tochild.close()
-
-        output = child.fromchild.readlines()
-        status = child.wait()
-
-        exitStatus = None
-        if (os.WIFSIGNALED(status)):
-            statusStr = "signal %d" % os.WTERMSIG(status)
-        elif (os.WIFEXITED(status)):
-            exitStatus = os.WEXITSTATUS(status)
-            statusStr = "exit %d" % exitStatus
-        else:
-            statusStr = "status %d" % status
-        statusMsg = "%s: %s" % (argv[0], statusStr)
-    
-        for line in output:
-            self.info.line("    " + line.rstrip())
-        self.info.log(statusMsg)
-    
-        if exitStatus != 0:
-            raise Exception(statusMsg)
-
-        return output
-
-
-    def download(self, url, pathname):
-        import shutil
-        import urllib2
-        self.info.log("downloading %s" % url)
-        infile = urllib2.urlopen(url)
-        outfile = open(pathname, 'wb')
-        shutil.copyfileobj(infile,  outfile)
-        outfile.close()
-        infile.close()
-        return
-
-    def downloadInputFilesForJob(self, job):
-        for inputFile in job.inputFiles:
-            url = job.urlForInputFile(inputFile)
-            filename = inputFile.split('/')[-1] # strips 'mineos/'
-            pathname = os.path.join(job.directory, filename)
-            self.download(url, pathname)
-        return
-
-    def createSubdirectoryForJob(self, job):
-        while True:
-            dirName = randomName()
-            dirPath = os.path.join(self.root, dirName)
-            if not os.path.exists(dirPath):
-                os.mkdir(dirPath)
-                break
-        job.subdir = dirName
-        job.directory = dirPath
-        return
-
-
-class ForkJobManager(JobManager):
-
-    def jobRunner(self, job):
-
-        self.downloadInputFilesForJob(job)
-
-        try:
-            argv = [job.resSpec['executable']] + job.resSpec['arguments']
-            command = ' '.join(argv)
-            self.info.log("spawning: %s" % command)
-            savedWd = os.getcwd()
-            os.chdir(job.directory)
-            pid = os.spawnvp(os.P_NOWAIT, argv[0], argv)
-            os.chdir(savedWd)
-            self.info.log("spawned process %d" % pid)
-            
-            ticks = 2
-            wpid, status = os.waitpid(pid, os.WNOHANG)
-            while wpid == 0:
-                for t in xrange(ticks):
-                    self.clock.tick.wait()
-                ticks *= 2
-                wpid, status = os.waitpid(pid, os.WNOHANG)
-
-            if (os.WIFSIGNALED(status)):
-                statusStr = "signal %d" % os.WTERMSIG(status)
-                job.setStatus(job.STATUS_FAILED)
-            elif (os.WIFEXITED(status)):
-                exitStatus = os.WEXITSTATUS(status)
-                statusStr = "exit %d" % exitStatus
-                if exitStatus == 0:
-                    job.setStatus(job.STATUS_DONE)
-                else:
-                    job.setStatus(job.STATUS_FAILED)
-            else:
-                statusStr = "status %d" % status
-                job.setStatus(job.STATUS_FAILED)
-            statusMsg = "%s: %s" % (argv[0], statusStr)
-            self.info.log(statusMsg)
-        
-        except Exception, e:
-            self.info.log("error: %s: %s" % (e.__class__.__name__, e))
-            job.setStatus(job.STATUS_FAILED)
-
-        return
-
-
-
-class GlobusJobManager(JobManager):
-
-    def jobRunner(self, job):
-
-        self.downloadInputFilesForJob(job)
-        gassServer = GassServer.startUp(job.directory)
-        
-        try:
-            resSpec = self.resSpec(job, gassServer)
-            id = self.globusrun(resSpec)
-
-            oldStatus = job.status
-            ticks = 2
-            while job.isAlive():
-                for t in xrange(ticks):
-                    self.clock.tick.wait()
-                ticks *= 2
-                status = self.getJobStatus(id)
-                if status != oldStatus:
-                    job.setStatus(status)
-                    oldStatus = status
-                    ticks = 2
-        
-        except Exception, e:
-            self.info.log("error: %s: %s" % (e.__class__.__name__, e))
-            job.setStatus(job.STATUS_FAILED)
-
-        finally:
-            gassServer.shutDown()
-        
-        return
-
-
-    def globusrun(self, resSpec):
-        import tempfile
-        fd, rslName = tempfile.mkstemp(suffix=".rsl")
-        stream = os.fdopen(fd, "w")
-        try:
-            self.writeRsl(resSpec, stream)
-            stream.close()
-            if resSpec['jobType'] == "mpi":
-                resourceManager = "tg-login.tacc.teragrid.org/jobmanager-lsf"
-            else:
-                resourceManager = "tg-login.tacc.teragrid.org/jobmanager-fork"
-            output = self.ospawn("globusrun", "-F", "-f", rslName, "-r", resourceManager)
-            id = None
-            for line in output:
-                if line.startswith("https://"):
-                    id = line.strip()
-            assert id
-        finally:
-            stream.close()
-            os.unlink(rslName)
-        return id
-
-
-    def writeRsl(self, resSpec, stream):
-        print >>stream, '&'
-        for relation in resSpec.iteritems():
-            attribute, valueSequence = relation
-            valueSequence = self.rslValueSequenceToString(valueSequence)
-            print >>stream, '(', attribute, '=', valueSequence, ')'
-        print >>stream, TACC_ENVIRONMENT
-        return
-
-
-    def rslValueSequenceToString(self, valueSequence):
-        if not isinstance(valueSequence, (tuple, list)):
-            valueSequence = [valueSequence]
-        s = []
-        for value in valueSequence:
-            if isinstance(value, RSLUnquoted):
-                s.append(value.value)
-            elif isinstance(value, (tuple, list)):
-                s.append('(' + self.rslValueSequenceToString(value) + ')')
-            else:
-                s.append('"%s"' % value)
-        return ' '.join(s)
-
-
-    def getJobStatus(self, id):
-        output = self.ospawn("globus-job-status", id)
-        status = output[0].strip()
-        return status
-
-
-    def resSpec(self, job, gassServer):
-        resSpec = {}
-        resSpec.update(job.resSpec)
-        resSpec["scratch_dir"] = TG_CLUSTER_SCRATCH
-        resSpec["directory"] = RSLUnquoted("$(SCRATCH_DIRECTORY)")
-        
-        file_stage_in = []
-        for inputFile in job.inputFiles:
-            url = gassServer.url + "/./" + inputFile
-            file_stage_in.append([url, RSLUnquoted("$(SCRATCH_DIRECTORY) # " + '"/' + inputFile + '"')])
-        resSpec["file_stage_in"] = file_stage_in
-        
-        file_stage_out = []
-        for outputFile in job.outputFiles:
-            url = gassServer.url + "/./" + outputFile
-            file_stage_out.append([outputFile, url])
-        if file_stage_out:
-            resSpec["file_stage_out"] = file_stage_out
-        
-        resSpec["stdout"] = gassServer.url + "/./stdout.txt"
-        resSpec["stderr"] = gassServer.url + "/./stderr.txt"
-
-        job.outputFiles.extend(["stdout.txt", "stderr.txt"])
-        
-        return resSpec
-
-
-class Event(object):
-    
-    def __init__(self):
-        self.channel = stackless.channel()
-
-    def wait(self):
-        self.channel.receive()
-        return
-
-    def signal(self):
-
-        # Swap-in a new channel, so that a waiting tasklet that
-        # repeatedly calls wait() will block waiting for the *next*
-        # signal on its subsequent call to wait().
-        channel = self.channel
-        self.channel = stackless.channel()
-
-        # Unblock all waiters.
-        while channel.balance < 0:
-            channel.send(None)
-        
-        return
-
-
-class Job(object):
-
-    # status codes
-    STATUS_NEW      = "NEW" # pseudo
-    STATUS_PENDING  = "PENDING"
-    STATUS_ACTIVE   = "ACTIVE"
-    STATUS_DONE     = "DONE"
-    STATUS_FAILED   = "FAILED"
-    statusCodes = [STATUS_PENDING, STATUS_ACTIVE, STATUS_DONE, STATUS_FAILED]
-    deadCodes = [STATUS_DONE, STATUS_FAILED]
-
-    def __init__(self, task, **resSpec):
-        self.task = task
-        self.resSpec = resSpec
-        self.status = self.STATUS_NEW
-        self.statusChanged = Event()
-        self.inputFiles = []
-        self.urlForInputFile = None
-        self.outputFiles = []
-        self.subdir = None
-        self.directory = None
-
-    def setStatus(self, status):
-        if False: # Violated by, e.g., "UNKNOWN JOB STATE 64"
-            assert status in self.statusCodes, "unknown status: %s" % status
-        self.status = status
-        self.statusChanged.signal()
-
-    def isAlive(self):
-        return not self.status in self.deadCodes
-
-
-class Run(object):
-    
-    # status codes
-    STATUS_NEW        = "new"
-    STATUS_CONNECTING = "connecting"
-    STATUS_PREPARING  = "preparing"  # reported by build & schedule process
-    STATUS_PENDING    = "pending"    # reported by build & schedule process
-    STATUS_FINISHING  = "finishing"  # reported by launcher process
-    STATUS_DONE       = "done"
-    STATUS_ERROR      = "error"
-    deadCodes = [STATUS_DONE, STATUS_ERROR]
-
-    def __init__(self, id, simulation, urlForInputFile, config, info):
-        self.id = id
-        self.simulation = simulation
-        self.urlForInputFile = urlForInputFile
-        self.dry = config.dry
-        self.info = info
-
-        self.jobChannel = stackless.channel()
-        self.status = self.STATUS_NEW
-        self.statusChanged = Event()
-
-    def go(self, gjm):
-        stackless.tasklet(self)(gjm)
-
-    def __call__(self, gjm):
-        try:
-            self.setStatus(self.STATUS_CONNECTING)
-            
-            # run
-            mag = self.newMagJob()
-            gjm.runJob(mag)
-
-            # send jobs to jobSink()
-            self.jobChannel.send(mag)
-            self.jobChannel.send(None) # no more jobs
-
-            self.setStatus(self.STATUS_PREPARING)
-
-            while mag.isAlive():
-                mag.statusChanged.wait()
-            # while
-
-            if mag.status == mag.STATUS_FAILED:
-                self.setStatus(self.STATUS_ERROR)
-                raise RuntimeError("run failed")
-
-            self.setStatus(self.STATUS_DONE)
-
-        except Exception, e:
-            self.info.log("error: %s: %s" % (e.__class__.__name__, e))
-            self.setStatus(self.STATUS_ERROR)
-        
-        return
-
-    def setStatus(self, status):
-        self.status = status
-        self.statusChanged.signal()
-
-    def isAlive(self):
-        return not self.status in self.deadCodes
-
-    def newMagJob(self):
-
-        import urllib2
-        argsUrl = self.urlForInputFile("args.py")
-        self.info.log("GET %s" % argsUrl)
-        infile = urllib2.urlopen(argsUrl)
-        self.info.log("OK")
-        args = infile.read()
-        infile.close()
-        args = eval(args)
-
-        job = Job(
-            "run",
-            jobType = "single",
-            count = 1,
-            executable = MAG_DIR + "release/runMag-arg.sh",
-            arguments = args + [self.simulation.parfile]
-            )
-        job.urlForInputFile = self.urlForInputFile
-        sim = self.simulation
-        job.inputFiles = [sim.parfile]
-        job.outputFiles = ["mag.tar.gz"]
-        return job
-
-class Simulation(object):
-    def __init__(self, id):
-        self.id = id
-        self.parfile = 'ParFile.txt'
-
-class PortalConnection(object):
-
-    MULTIPART_BOUNDARY = '----------eArThQuAkE$'
-
-    def __init__(self, portal, outputRootUrl, clock, info):
-        self.portal = portal
-        self.outputRootUrl = outputRootUrl
-        self.clock = clock
-        self.info = info
-
-    def runSimulations(self, gjm, config):
-        stackless.tasklet(self.runFactory)(gjm, config)
-
-    def runFactory(self, gjm, config):
-        import urllib2
-        
-        runs = {}
-        
-        while True:
-            self.clock.tick.wait()
-            
-            self.info.log("GET %s" % self.portal.runsUrl)
-            try:
-                infile = urllib2.urlopen(self.portal.runsUrl)
-            except Exception, e:
-                # Could be transient failure -- e.g., "connection reset by peer".
-                self.info.log("error: %s" % e)
-                continue
-            #self.info.log("OK")
-            runList = infile.read()
-            infile.close()
-            
-            runList = eval(runList)
-            
-            for run in runList:
-                id = int(run['id'])
-                status = run['status']
-                simId = run['simulation']
-                if (status in [Run.STATUS_NEW, ""] and
-                    not runs.has_key(id)):
-                    self.info.log("new run %d" % id)
-                    simulation = Simulation(simId)
-
-                    def urlForInputFile(inputFile):
-                        # Map input filenames to URLs in the context
-                        # of this run.
-                        return self.inputFileURL(simulation, inputFile)
-                    
-                    newRun = Run(id, simulation, urlForInputFile, config, self.info)
-                    
-                    self.watchRun(newRun)
-                    runs[id] = newRun
-                    newRun.go(gjm)
-        
-        return
-
-    def watchRun(self, run):
-        stackless.tasklet(self.runWatcher)(run)
-        stackless.tasklet(self.jobSink)(run)
-        
-    def runWatcher(self, run):
-        url = self.portal.runStatusUrl % run.id
-        while run.isAlive():
-            run.statusChanged.wait()
-            fields = {'status': run.status}
-            self.postStatusChange(url, fields)
-        return
-
-    def jobSink(self, run):
-        newJob = run.jobChannel.receive()
-        while newJob is not None:
-            self.watchJob(newJob, run)
-            newJob = run.jobChannel.receive()
-        return
-
-    def watchJob(self, job, run):
-        url = self.portal.jobCreateUrl
-        fields = self.jobFields(job, run)
-        response = self.postStatusChange(url, fields)
-        portalId = eval(response)
-        stackless.tasklet(self.jobWatcher)(job, portalId, run)
-        
-    def jobWatcher(self, job, portalId, run):
-        url = self.portal.jobUpdateUrl % portalId
-        while job.isAlive():
-            job.statusChanged.wait()
-            fields = self.jobFields(job, run)
-            self.postStatusChange(url, fields)
-        self.postJobOutput(job, portalId)
-        return
-
-    def postJobOutput(self, job, portalId):
-        url = self.portal.outputCreateUrl
-        
-        for outputFile in job.outputFiles:
-            
-            pathname = os.path.join(job.directory, outputFile)
-            if not os.path.exists(pathname):
-                continue
-            
-            # in case the daemon and the web server are run as
-            # different users
-            import stat
-            os.chmod(pathname, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
-            
-            fields = {
-                'job': portalId,
-                'name': outputFile,
-                }
-            self.postStatusChange(url, fields)
-        
-        return
-
-    def jobFields(self, job, run):
-        fields = {
-            'run': run.id,
-            'task': job.task,
-            'status': job.status.lower(),
-            'url': self.outputRootUrl + job.subdir + "/",
-            }
-        return fields
-
-    def inputFileURL(self, simulation, inputFile):
-        return self.portal.inputFileUrl % (simulation.id, inputFile)
-
-    def postStatusChange(self, url, fields):
-        import urllib
-        body = urllib.urlencode(fields)
-        headers = {"Content-Type": "application/x-www-form-urlencoded",
-                   "Accept": "text/plain"}
-        return self.post(body, headers, url)
-
-    def postStatusChangeAndUploadOutput(self, url, fields, output):
-        # Based upon a recipe by Wade Leftwich.
-        files = [('output', 'output.txt', 'application/x-gtar', 'gzip', output)]
-        body = self.encodeMultipartFormData(fields, files)
-        headers = {"Content-Type": "multipart/form-data; boundary=%s" % self.MULTIPART_BOUNDARY,
-                   "Content-Length": str(len(body)),
-                   "Accept": "text/plain"}
-        return self.post(body, headers, url)
-
-    def encodeMultipartFormData(self, fields, files):
-        import shutil
-        from StringIO import StringIO
-        stream = StringIO()
-        def line(s=''): stream.write(s + '\r\n')
-        for key, value in fields.iteritems():
-            line('--' + self.MULTIPART_BOUNDARY)
-            line('Content-Disposition: form-data; name="%s"' % key)
-            line()
-            line(value)
-        for (key, filename, contentType, contentEncoding, content) in files:
-            line('--' + self.MULTIPART_BOUNDARY)
-            line('Content-Disposition: form-data; name="%s"; filename="%s"' % (key, filename))
-            line('Content-Type: %s' % contentType)
-            line('Content-Encoding: %s' % contentEncoding)
-            line()
-            shutil.copyfileobj(content, stream)
-            line()
-        line('--' + self.MULTIPART_BOUNDARY + '--')
-        line()
-        return stream.getvalue()
-
-    def post(self, body, headers, url):
-        self.info.log("POST %s" % url)
-        import httplib
-        if self.portal.scheme == "http":
-            conn = httplib.HTTPConnection(self.portal.host)
-        elif self.portal.scheme == "https":
-            conn = httplib.HTTPSConnection(self.portal.host)
-        else:
-            assert False # not scheme in ["http", "https"]
-        conn.request("POST", url, body, headers)
-        response = conn.getresponse()
-        data = response.read()
-        conn.close()
-        if len(data) < 10:
-            self.info.log("response %s" % data)
-        else:
-            global counter
-            counter = counter + 1
-            fn = "response-%d.html" % counter
-            f = open(fn, "w")
-            f.write(data)
-            f.close()
-            self.info.log("response is in %s" % fn)
-        return data
-
-
-counter = 0
-
-
-class Clock(object):
-
-    def __init__(self, interval):
-        self.interval = interval
-        self.tick = Event()
-        stackless.tasklet(self)()
-
-    def __call__(self):
-        from time import sleep
-        while True:
-            sleep(self.interval)
-            self.tick.signal()
-            stackless.schedule()
-        return
-
-
-class WebPortal(Component):
-    
-    name = "web-portal"
-
-    import pyre.inventory as pyre
-    scheme   = pyre.str("scheme", validator=pyre.choice(["http", "https"]), default="http")
-    host     = pyre.str("host", default="localhost:8000")
-    urlRoot  = pyre.str("url-root", default="/magwebportal/")
-
-    def _configure(self):
-        self.urlPrefix           = '%s://%s%s' % (self.scheme, self.host, self.urlRoot)
-        self.inputFileUrl        = self.urlPrefix + 'simulations/%d/%s'
-        # runs
-        self.runsUrl             = self.urlPrefix + 'runs/list.py'
-        self.runStatusUrl        = self.urlRoot + 'runs/%d/status/'
-        # jobs
-        self.jobCreateUrl        = self.urlRoot + 'jobs/create/'
-        self.jobUpdateUrl        = self.urlRoot + 'jobs/%d/update/'
-        # output
-        self.outputCreateUrl     = self.urlRoot + 'output/create/'
-
-
-class Daemon(Script):
-
-    name = "web-portal-daemon"
-
-    import pyre.inventory as pyre
-    portal = pyre.facility("portal", factory=WebPortal)
-    sleepInterval = pyre.dimensional("sleep-interval", default=60*second)
-
-    outputRootPathname = pyre.str("output-root-pathname")
-    outputRootUrl = pyre.str("output-root-url")
-
-    dry = pyre.bool("dry", default=False)
-
-
-    def main(self, *args, **kwds):
-        self._info.activate()
-        self._info.log("~~~~~~~~~~ daemon started ~~~~~~~~~~")
-        clock = Clock(self.sleepInterval / second)
-        gjm = GlobusJobManager(clock, self.outputRootPathname, self._info)
-        connection = PortalConnection(self.portal, self.outputRootUrl, clock, self._info)
-        connection.runSimulations(gjm, self)
-        try:
-            stackless.run()
-        except KeyboardInterrupt:
-            self._info.log("~~~~~~~~~~ daemon stopped ~~~~~~~~~~")
-        return
-
-
-def randomName():
-    # Stolen from pyre.services.Pickler; perhaps this should be an
-    # official/"exported" Pyre function?
-    
-    alphabet = list("0123456789abcdefghijklmnopqrstuvwxyz")
-
-    import random
-    random.shuffle(alphabet)
-    key = "".join(alphabet)[0:16]
-
-    return key
-        
-
-def main(*args, **kwds):
-    daemon = Daemon()
-    daemon.run(*args, **kwds)
-
-
-if __name__ == "__main__":
-    main()



More information about the cig-commits mailing list