[cig-commits] r12366 - in cs/portal/trunk: . northridge northridge/backend

leif at geodynamics.org leif at geodynamics.org
Tue Jul 1 17:46:56 PDT 2008


Author: leif
Date: 2008-07-01 17:46:55 -0700 (Tue, 01 Jul 2008)
New Revision: 12366

Added:
   cs/portal/trunk/northridge/backend/
   cs/portal/trunk/northridge/backend/daemon.py
   cs/portal/trunk/northridge/backend/mineos.py
   cs/portal/trunk/northridge/backend/specfem3D.py
   cs/portal/trunk/northridge/backend/web-portal-daemon.cfg
Removed:
   cs/portal/trunk/daemon.py
   cs/portal/trunk/mineos.cfg
   cs/portal/trunk/mineos.py
   cs/portal/trunk/web-portal-daemon.cfg
Modified:
   cs/portal/trunk/northridge/MANIFEST.in
Log:
United the files which compose the portal's backend.  Updated the
example .cfg file for the daemon.


Deleted: cs/portal/trunk/daemon.py
===================================================================
--- cs/portal/trunk/daemon.py	2008-07-01 23:52:59 UTC (rev 12365)
+++ cs/portal/trunk/daemon.py	2008-07-02 00:46:55 UTC (rev 12366)
@@ -1,835 +0,0 @@
-
-import stackless
-import os, sys, signal
-from popen2 import Popen4
-
-from pyre.applications import Script
-from pyre.components import Component
-from pyre.units.time import second
-
-
-# NYI: RSL AST
-class RSLUnquoted(object):
-    def __init__(self, value):
-        self.value = value
-
-TG_CLUSTER_SCRATCH = "/work/teragrid/tg459131"
-TG_COMMUNITY = "/projects/tg"
-
-# I'm not sure what to do about this yet.
-TACC_ENVIRONMENT = """(environment=(TG_CLUSTER_SCRATCH "%s") (TG_COMMUNITY "%s") (PATH "/opt/lsf/bin:/opt/lsf/etc:/opt/MPI/intel9/mvapich-gen2/0.9.8/bin:/opt/apps/binutils/binutils-2.17/bin:/opt/intel/compiler9.1//idb/bin:/opt/intel/compiler9.1//cc/bin:/opt/intel/compiler9.1//fc/bin:/usr/local/first:/usr/local/bin:~/bin:.:/opt/apps/pki_apps:/opt/apps/gsi-openssh-3.9/bin:/opt/lsf/bin:/opt/lsf/etc:/sbin:/usr/sbin:/usr/local/sbin:/bin:/usr/bin:/usr/local/bin:/usr/X11R6/bin:/home/teragrid/tg459131/bin:/data/TG/srb-client-3.4.1-r1/bin:/data/TG/softenv-1.6.2-r3/bin:/data/TG/tg-policy/bin:/data/TG/gx-map-0.5.3.2-r1/bin:/data/TG/tgusage-2.9-r2/bin:/usr/java/j2sdk1.4.2_12/bin:/usr/java/j2sdk1.4.2_12/jre/bin:/data/TG/globus-4.0.1-r3/sbin:/data/TG/globus-4.0.1-r3/bin:/data/TG/tgcp-1.0.0-r2/bin:/data/TG/condor-6.7.18-r1/bin:/data/TG/condor-6.7.18-r1/sbin:/data/TG/hdf4-4.2r1-r1/bin:/opt/apps/hdf5/hdf5-1.6.5/bin:/data/TG/phdf5-1.6.5/bin") (MPICH_HOME "/opt/MPI/intel9/mvapich-gen2/0.9.8"))""" % (TG_CLUSTER_SCRATCH, TG_COMMUNITY)
-
-
-
-class GassServer(object):
-
-    # @classmethod
-    def startUp(cls, directory, info):
-        argv = ["globus-gass-server", "-r", "-w", "-c"]
-        savedWd = os.getcwd()
-        os.chdir(directory)
-        child = Popen4(argv)
-        os.chdir(savedWd)
-        child.tochild.close()
-        while True:
-            url = child.fromchild.readline().strip()
-            if url.startswith("https:"):
-                break
-        child.fromchild.close()
-        return GassServer(child, url, directory, info)
-    startUp = classmethod(startUp)
-
-    def __init__(self, child, url, directory, info):
-        self.child = child
-        self.url = url
-        self.directory = directory
-        self.info = info
-
-    def shutDown(self):
-        # This doesn't work reliably...
-        argv = ["globus-gass-server-shutdown", self.url]
-        command = ' '.join(argv)
-        self.info.log("spawning: %s" % command)
-        status = os.spawnvp(os.P_WAIT, argv[0], argv)
-        statusMsg = "%s: exit %d" % (argv[0], status)
-        self.info.log(statusMsg)
-        if status != 0:
-            # ...so sometimes we kill it!
-            self.info.log("kill -INT %d" % self.child.pid)
-            os.kill(self.child.pid, signal.SIGINT)
-        self.info.log("waitpid(%d)" % self.child.pid)
-        status = self.child.wait()
-        return
-
-
-class JobManager(object):
-
-    def __init__(self, clock, root, info):
-        self.clock = clock
-        self.root = root
-        self.info = info
-
-
-    def runJob(self, job):
-        self.createSubdirectoryForJob(job)
-        stackless.tasklet(self.jobRunner)(job)
-
-    
-    def spawn(self, *argv):
-        command = ' '.join(argv)
-        self.info.log("spawning: %s" % command)
-        status = os.spawnvp(os.P_WAIT, argv[0], argv)
-        statusMsg = "%s: exit %d" % (argv[0], status)
-        self.info.log(statusMsg)
-        if status != 0:
-            raise Exception(statusMsg)
-        return
-
-
-    def ospawn(self, *argv):
-        command = ' '.join(argv)
-        self.info.log("spawning: %s" % command)
-
-        child = Popen4(argv)
-
-        child.tochild.close()
-
-        output = child.fromchild.readlines()
-        status = child.wait()
-
-        exitStatus = None
-        if (os.WIFSIGNALED(status)):
-            statusStr = "signal %d" % os.WTERMSIG(status)
-        elif (os.WIFEXITED(status)):
-            exitStatus = os.WEXITSTATUS(status)
-            statusStr = "exit %d" % exitStatus
-        else:
-            statusStr = "status %d" % status
-        statusMsg = "%s: %s" % (argv[0], statusStr)
-    
-        for line in output:
-            self.info.line("    " + line.rstrip())
-        self.info.log(statusMsg)
-    
-        if exitStatus != 0:
-            raise Exception(statusMsg)
-
-        return output
-
-
-    def download(self, url, pathname):
-        import shutil
-        import urllib2
-        self.info.log("downloading %s" % url)
-        infile = urllib2.urlopen(url)
-        outfile = open(pathname, 'wb')
-        shutil.copyfileobj(infile,  outfile)
-        outfile.close()
-        infile.close()
-        return
-
-    def downloadInputFilesForJob(self, job):
-        for inputFile in job.inputFiles:
-            url = job.urlForInputFile(inputFile)
-            filename = inputFile.split('/')[-1] # strips 'mineos/'
-            pathname = os.path.join(job.directory, filename)
-            self.download(url, pathname)
-        return
-
-    def createSubdirectoryForJob(self, job):
-        while True:
-            dirName = randomName()
-            dirPath = os.path.join(self.root, dirName)
-            if not os.path.exists(dirPath):
-                os.mkdir(dirPath)
-                break
-        job.subdir = dirName
-        job.directory = dirPath
-        return
-
-
-class ForkJobManager(JobManager):
-
-    def jobRunner(self, job):
-
-        try:
-            self.downloadInputFilesForJob(job)
-
-            argv = self.argvForJob(job)
-            command = ' '.join(argv)
-            self.info.log("spawning: %s" % command)
-            savedWd = os.getcwd()
-            os.chdir(job.directory)
-            pid = os.spawnvp(os.P_NOWAIT, argv[0], argv)
-            os.chdir(savedWd)
-            self.info.log("spawned process %d" % pid)
-            
-            ticks = 2
-            wpid, status = os.waitpid(pid, os.WNOHANG)
-            while wpid == 0:
-                for t in xrange(ticks):
-                    self.clock.tick.wait()
-                ticks = min(ticks * 2, 10)
-                wpid, status = os.waitpid(pid, os.WNOHANG)
-
-            self.uploadOutputFilesForJob(job)
-            
-            if (os.WIFSIGNALED(status)):
-                statusStr = "signal %d" % os.WTERMSIG(status)
-                job.setStatus(job.STATUS_FAILED)
-            elif (os.WIFEXITED(status)):
-                exitStatus = os.WEXITSTATUS(status)
-                statusStr = "exit %d" % exitStatus
-                if exitStatus == 0:
-                    job.setStatus(job.STATUS_DONE)
-                else:
-                    job.setStatus(job.STATUS_FAILED)
-            else:
-                statusStr = "status %d" % status
-                job.setStatus(job.STATUS_FAILED)
-            statusMsg = "%s: %s" % (argv[0], statusStr)
-            self.info.log(statusMsg)
-        
-        except Exception, e:
-            self.info.log("error: %s: %s" % (e.__class__.__name__, e))
-            job.setStatus(job.STATUS_FAILED)
-
-        return
-
-
-    def argvForJob(self, job):
-        return ([job.resSpec['executable']] + job.resSpec['arguments'] +
-                ["--progress-url=%s" % job.progressUrl])
-
-
-    def uploadOutputFilesForJob(self, job):
-        return
-
-
-class RemoteShellJobManager(ForkJobManager):
-
-    def __init__(self, clock, root, info, config):
-        ForkJobManager.__init__(self, clock, root, info)
-        self.rsh = config.remoteShellCommand.split()
-        self.rdown = config.remoteDownloadCommand
-        self.rup = config.remoteUploadCommand
-        self.remoteOutputRoot = config.remoteOutputRoot
-
-
-    def downloadInputFilesForJob(self, job):
-        # First, download to the local host.
-        ForkJobManager.downloadInputFilesForJob(self, job)
-
-        # Create a remote directory for this run.
-        remoteDirectory = self.remoteDirectoryForJob(job)
-        argv = self.rsh + ["mkdir -p %s" % remoteDirectory]
-        self.spawn(*argv)
-        
-        # Now copy to the remote host.
-        for inputFile in job.inputFiles:
-            filename = inputFile.split('/')[-1] # strips 'mineos/'
-            pathname = os.path.join(job.directory, filename)
-            remotePathname = os.path.join(remoteDirectory, filename)
-            argv = (self.rdown % {'source': pathname, 'dest': remotePathname}).split()
-            self.spawn(*argv)
-            
-        return
-
-
-    def uploadOutputFilesForJob(self, job):
-        remoteDirectory = self.remoteDirectoryForJob(job)
-        for filename in job.outputFiles:
-            pathname = os.path.join(job.directory, filename)
-            remotePathname = os.path.join(remoteDirectory, filename)
-            argv = (self.rup % {'source': remotePathname, 'dest': pathname}).split()
-            try:
-                self.spawn(*argv)
-            except Exception:
-                pass
-        return
-
-
-    def argvForJob(self, job):
-        remoteCommand = ("cd " + self.remoteDirectoryForJob(job) + " && " +
-                         ' '.join(ForkJobManager.argvForJob(self, job)))
-        return self.rsh + [remoteCommand]
-
-
-    def remoteDirectoryForJob(self, job):
-        return os.path.join(self.remoteOutputRoot, "run%05d" % job.run.id)
-
-
-class GlobusJobManager(JobManager):
-
-    def jobRunner(self, job):
-
-        try:
-            self.downloadInputFilesForJob(job)
-            gassServer = GassServer.startUp(job.directory, self.info)
-        
-            resSpec = self.resSpec(job, gassServer)
-            id = self.globusrun(resSpec)
-
-            oldStatus = job.status
-            ticks = 2
-            while job.isAlive():
-                for t in xrange(ticks):
-                    self.clock.tick.wait()
-                ticks *= 2
-                status = self.getJobStatus(id)
-                if status != oldStatus:
-                    job.setStatus(status)
-                    oldStatus = status
-                    ticks = 2
-        
-        except Exception, e:
-            self.info.log("error: %s: %s" % (e.__class__.__name__, e))
-            job.setStatus(job.STATUS_FAILED)
-
-        finally:
-            gassServer.shutDown()
-        
-        return
-
-
-    def globusrun(self, resSpec):
-        import tempfile
-        fd, rslName = tempfile.mkstemp(suffix=".rsl")
-        stream = os.fdopen(fd, "w")
-        try:
-            self.writeRsl(resSpec, stream)
-            stream.close()
-            if resSpec['jobType'] == "mpi":
-                resourceManager = "tg-login.tacc.teragrid.org/jobmanager-lsf"
-            else:
-                resourceManager = "tg-login.tacc.teragrid.org/jobmanager-fork"
-            output = self.ospawn("globusrun", "-F", "-f", rslName, "-r", resourceManager)
-            id = None
-            for line in output:
-                if line.startswith("https://"):
-                    id = line.strip()
-            assert id
-        finally:
-            stream.close()
-            os.unlink(rslName)
-        return id
-
-
-    def writeRsl(self, resSpec, stream):
-        print >>stream, '&'
-        for relation in resSpec.iteritems():
-            attribute, valueSequence = relation
-            valueSequence = self.rslValueSequenceToString(valueSequence)
-            print >>stream, '(', attribute, '=', valueSequence, ')'
-        print >>stream, TACC_ENVIRONMENT
-        return
-
-
-    def rslValueSequenceToString(self, valueSequence):
-        if not isinstance(valueSequence, (tuple, list)):
-            valueSequence = [valueSequence]
-        s = []
-        for value in valueSequence:
-            if isinstance(value, RSLUnquoted):
-                s.append(value.value)
-            elif isinstance(value, (tuple, list)):
-                s.append('(' + self.rslValueSequenceToString(value) + ')')
-            else:
-                s.append('"%s"' % value)
-        return ' '.join(s)
-
-
-    def getJobStatus(self, id):
-        output = self.ospawn("globus-job-status", id)
-        status = output[0].strip()
-        return status
-
-
-    def resSpec(self, job, gassServer):
-        resSpec = {}
-        resSpec.update(job.resSpec)
-        resSpec["scratch_dir"] = TG_CLUSTER_SCRATCH
-        resSpec["directory"] = RSLUnquoted("$(SCRATCH_DIRECTORY)")
-        
-        file_stage_in = []
-        for inputFile in job.inputFiles:
-            url = gassServer.url + "/./" + inputFile
-            file_stage_in.append([url, RSLUnquoted("$(SCRATCH_DIRECTORY) # " + '"/' + inputFile + '"')])
-        resSpec["file_stage_in"] = file_stage_in
-        
-        file_stage_out = []
-        for outputFile in job.outputFiles:
-            url = gassServer.url + "/./" + outputFile
-            file_stage_out.append([outputFile, url])
-        if file_stage_out:
-            resSpec["file_stage_out"] = file_stage_out
-        
-        resSpec["stdout"] = gassServer.url + "/./stdout.txt"
-        resSpec["stderr"] = gassServer.url + "/./stderr.txt"
-
-        job.outputFiles.extend(["stdout.txt", "stderr.txt"])
-        
-        return resSpec
-
-
-class Event(object):
-    
-    def __init__(self):
-        self.channel = stackless.channel()
-
-    def wait(self):
-        self.channel.receive()
-        return
-
-    def signal(self):
-
-        # Swap-in a new channel, so that a waiting tasklet that
-        # repeatedly calls wait() will block waiting for the *next*
-        # signal on its subsequent call to wait().
-        channel = self.channel
-        self.channel = stackless.channel()
-
-        # Unblock all waiters.
-        while channel.balance < 0:
-            channel.send(None)
-        
-        return
-
-
-class Job(object):
-
-    # status codes
-    STATUS_NEW      = "NEW" # pseudo
-    STATUS_PENDING  = "PENDING"
-    STATUS_ACTIVE   = "ACTIVE"
-    STATUS_DONE     = "DONE"
-    STATUS_FAILED   = "FAILED"
-    statusCodes = [STATUS_PENDING, STATUS_ACTIVE, STATUS_DONE, STATUS_FAILED]
-    deadCodes = [STATUS_DONE, STATUS_FAILED]
-
-    def __init__(self, run, task, **resSpec):
-        self.run = run
-        self.task = task
-        self.resSpec = resSpec
-        self.status = self.STATUS_NEW
-        self.statusChanged = Event()
-        self.inputFiles = []
-        self.urlForInputFile = None
-        self.outputFiles = []
-        self.subdir = None
-        self.directory = None
-
-    def setStatus(self, status):
-        if False: # Violated by, e.g., "UNKNOWN JOB STATE 64"
-            assert status in self.statusCodes, "unknown status: %s" % status
-        self.status = status
-        self.statusChanged.signal()
-
-    def isAlive(self):
-        return not self.status in self.deadCodes
-
-
-class Run(object):
-    
-    # status codes
-    STATUS_NEW        = "ready"
-    STATUS_CONNECTING = "connecting"
-    STATUS_PREPARING  = "preparing"  # reported by build & schedule process
-    STATUS_PENDING    = "pending"    # reported by build & schedule process
-    STATUS_MESHING    = "meshing"    # reported by launcher process
-    STATUS_SOLVING    = "solving"    # reported by launcher process
-    STATUS_FINISHING  = "finishing"  # reported by launcher process
-    STATUS_DONE       = "done"
-    STATUS_ERROR      = "error"
-    deadCodes = [STATUS_DONE, STATUS_ERROR]
-
-    def __init__(self, id, urlForInputFile, config, info, jm):
-        self.id = id
-        self.urlForInputFile = urlForInputFile
-        self.specfemPathname = config.specfemPathname
-        self.mineosPathname = config.mineosPathname
-        self.dry = config.dry
-        self.info = info
-        self.jm = jm
-
-        self.jobChannel = stackless.channel()
-        self.status = self.STATUS_NEW
-        self.statusChanged = Event()
-
-    def go(self):
-        stackless.tasklet(self)()
-
-    def __call__(self):
-        try:
-            self.setStatus(self.STATUS_CONNECTING)
-            
-            # run
-            job = self.newJob()
-            self.jm.runJob(job)
-
-            # send jobs to jobSink()
-            self.jobChannel.send(job)
-            self.jobChannel.send(None) # no more jobs
-
-            self.setStatus(self.STATUS_PREPARING)
-
-            while job.isAlive():
-                job.statusChanged.wait()
-            # while
-
-            if job.status == job.STATUS_FAILED:
-                self.setStatus(self.STATUS_ERROR)
-                raise RuntimeError("run failed")
-
-            self.setStatus(self.STATUS_DONE)
-
-        except Exception, e:
-            self.info.log("error: %s: %s" % (e.__class__.__name__, e))
-            self.setStatus(self.STATUS_ERROR)
-        
-        return
-
-    def setStatus(self, status):
-        self.status = status
-        self.statusChanged.signal()
-
-    def isAlive(self):
-        return not self.status in self.deadCodes
-
-
-class SpecfemRun(Run):
-
-    def newJob(self):
-        dry = []
-        if self.dry:
-            dry = ["--scheduler.dry"]
-        parameters = 'par_file.txt'
-        event = 'event.txt'
-        stations = 'stations.txt'
-        job = Job(
-            self,
-            "run",
-            jobType = "single",
-            count = 1,
-            executable = self.specfemPathname,
-            arguments = ['--par-file=' + parameters,
-                         '--cmt-solution=' + event,
-                         '--stations=' + stations,
-                         "--scheduler.wait=True",
-                         "--job.name=run%05d" % self.id,
-                         "--macros.run.id=%05d" % self.id,
-                         #"--job.walltime=2*hour",
-                         "--job.stdout=stdout.txt",
-                         "--job.stderr=stderr.txt",
-                         ] + dry,
-            )
-        job.urlForInputFile = self.urlForInputFile
-        job.inputFiles = [parameters, event, stations]
-        job.outputFiles = ["specfem3dglobe.tar.gz", "output_mesher.txt", "output_solver.txt", "output_build.txt"]
-        return job
-
-
-class MineosRun(Run):
-
-    def newJob(self):
-        job = Job(
-            self,
-            "run",
-            executable = self.mineosPathname,
-            arguments = ["parameters.pml"],
-            )
-        job.urlForInputFile = self.urlForInputFile
-        job.inputFiles = ["parameters.pml", "event.txt", "stations.site", "stations.sitechan", "model.txt"]
-        job.outputFiles = ["output_mineos.txt", "mineos.tar.gz"]
-        return job
-
-
-class PortalConnection(object):
-
-    MULTIPART_BOUNDARY = '----------eArThQuAkE$'
-
-    def __init__(self, portal, outputRootUrl, clock, info):
-        self.portal = portal
-        self.outputRootUrl = outputRootUrl
-        self.clock = clock
-        self.info = info
-
-    def runSimulations(self, gjm, fjm, config):
-        stackless.tasklet(self.runFactory)(gjm, fjm, config)
-
-    def runFactory(self, gjm, fjm, config):
-        import urllib2
-        
-        runs = {}
-        
-        while True:
-            self.clock.tick.wait()
-            
-            #self.info.log("GET %s" % self.portal.runsUrl)
-            try:
-                infile = urllib2.urlopen(self.portal.runsUrl)
-            except Exception, e:
-                # Could be transient failure -- e.g., "connection reset by peer".
-                self.info.log("error: %s" % e)
-                continue
-            #self.info.log("OK")
-            runList = infile.read()
-            infile.close()
-            
-            runList = eval(runList)
-            
-            for run in runList:
-                id = int(run['id'])
-                status = run['status']
-                code = run['code']
-                if (status == Run.STATUS_NEW and not runs.has_key(id)):
-                    self.info.log("new run %d" % id)
-
-                    urlForInputFile = URLForInputFile(self.portal.inputFileUrl, id)
-
-                    if code == 1:
-                        newRun = SpecfemRun(id, urlForInputFile, config, self.info, gjm)
-                    elif code == 2:
-                        newRun = MineosRun(id, urlForInputFile, config, self.info, fjm)
-                    else:
-                        self.info.log("unknown code %d" % code)
-                        continue
-                    
-                    self.watchRun(newRun)
-                    runs[id] = newRun
-                    newRun.go()
-        
-        return
-
-    def watchRun(self, run):
-        stackless.tasklet(self.runWatcher)(run)
-        stackless.tasklet(self.jobSink)(run)
-        
-    def runWatcher(self, run):
-        url = self.portal.runStatusUrl % run.id
-        while run.isAlive():
-            run.statusChanged.wait()
-            fields = {'status': run.status}
-            self.postStatusChange(url, fields)
-        return
-
-    def jobSink(self, run):
-        newJob = run.jobChannel.receive()
-        while newJob is not None:
-            self.watchJob(newJob, run)
-            newJob = run.jobChannel.receive()
-        return
-
-    def watchJob(self, job, run):
-        url = self.portal.jobCreateUrl
-        fields = self.jobFields(job, run)
-        response = self.postStatusChange(url, fields)
-        portalId = eval(response)
-        job.progressUrl = self.portal.jobProgressUrl % portalId
-        stackless.tasklet(self.jobWatcher)(job, portalId, run)
-        
-    def jobWatcher(self, job, portalId, run):
-        url = self.portal.jobUpdateUrl % portalId
-        while job.isAlive():
-            job.statusChanged.wait()
-            fields = self.jobFields(job, run)
-            self.postStatusChange(url, fields)
-        self.postJobOutput(job, portalId)
-        return
-
-    def postJobOutput(self, job, portalId):
-        url = self.portal.outputCreateUrl
-        
-        for outputFile in job.outputFiles:
-            
-            pathname = os.path.join(job.directory, outputFile)
-            if not os.path.exists(pathname):
-                continue
-            
-            # in case the daemon and the web server are run as
-            # different users
-            import stat
-            os.chmod(pathname, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
-            
-            fields = {
-                'job': portalId,
-                'name': outputFile,
-                }
-            self.postStatusChange(url, fields)
-        
-        return
-
-    def jobFields(self, job, run):
-        fields = {
-            'run': run.id,
-            'task': job.task,
-            'status': job.status.lower(),
-            'url': self.outputRootUrl + job.subdir + "/",
-            }
-        return fields
-
-    def postStatusChange(self, url, fields):
-        import urllib
-        body = urllib.urlencode(fields)
-        headers = {"Content-Type": "application/x-www-form-urlencoded",
-                   "Accept": "text/plain"}
-        return self.post(body, headers, url)
-
-    def postStatusChangeAndUploadOutput(self, url, fields, output):
-        # Based upon a recipe by Wade Leftwich.
-        files = [('output', 'output.txt', 'application/x-gtar', 'gzip', output)]
-        body = self.encodeMultipartFormData(fields, files)
-        headers = {"Content-Type": "multipart/form-data; boundary=%s" % self.MULTIPART_BOUNDARY,
-                   "Content-Length": str(len(body)),
-                   "Accept": "text/plain"}
-        return self.post(body, headers, url)
-
-    def encodeMultipartFormData(self, fields, files):
-        import shutil
-        from StringIO import StringIO
-        stream = StringIO()
-        def line(s=''): stream.write(s + '\r\n')
-        for key, value in fields.iteritems():
-            line('--' + self.MULTIPART_BOUNDARY)
-            line('Content-Disposition: form-data; name="%s"' % key)
-            line()
-            line(value)
-        for (key, filename, contentType, contentEncoding, content) in files:
-            line('--' + self.MULTIPART_BOUNDARY)
-            line('Content-Disposition: form-data; name="%s"; filename="%s"' % (key, filename))
-            line('Content-Type: %s' % contentType)
-            line('Content-Encoding: %s' % contentEncoding)
-            line()
-            shutil.copyfileobj(content, stream)
-            line()
-        line('--' + self.MULTIPART_BOUNDARY + '--')
-        line()
-        return stream.getvalue()
-
-    def post(self, body, headers, url):
-        self.info.log("POST %s" % url)
-        import httplib
-        if self.portal.scheme == "http":
-            conn = httplib.HTTPConnection(self.portal.host)
-        elif self.portal.scheme == "https":
-            conn = httplib.HTTPSConnection(self.portal.host)
-        else:
-            assert False # not scheme in ["http", "https"]
-        conn.request("POST", url, body, headers)
-        response = conn.getresponse()
-        data = response.read()
-        conn.close()
-        self.info.log("response %s" % data)
-        return data
-
-
-class URLForInputFile(object):
-    def __init__(self, inputFileUrl, id):
-        self.inputFileUrl = inputFileUrl
-        self.id = id
-    def __call__(self, inputFile):
-        # Map input filenames to URLs in the context of this run.
-        return self.inputFileUrl % (self.id, inputFile)
-
-
-
-class Clock(object):
-
-    def __init__(self, interval):
-        self.interval = interval
-        self.tick = Event()
-        stackless.tasklet(self)()
-
-    def __call__(self):
-        from time import sleep
-        while True:
-            sleep(self.interval)
-            self.tick.signal()
-            stackless.schedule()
-        return
-
-
-class WebPortal(Component):
-    
-    name = "web-portal"
-
-    import pyre.inventory as pyre
-    scheme   = pyre.str("scheme", validator=pyre.choice(["http", "https"]), default="http")
-    host     = pyre.str("host", default="localhost:8000")
-    urlRoot  = pyre.str("url-root", default="/portals/seismo/")
-
-    def _configure(self):
-        self.urlPrefix           = '%s://%s%s' % (self.scheme, self.host, self.urlRoot)
-        self.inputFileUrl        = self.urlPrefix + 'runs/%d/%s'
-        # runs
-        self.runsUrl             = self.urlPrefix + 'runs/list.py'
-        self.runStatusUrl        = self.urlRoot + 'runs/%d/status/'
-        # jobs
-        self.jobCreateUrl        = self.urlRoot + 'jobs/create/'
-        self.jobUpdateUrl        = self.urlRoot + 'jobs/%d/update/'
-        self.jobProgressUrl      = self.urlPrefix + 'jobs/%d/progress/'
-        # output
-        self.outputCreateUrl     = self.urlRoot + 'output/create/'
-
-
-class Daemon(Script):
-
-    name = "web-portal-daemon"
-
-    import pyre.inventory as pyre
-    portal = pyre.facility("portal", factory=WebPortal)
-    sleepInterval = pyre.dimensional("sleep-interval", default=60*second)
-
-    outputRootPathname = pyre.str("output-root-pathname")
-    outputRootUrl = pyre.str("output-root-url")
-
-    specfemPathname = pyre.str("specfem-pathname", default="pyspecfem3D")
-    mineosPathname = pyre.str("mineos-pathname", default="mineos.py")
-
-    remote = pyre.bool("remote", default=False)
-    remoteShellCommand = pyre.str("remote-shell-command")
-    remoteDownloadCommand = pyre.str("remote-download-command")
-    remoteUploadCommand = pyre.str("remote-upload-command")
-    remoteOutputRoot = pyre.str("remote-output-root")
-
-    dry = pyre.bool("dry", default=False)
-
-
-    def main(self, *args, **kwds):
-        self._info.activate()
-        self._info.log("~~~~~~~~~~ daemon started ~~~~~~~~~~")
-        clock = Clock(self.sleepInterval / second)
-        if self.remote:
-            gjm = RemoteShellJobManager(clock, self.outputRootPathname, self._info, self)
-        else:
-            gjm = GlobusJobManager(clock, self.outputRootPathname, self._info)
-        fjm = ForkJobManager(clock, self.outputRootPathname, self._info)
-        connection = PortalConnection(self.portal, self.outputRootUrl, clock, self._info)
-        connection.runSimulations(gjm, fjm, self)
-        try:
-            stackless.run()
-        except KeyboardInterrupt:
-            self._info.log("~~~~~~~~~~ daemon stopped ~~~~~~~~~~")
-        return
-
-
-def randomName():
-    # Stolen from pyre.services.Pickler; perhaps this should be an
-    # official/"exported" Pyre function?
-    
-    alphabet = list("0123456789abcdefghijklmnopqrstuvwxyz")
-
-    import random
-    random.shuffle(alphabet)
-    key = "".join(alphabet)[0:16]
-
-    return key
-        
-
-def main(*args, **kwds):
-    daemon = Daemon()
-    daemon.run(*args, **kwds)
-
-
-if __name__ == "__main__":
-    main()

Deleted: cs/portal/trunk/mineos.cfg
===================================================================
--- cs/portal/trunk/mineos.cfg	2008-07-01 23:52:59 UTC (rev 12365)
+++ cs/portal/trunk/mineos.cfg	2008-07-02 00:46:55 UTC (rev 12366)
@@ -1,30 +0,0 @@
-
-[mineos]
-event = china_cmt_event
-#model = prem_noocean.txt
-
-radial = True
-toroidal = True
-spheroidal = True
-inner-core-toroidal = True
-
-
-# minos_bran
-eps = 1.0e-10
-wgrav = 1*milli*hertz
-lmin  = 2
-lmax = 8000
-wmin = 0.0*milli*hertz
-wmax = 200.0*milli*hertz
-nmin = 0
-nmax = 0
-
-# eigcon
-max-depth = 1000*km
-
-# green
-fmin = 10*milli*hertz
-fmax = 260*milli*hertz
-nsamples = 8000
-stations = short
-

Deleted: cs/portal/trunk/mineos.py
===================================================================
--- cs/portal/trunk/mineos.py	2008-07-01 23:52:59 UTC (rev 12365)
+++ cs/portal/trunk/mineos.py	2008-07-02 00:46:55 UTC (rev 12366)
@@ -1,404 +0,0 @@
-#!/usr/bin/env python
-
-
-from pyre.applications import Script
-from pyre.units.SI import milli, hertz
-from pyre.units.length import km
-import os, sys, stat, popen2, time
-from os.path import basename, dirname, splitext
-
-mHz = milli*hertz
-
-# Assume this script is installed alongside the mineos binaries.
-prefix = dirname(dirname(__file__))
-
-
-mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
-
-
-class Mode(object):
-    def __init__(self, jcom, id, desc):
-        self.jcom = jcom
-        self.id = id
-        self.desc = desc
-
-
-modes = [Mode(jcom, id, desc) for jcom, id, desc in [
-    (1, 'radial', 'radial'),
-    (2, 'toroidal', 'toroidal'),
-    (3, 'spheroidal', 'spheroidal'),
-    (4, 'ictoroidal', 'inner core toroidal'),
-    ]]
-del jcom, id, desc
-
-
-class Mineos(object):
-
-    def __init__(self, model, event, stations, progressUrl):
-        self.bin = prefix + "/bin"
-        self.model = model
-        self.event = event
-        self.stations = stations
-        self.progressUrl = progressUrl
-        self.eigcon_out = []
-        self.green_out = "green"
-        return
-
-
-    def minos_bran(self,
-                   mode,
-                   eps, wgrav,
-                   lmin, lmax,
-                   wmin, wmax,
-                   nmin, nmax):
-        if mode.id == 'radial':
-            # These are ignored.
-            lmin = 0
-            lmax = 0
-            # Misha Barmin says that that there are no dispersion
-            # branches for radial modes; thus nmin, nmax have a
-            # different sense in this context.  He says to fix them as
-            # follows.
-            nmin = 0
-            nmax = 8000
-        minos_bran_in = mode.id + "_mineos_bran.in"
-        s = open(minos_bran_in, "w")
-        print >>s, self.model
-        print >>s, self.listing(mode)
-        print >>s, self.eigenfunctions(mode)
-        print >>s, eps, wgrav
-        print >>s, mode.jcom
-        print >>s, lmin, lmax, wmin, wmax, nmin, nmax
-        s.close()
-        self.run("minos_bran", stdin=minos_bran_in)
-        return
-
-
-    def eigcon(self, mode, max_depth):
-        eigcon_in = mode.id + "_eigcon.in"
-        s = open(eigcon_in, "w")
-        print >>s, mode.jcom
-        print >>s, self.model
-        print >>s, max_depth
-        print >>s, self.listing(mode)
-        print >>s, self.eigenfunctions(mode)
-        dbname = mode.id #+ "_eigcon_out"
-        print >>s, dbname
-        s.close()
-        self.run("eigcon", stdin=eigcon_in)
-        self.eigcon_out.append(dbname)
-        return
-
-
-    def green(self, fmin, fmax, nsamples):
-        nStations = self.lineCount(self.stations + ".site")
-        s = open("db_list", "w")
-        for dbname in self.eigcon_out:
-            print >>s, dbname
-        green_in = open("green.in", "w")
-        argv = [os.path.join(self.bin, "green")]
-        child = popen2.Popen4(argv)
-        for s in [green_in, child.tochild]:
-            print >>s, self.stations
-            print >>s, "db_list"
-            print >>s, self.event
-            print >>s, fmin, fmax
-            print >>s, nsamples
-            print >>s, self.green_out
-            s.close()
-        stationTally = 0
-        self.postProgress(0.0)
-        lastPostTime = time.time()
-        for line in child.fromchild:
-            print line,
-            if line.startswith(" green: Station: "):
-                stationTally += 1
-                now = time.time()
-                if now - lastPostTime >= 60.0:
-                    self.postProgress(float(stationTally) / float(nStations))
-                    lastPostTime = now
-        status = child.wait()
-        self.checkStatus(status, argv)
-        return
-
-
-    def creat_origin(self):
-        self.run("creat_origin", [self.event, self.green_out])
-
-
-    def syndat(self, plane, datatype):
-        syndat_in = "syndat.in"
-        s = open(syndat_in, "w")
-        print >>s, self.event
-        print >>s, plane # error in documentation!
-        print >>s, self.green_out
-        print >>s, "seismograms" #"syndat_out"
-        print >>s, datatype
-        s.close()
-        self.run("syndat", stdin=syndat_in)
-        return
-
-
-    def cucss2sac(self, *args):
-        self.run("cucss2sac", list(args))
-        return
-
-
-    def listing(self, mode):
-        return mode.id + "_listing"
-
-
-    def eigenfunctions(self, mode):
-        return mode.id + "_eigenfunctions"
-
-
-    def run(self, program, args=None, stdin=None, stdout=None, stderr=None):
-        sys.stdout.flush()
-        sys.stderr.flush()
-        if args is None:
-            args = []
-        argv = [program] + args
-        pid = os.fork()
-        if pid:
-            # parent
-            _, status = os.wait()
-            self.checkStatus(status, argv)
-        else:
-            # child
-            if stdin is not None:
-                fd = os.open(stdin, os.O_RDONLY)
-                os.dup2(fd, 0)
-            if stdout is not None:
-                fd = os.open(stdout, os.O_WRONLY, mode)
-                os.dup2(fd, 1)
-            if stderr is not None:
-                fd = os.open(stderr, os.O_WRONLY, mode)
-                os.dup2(fd, 2)
-            executable = os.path.join(self.bin, argv[0])
-            try:
-                os.execvp(executable, argv)
-            except Exception, e:
-                sys.exit("%s: %s" % (executable, e))
-            
-        return
-
-
-    def checkStatus(self, status, argv):
-        exitStatus = None
-        if (os.WIFSIGNALED(status)):
-            statusStr = "signal %d" % os.WTERMSIG(status)
-        elif (os.WIFEXITED(status)):
-            exitStatus = os.WEXITSTATUS(status)
-            statusStr = "exit %d" % exitStatus
-        else:
-            statusStr = "status %d" % status
-        statusMsg = "%s: %s" % (argv[0], statusStr)
-        if exitStatus != 0:
-            sys.exit(statusMsg)
-        return
-
-
-    def lineCount(self, filename):
-        tally = 0
-        s = open(filename, 'r')
-        for line in s:
-            tally += 1
-        return tally
-
-
-    def postProgress(self, progress):
-        import urllib, urlparse
-        scheme, host, path = urlparse.urlparse(self.progressUrl)[0:3]
-        fields = { 'progress': "%.2f" % progress }
-        body = urllib.urlencode(fields)
-        headers = {"Content-Type": "application/x-www-form-urlencoded",
-                   "Accept": "text/plain"}
-        import httplib
-        if scheme == "http":
-            conn = httplib.HTTPConnection(host)
-        elif scheme == "https":
-            conn = httplib.HTTPSConnection(host)
-        else:
-            assert False # not scheme in ["http", "https"]
-        conn.request("POST", path, body, headers)
-        response = conn.getresponse()
-        data = response.read()
-        conn.close()
-        return data
-
-
-
-class MineosScript(Script):
-
-    name = "mineos"
-
-    import pyre.inventory as pyre
-
-    #------------------------------------------------------------------------
-    # common
-    
-    model = pyre.str("model")
-
-    # jcom
-    radial = pyre.bool("radial")
-    toroidal = pyre.bool("toroidal")
-    spheroidal = pyre.bool("spheroidal")
-    ictoroidal = pyre.bool("inner-core-toroidal")
-
-    event = pyre.str("event")
-
-    
-    #------------------------------------------------------------------------
-    # minos_bran
-    
-    eps = pyre.float("eps")
-    wgrav = pyre.dimensional("wgrav", default=1*hertz)
-
-    lmin = pyre.int("lmin")
-    lmax = pyre.int("lmax")
-    
-    wmin = pyre.dimensional("wmin", default=1*hertz)
-    wmax = pyre.dimensional("wmax", default=1*hertz)
-    
-    nmin = pyre.int("nmin")
-    nmax = pyre.int("nmax")
-
-
-    #------------------------------------------------------------------------
-    # eigcon
-
-    max_depth = pyre.dimensional("max-depth", default=1*km)
-    
-
-    #------------------------------------------------------------------------
-    # green
-
-    fmin = pyre.dimensional("fmin", default=1*hertz)
-    fmax = pyre.dimensional("fmax", default=1*hertz)
-
-    nsamples = pyre.int("nsamples")
-
-    stations = pyre.str("stations")
-
-    #------------------------------------------------------------------------
-    # syndat
-    
-    plane = pyre.int("plane", default=0) # use CMT tensor components as-is
-    datatype = pyre.int("datatype", default=0) # accelerograms in nm/s^2
-
-    #------------------------------------------------------------------------
-    # misc.
-
-    progressUrl = pyre.str("progress-url")
-
-
-    def main(self, *args, **kwds):
-        
-        self._info.activate()
-
-        enabledModes = []
-        for mode in modes:
-            if getattr(self, mode.id):
-                enabledModes.append(mode)
-
-        self._info.log("modes: " + ' '.join(["'%s'" % mode.desc for mode in enabledModes]))
-        if not enabledModes:
-            self._info.log("nothing to do")
-            return
-
-        jobdir = os.getcwd()
-        
-        archiveDirName = "mineos"
-        os.mkdir(archiveDirName)
-        inputFiles = [self.model, self.event] + [self.stations + suffix for suffix in [".site", ".sitechan"]]
-        for inputFile in inputFiles:
-            os.rename(inputFile, os.path.join(archiveDirName, inputFile))
-        os.chdir(archiveDirName)
-        
-        self._info.log("running Mineos program suite for model '%s'" % self.model)
-        self.runMineos(self.model, enabledModes)
-
-        # archive the output
-        os.chdir(jobdir)
-        self.spawn("/bin/tar", "cvzf", archiveDirName + ".tar.gz", archiveDirName)
-
-        # clean up
-        self.spawn("/bin/rm", "-rf", archiveDirName)
-
-        return
-
-
-    def runMineos(self, model, enabledModes):
-        mineos = Mineos(model, self.event, self.stations, self.progressUrl)
-
-        # minos_bran
-        for mode in enabledModes:
-            self._info.log("running program 'minos_bran' for '%s' mode" % mode.desc)
-            # convert to millihertz
-            wgrav = self.wgrav / mHz
-            wmin = self.wmin / mHz
-            wmax = self.wmax / mHz
-            mineos.minos_bran(mode, self.eps, wgrav,
-                              self.lmin, self.lmax,
-                              wmin, wmax,
-                              self.nmin, self.nmax)
-
-        # eigcon
-        for mode in enabledModes:
-            self._info.log("running program 'eigcon' for '%s' mode" % mode.desc)
-            # convert to kilometers
-            max_depth = self.max_depth / km
-            mineos.eigcon(mode, max_depth)
-
-        # green
-        self._info.log("running program 'green'")
-        fmin = self.fmin / mHz
-        fmax = self.fmax / mHz
-        mineos.green(fmin, fmax, self.nsamples)
-
-        # creat_origin
-        self._info.log("running program 'creat_origin'")
-        mineos.creat_origin()
-
-        # syndat
-        self._info.log("running program 'syndat'")
-        mineos.syndat(self.plane, self.datatype)
-
-        # cucss2sac
-        os.symlink(mineos.green_out + ".origin", "seismograms.origin")
-        os.symlink(mineos.stations + ".site", "seismograms.site")
-        mineos.cucss2sac("seismograms", "output") # SAC
-        mineos.cucss2sac("-a", "seismograms", "output") # ASCII
-        
-        return
-
-
-    def spawn(self, *argv):
-        command = ' '.join(argv)
-        self._info.log("spawning: %s" % command)
-        status = os.spawnvp(os.P_WAIT, argv[0], argv)
-        statusMsg = "%s: exit %d" % (argv[0], status)
-        self._info.log(statusMsg)
-        if status != 0:
-            sys.exit("%s: %s" % (argv[0], statusMsg))
-        return
-
-
-def main(*args, **kwds):
-    if True:
-        # Redirect all output -- our journal output, and the output of
-        # our children -- to a file.
-        fd = os.open("output_mineos.txt", os.O_CREAT | os.O_WRONLY, mode)
-        os.dup2(fd, 1)
-        os.dup2(fd, 2)
-        os.close(fd)
-    
-    mineos = MineosScript()
-    mineos.run(*args, **kwds)
-
-
-if __name__ == "__main__":
-    main()
-
-
-# end of file

Modified: cs/portal/trunk/northridge/MANIFEST.in
===================================================================
--- cs/portal/trunk/northridge/MANIFEST.in	2008-07-01 23:52:59 UTC (rev 12365)
+++ cs/portal/trunk/northridge/MANIFEST.in	2008-07-02 00:46:55 UTC (rev 12366)
@@ -21,3 +21,7 @@
 include SeismoWebPortal/templates/registration/*.html
 include SeismoWebPortal/templates/registration/*.txt
 include SeismoWebPortal/templates/*.html
+include backend/daemon.py
+include backend/mineos.py
+include backend/web-portal-daemon.cfg
+include backend/specfem3D.py

Copied: cs/portal/trunk/northridge/backend/daemon.py (from rev 12359, cs/portal/trunk/daemon.py)
===================================================================
--- cs/portal/trunk/northridge/backend/daemon.py	                        (rev 0)
+++ cs/portal/trunk/northridge/backend/daemon.py	2008-07-02 00:46:55 UTC (rev 12366)
@@ -0,0 +1,835 @@
+
+import stackless
+import os, sys, signal
+from popen2 import Popen4
+
+from pyre.applications import Script
+from pyre.components import Component
+from pyre.units.time import second
+
+
+# NYI: RSL AST
+class RSLUnquoted(object):
+    def __init__(self, value):
+        self.value = value
+
+TG_CLUSTER_SCRATCH = "/work/teragrid/tg459131"
+TG_COMMUNITY = "/projects/tg"
+
+# I'm not sure what to do about this yet.
+TACC_ENVIRONMENT = """(environment=(TG_CLUSTER_SCRATCH "%s") (TG_COMMUNITY "%s") (PATH "/opt/lsf/bin:/opt/lsf/etc:/opt/MPI/intel9/mvapich-gen2/0.9.8/bin:/opt/apps/binutils/binutils-2.17/bin:/opt/intel/compiler9.1//idb/bin:/opt/intel/compiler9.1//cc/bin:/opt/intel/compiler9.1//fc/bin:/usr/local/first:/usr/local/bin:~/bin:.:/opt/apps/pki_apps:/opt/apps/gsi-openssh-3.9/bin:/opt/lsf/bin:/opt/lsf/etc:/sbin:/usr/sbin:/usr/local/sbin:/bin:/usr/bin:/usr/local/bin:/usr/X11R6/bin:/home/teragrid/tg459131/bin:/data/TG/srb-client-3.4.1-r1/bin:/data/TG/softenv-1.6.2-r3/bin:/data/TG/tg-policy/bin:/data/TG/gx-map-0.5.3.2-r1/bin:/data/TG/tgusage-2.9-r2/bin:/usr/java/j2sdk1.4.2_12/bin:/usr/java/j2sdk1.4.2_12/jre/bin:/data/TG/globus-4.0.1-r3/sbin:/data/TG/globus-4.0.1-r3/bin:/data/TG/tgcp-1.0.0-r2/bin:/data/TG/condor-6.7.18-r1/bin:/data/TG/condor-6.7.18-r1/sbin:/data/TG/hdf4-4.2r1-r1/bin:/opt/apps/hdf5/hdf5-1.6.5/bin:/data/TG/phdf5-1.6.5/bin") (MPICH_HOME "/opt/MPI/intel9/mvapich-gen2/0.9.8"))""" % (TG_CLUSTER_SCRATCH, TG_COMMUNITY)
+
+
+
+class GassServer(object):
+
+    # @classmethod
+    def startUp(cls, directory, info):
+        argv = ["globus-gass-server", "-r", "-w", "-c"]
+        savedWd = os.getcwd()
+        os.chdir(directory)
+        child = Popen4(argv)
+        os.chdir(savedWd)
+        child.tochild.close()
+        while True:
+            url = child.fromchild.readline().strip()
+            if url.startswith("https:"):
+                break
+        child.fromchild.close()
+        return GassServer(child, url, directory, info)
+    startUp = classmethod(startUp)
+
+    def __init__(self, child, url, directory, info):
+        self.child = child
+        self.url = url
+        self.directory = directory
+        self.info = info
+
+    def shutDown(self):
+        # This doesn't work reliably...
+        argv = ["globus-gass-server-shutdown", self.url]
+        command = ' '.join(argv)
+        self.info.log("spawning: %s" % command)
+        status = os.spawnvp(os.P_WAIT, argv[0], argv)
+        statusMsg = "%s: exit %d" % (argv[0], status)
+        self.info.log(statusMsg)
+        if status != 0:
+            # ...so sometimes we kill it!
+            self.info.log("kill -INT %d" % self.child.pid)
+            os.kill(self.child.pid, signal.SIGINT)
+        self.info.log("waitpid(%d)" % self.child.pid)
+        status = self.child.wait()
+        return
+
+
+class JobManager(object):
+
+    def __init__(self, clock, root, info):
+        self.clock = clock
+        self.root = root
+        self.info = info
+
+
+    def runJob(self, job):
+        self.createSubdirectoryForJob(job)
+        stackless.tasklet(self.jobRunner)(job)
+
+    
+    def spawn(self, *argv):
+        command = ' '.join(argv)
+        self.info.log("spawning: %s" % command)
+        status = os.spawnvp(os.P_WAIT, argv[0], argv)
+        statusMsg = "%s: exit %d" % (argv[0], status)
+        self.info.log(statusMsg)
+        if status != 0:
+            raise Exception(statusMsg)
+        return
+
+
+    def ospawn(self, *argv):
+        command = ' '.join(argv)
+        self.info.log("spawning: %s" % command)
+
+        child = Popen4(argv)
+
+        child.tochild.close()
+
+        output = child.fromchild.readlines()
+        status = child.wait()
+
+        exitStatus = None
+        if (os.WIFSIGNALED(status)):
+            statusStr = "signal %d" % os.WTERMSIG(status)
+        elif (os.WIFEXITED(status)):
+            exitStatus = os.WEXITSTATUS(status)
+            statusStr = "exit %d" % exitStatus
+        else:
+            statusStr = "status %d" % status
+        statusMsg = "%s: %s" % (argv[0], statusStr)
+    
+        for line in output:
+            self.info.line("    " + line.rstrip())
+        self.info.log(statusMsg)
+    
+        if exitStatus != 0:
+            raise Exception(statusMsg)
+
+        return output
+
+
+    def download(self, url, pathname):
+        import shutil
+        import urllib2
+        self.info.log("downloading %s" % url)
+        infile = urllib2.urlopen(url)
+        outfile = open(pathname, 'wb')
+        shutil.copyfileobj(infile,  outfile)
+        outfile.close()
+        infile.close()
+        return
+
+    def downloadInputFilesForJob(self, job):
+        for inputFile in job.inputFiles:
+            url = job.urlForInputFile(inputFile)
+            filename = inputFile.split('/')[-1] # strips 'mineos/'
+            pathname = os.path.join(job.directory, filename)
+            self.download(url, pathname)
+        return
+
+    def createSubdirectoryForJob(self, job):
+        while True:
+            dirName = randomName()
+            dirPath = os.path.join(self.root, dirName)
+            if not os.path.exists(dirPath):
+                os.mkdir(dirPath)
+                break
+        job.subdir = dirName
+        job.directory = dirPath
+        return
+
+
+class ForkJobManager(JobManager):
+
+    def jobRunner(self, job):
+
+        try:
+            self.downloadInputFilesForJob(job)
+
+            argv = self.argvForJob(job)
+            command = ' '.join(argv)
+            self.info.log("spawning: %s" % command)
+            savedWd = os.getcwd()
+            os.chdir(job.directory)
+            pid = os.spawnvp(os.P_NOWAIT, argv[0], argv)
+            os.chdir(savedWd)
+            self.info.log("spawned process %d" % pid)
+            
+            ticks = 2
+            wpid, status = os.waitpid(pid, os.WNOHANG)
+            while wpid == 0:
+                for t in xrange(ticks):
+                    self.clock.tick.wait()
+                ticks = min(ticks * 2, 10)
+                wpid, status = os.waitpid(pid, os.WNOHANG)
+
+            self.uploadOutputFilesForJob(job)
+            
+            if (os.WIFSIGNALED(status)):
+                statusStr = "signal %d" % os.WTERMSIG(status)
+                job.setStatus(job.STATUS_FAILED)
+            elif (os.WIFEXITED(status)):
+                exitStatus = os.WEXITSTATUS(status)
+                statusStr = "exit %d" % exitStatus
+                if exitStatus == 0:
+                    job.setStatus(job.STATUS_DONE)
+                else:
+                    job.setStatus(job.STATUS_FAILED)
+            else:
+                statusStr = "status %d" % status
+                job.setStatus(job.STATUS_FAILED)
+            statusMsg = "%s: %s" % (argv[0], statusStr)
+            self.info.log(statusMsg)
+        
+        except Exception, e:
+            self.info.log("error: %s: %s" % (e.__class__.__name__, e))
+            job.setStatus(job.STATUS_FAILED)
+
+        return
+
+
+    def argvForJob(self, job):
+        return ([job.resSpec['executable']] + job.resSpec['arguments'] +
+                ["--progress-url=%s" % job.progressUrl])
+
+
+    def uploadOutputFilesForJob(self, job):
+        return
+
+
+class RemoteShellJobManager(ForkJobManager):
+
+    def __init__(self, clock, root, info, config):
+        ForkJobManager.__init__(self, clock, root, info)
+        self.rsh = config.remoteShellCommand.split()
+        self.rdown = config.remoteDownloadCommand
+        self.rup = config.remoteUploadCommand
+        self.remoteOutputRoot = config.remoteOutputRoot
+
+
+    def downloadInputFilesForJob(self, job):
+        # First, download to the local host.
+        ForkJobManager.downloadInputFilesForJob(self, job)
+
+        # Create a remote directory for this run.
+        remoteDirectory = self.remoteDirectoryForJob(job)
+        argv = self.rsh + ["mkdir -p %s" % remoteDirectory]
+        self.spawn(*argv)
+        
+        # Now copy to the remote host.
+        for inputFile in job.inputFiles:
+            filename = inputFile.split('/')[-1] # strips 'mineos/'
+            pathname = os.path.join(job.directory, filename)
+            remotePathname = os.path.join(remoteDirectory, filename)
+            argv = (self.rdown % {'source': pathname, 'dest': remotePathname}).split()
+            self.spawn(*argv)
+            
+        return
+
+
+    def uploadOutputFilesForJob(self, job):
+        remoteDirectory = self.remoteDirectoryForJob(job)
+        for filename in job.outputFiles:
+            pathname = os.path.join(job.directory, filename)
+            remotePathname = os.path.join(remoteDirectory, filename)
+            argv = (self.rup % {'source': remotePathname, 'dest': pathname}).split()
+            try:
+                self.spawn(*argv)
+            except Exception:
+                pass
+        return
+
+
+    def argvForJob(self, job):
+        remoteCommand = ("cd " + self.remoteDirectoryForJob(job) + " && " +
+                         ' '.join(ForkJobManager.argvForJob(self, job)))
+        return self.rsh + [remoteCommand]
+
+
+    def remoteDirectoryForJob(self, job):
+        return os.path.join(self.remoteOutputRoot, "run%05d" % job.run.id)
+
+
+class GlobusJobManager(JobManager):
+
+    def jobRunner(self, job):
+
+        try:
+            self.downloadInputFilesForJob(job)
+            gassServer = GassServer.startUp(job.directory, self.info)
+        
+            resSpec = self.resSpec(job, gassServer)
+            id = self.globusrun(resSpec)
+
+            oldStatus = job.status
+            ticks = 2
+            while job.isAlive():
+                for t in xrange(ticks):
+                    self.clock.tick.wait()
+                ticks *= 2
+                status = self.getJobStatus(id)
+                if status != oldStatus:
+                    job.setStatus(status)
+                    oldStatus = status
+                    ticks = 2
+        
+        except Exception, e:
+            self.info.log("error: %s: %s" % (e.__class__.__name__, e))
+            job.setStatus(job.STATUS_FAILED)
+
+        finally:
+            gassServer.shutDown()
+        
+        return
+
+
+    def globusrun(self, resSpec):
+        import tempfile
+        fd, rslName = tempfile.mkstemp(suffix=".rsl")
+        stream = os.fdopen(fd, "w")
+        try:
+            self.writeRsl(resSpec, stream)
+            stream.close()
+            if resSpec['jobType'] == "mpi":
+                resourceManager = "tg-login.tacc.teragrid.org/jobmanager-lsf"
+            else:
+                resourceManager = "tg-login.tacc.teragrid.org/jobmanager-fork"
+            output = self.ospawn("globusrun", "-F", "-f", rslName, "-r", resourceManager)
+            id = None
+            for line in output:
+                if line.startswith("https://"):
+                    id = line.strip()
+            assert id
+        finally:
+            stream.close()
+            os.unlink(rslName)
+        return id
+
+
+    def writeRsl(self, resSpec, stream):
+        print >>stream, '&'
+        for relation in resSpec.iteritems():
+            attribute, valueSequence = relation
+            valueSequence = self.rslValueSequenceToString(valueSequence)
+            print >>stream, '(', attribute, '=', valueSequence, ')'
+        print >>stream, TACC_ENVIRONMENT
+        return
+
+
+    def rslValueSequenceToString(self, valueSequence):
+        if not isinstance(valueSequence, (tuple, list)):
+            valueSequence = [valueSequence]
+        s = []
+        for value in valueSequence:
+            if isinstance(value, RSLUnquoted):
+                s.append(value.value)
+            elif isinstance(value, (tuple, list)):
+                s.append('(' + self.rslValueSequenceToString(value) + ')')
+            else:
+                s.append('"%s"' % value)
+        return ' '.join(s)
+
+
+    def getJobStatus(self, id):
+        output = self.ospawn("globus-job-status", id)
+        status = output[0].strip()
+        return status
+
+
+    def resSpec(self, job, gassServer):
+        resSpec = {}
+        resSpec.update(job.resSpec)
+        resSpec["scratch_dir"] = TG_CLUSTER_SCRATCH
+        resSpec["directory"] = RSLUnquoted("$(SCRATCH_DIRECTORY)")
+        
+        file_stage_in = []
+        for inputFile in job.inputFiles:
+            url = gassServer.url + "/./" + inputFile
+            file_stage_in.append([url, RSLUnquoted("$(SCRATCH_DIRECTORY) # " + '"/' + inputFile + '"')])
+        resSpec["file_stage_in"] = file_stage_in
+        
+        file_stage_out = []
+        for outputFile in job.outputFiles:
+            url = gassServer.url + "/./" + outputFile
+            file_stage_out.append([outputFile, url])
+        if file_stage_out:
+            resSpec["file_stage_out"] = file_stage_out
+        
+        resSpec["stdout"] = gassServer.url + "/./stdout.txt"
+        resSpec["stderr"] = gassServer.url + "/./stderr.txt"
+
+        job.outputFiles.extend(["stdout.txt", "stderr.txt"])
+        
+        return resSpec
+
+
+class Event(object):
+    
+    def __init__(self):
+        self.channel = stackless.channel()
+
+    def wait(self):
+        self.channel.receive()
+        return
+
+    def signal(self):
+
+        # Swap-in a new channel, so that a waiting tasklet that
+        # repeatedly calls wait() will block waiting for the *next*
+        # signal on its subsequent call to wait().
+        channel = self.channel
+        self.channel = stackless.channel()
+
+        # Unblock all waiters.
+        while channel.balance < 0:
+            channel.send(None)
+        
+        return
+
+
+class Job(object):
+
+    # status codes
+    STATUS_NEW      = "NEW" # pseudo
+    STATUS_PENDING  = "PENDING"
+    STATUS_ACTIVE   = "ACTIVE"
+    STATUS_DONE     = "DONE"
+    STATUS_FAILED   = "FAILED"
+    statusCodes = [STATUS_PENDING, STATUS_ACTIVE, STATUS_DONE, STATUS_FAILED]
+    deadCodes = [STATUS_DONE, STATUS_FAILED]
+
+    def __init__(self, run, task, **resSpec):
+        self.run = run
+        self.task = task
+        self.resSpec = resSpec
+        self.status = self.STATUS_NEW
+        self.statusChanged = Event()
+        self.inputFiles = []
+        self.urlForInputFile = None
+        self.outputFiles = []
+        self.subdir = None
+        self.directory = None
+
+    def setStatus(self, status):
+        if False: # Violated by, e.g., "UNKNOWN JOB STATE 64"
+            assert status in self.statusCodes, "unknown status: %s" % status
+        self.status = status
+        self.statusChanged.signal()
+
+    def isAlive(self):
+        return not self.status in self.deadCodes
+
+
+class Run(object):
+    
+    # status codes
+    STATUS_NEW        = "ready"
+    STATUS_CONNECTING = "connecting"
+    STATUS_PREPARING  = "preparing"  # reported by build & schedule process
+    STATUS_PENDING    = "pending"    # reported by build & schedule process
+    STATUS_MESHING    = "meshing"    # reported by launcher process
+    STATUS_SOLVING    = "solving"    # reported by launcher process
+    STATUS_FINISHING  = "finishing"  # reported by launcher process
+    STATUS_DONE       = "done"
+    STATUS_ERROR      = "error"
+    deadCodes = [STATUS_DONE, STATUS_ERROR]
+
+    def __init__(self, id, urlForInputFile, config, info, jm):
+        self.id = id
+        self.urlForInputFile = urlForInputFile
+        self.specfemPathname = config.specfemPathname
+        self.mineosPathname = config.mineosPathname
+        self.dry = config.dry
+        self.info = info
+        self.jm = jm
+
+        self.jobChannel = stackless.channel()
+        self.status = self.STATUS_NEW
+        self.statusChanged = Event()
+
+    def go(self):
+        stackless.tasklet(self)()
+
+    def __call__(self):
+        try:
+            self.setStatus(self.STATUS_CONNECTING)
+            
+            # run
+            job = self.newJob()
+            self.jm.runJob(job)
+
+            # send jobs to jobSink()
+            self.jobChannel.send(job)
+            self.jobChannel.send(None) # no more jobs
+
+            self.setStatus(self.STATUS_PREPARING)
+
+            while job.isAlive():
+                job.statusChanged.wait()
+            # while
+
+            if job.status == job.STATUS_FAILED:
+                self.setStatus(self.STATUS_ERROR)
+                raise RuntimeError("run failed")
+
+            self.setStatus(self.STATUS_DONE)
+
+        except Exception, e:
+            self.info.log("error: %s: %s" % (e.__class__.__name__, e))
+            self.setStatus(self.STATUS_ERROR)
+        
+        return
+
+    def setStatus(self, status):
+        self.status = status
+        self.statusChanged.signal()
+
+    def isAlive(self):
+        return not self.status in self.deadCodes
+
+
+class SpecfemRun(Run):
+
+    def newJob(self):
+        dry = []
+        if self.dry:
+            dry = ["--scheduler.dry"]
+        parameters = 'par_file.txt'
+        event = 'event.txt'
+        stations = 'stations.txt'
+        job = Job(
+            self,
+            "run",
+            jobType = "single",
+            count = 1,
+            executable = self.specfemPathname,
+            arguments = ['--par-file=' + parameters,
+                         '--cmt-solution=' + event,
+                         '--stations=' + stations,
+                         "--scheduler.wait=True",
+                         "--job.name=run%05d" % self.id,
+                         "--macros.run.id=%05d" % self.id,
+                         #"--job.walltime=2*hour",
+                         "--job.stdout=stdout.txt",
+                         "--job.stderr=stderr.txt",
+                         ] + dry,
+            )
+        job.urlForInputFile = self.urlForInputFile
+        job.inputFiles = [parameters, event, stations]
+        job.outputFiles = ["specfem3dglobe.tar.gz", "output_mesher.txt", "output_solver.txt", "output_build.txt"]
+        return job
+
+
+class MineosRun(Run):
+
+    def newJob(self):
+        job = Job(
+            self,
+            "run",
+            executable = self.mineosPathname,
+            arguments = ["parameters.pml"],
+            )
+        job.urlForInputFile = self.urlForInputFile
+        job.inputFiles = ["parameters.pml", "event.txt", "stations.site", "stations.sitechan", "model.txt"]
+        job.outputFiles = ["output_mineos.txt", "mineos.tar.gz"]
+        return job
+
+
+class PortalConnection(object):
+
+    MULTIPART_BOUNDARY = '----------eArThQuAkE$'
+
+    def __init__(self, portal, outputRootUrl, clock, info):
+        self.portal = portal
+        self.outputRootUrl = outputRootUrl
+        self.clock = clock
+        self.info = info
+
+    def runSimulations(self, gjm, fjm, config):
+        stackless.tasklet(self.runFactory)(gjm, fjm, config)
+
+    def runFactory(self, gjm, fjm, config):
+        """Poll the portal for new runs, forever, and start each one found.
+
+        On every clock tick the list of runs is fetched from
+        self.portal.runsUrl.  Each listed run whose status is
+        Run.STATUS_NEW and whose id has not been seen before becomes a
+        SpecfemRun (code 1, job manager 'gjm') or a MineosRun (code 2,
+        job manager 'fjm'); it is handed to watchRun() and then started.
+        'config' is passed through to the run constructors.
+
+        Failures while fetching the list are logged and retried on the
+        next tick.  This method never returns.
+        """
+        import urllib2
+
+        runs = {}  # run id -> run object, for every run started so far
+
+        while True:
+            self.clock.tick.wait()
+
+            #self.info.log("GET %s" % self.portal.runsUrl)
+            try:
+                infile = urllib2.urlopen(self.portal.runsUrl)
+                try:
+                    # Reading can fail just like connecting can; keep it
+                    # inside the 'try' so the polling loop survives.
+                    runList = infile.read()
+                finally:
+                    infile.close()
+            except Exception, e:
+                # Could be transient failure -- e.g., "connection reset by peer".
+                self.info.log("error: %s" % e)
+                continue
+            #self.info.log("OK")
+
+            # SECURITY NOTE(review): eval() executes whatever text the
+            # portal returned.  This is only acceptable while the portal
+            # (and the connection to it) is fully trusted; consider a
+            # data-only parser instead.
+            runList = eval(runList)
+
+            for run in runList:
+                runId = int(run['id'])
+                status = run['status']
+                code = run['code']
+                if status != Run.STATUS_NEW or runId in runs:
+                    continue
+
+                self.info.log("new run %d" % runId)
+
+                urlForInputFile = URLForInputFile(self.portal.inputFileUrl, runId)
+
+                if code == 1:
+                    newRun = SpecfemRun(runId, urlForInputFile, config, self.info, gjm)
+                elif code == 2:
+                    newRun = MineosRun(runId, urlForInputFile, config, self.info, fjm)
+                else:
+                    self.info.log("unknown code %d" % code)
+                    continue
+
+                self.watchRun(newRun)
+                runs[runId] = newRun
+                newRun.go()
+
+    def watchRun(self, run):
+        """Start two tasklets for 'run': runWatcher() and jobSink()."""
+        stackless.tasklet(self.runWatcher)(run)
+        stackless.tasklet(self.jobSink)(run)
+        
+    def runWatcher(self, run):
+        """Post the run's status to the portal each time it changes.
+
+        Loops for as long as run.isAlive() is true, blocking on
+        run.statusChanged between posts.
+        """
+        url = self.portal.runStatusUrl % run.id
+        while run.isAlive():
+            run.statusChanged.wait()
+            fields = {'status': run.status}
+            self.postStatusChange(url, fields)
+        return
+
+    def jobSink(self, run):
+        """Hand every job received on run.jobChannel to watchJob().
+
+        Receiving None from the channel ends the loop.
+        """
+        while True:
+            received = run.jobChannel.receive()
+            if received is None:
+                break
+            self.watchJob(received, run)
+        return
+
+    def watchJob(self, job, run):
+        """Register 'job' with the portal and start a tasklet to watch it.
+
+        The job's fields are posted to the job-creation URL; the
+        portal's reply is evaluated to obtain the portal-side job id,
+        which is used to build job.progressUrl and is passed to
+        jobWatcher().
+        """
+        url = self.portal.jobCreateUrl
+        fields = self.jobFields(job, run)
+        response = self.postStatusChange(url, fields)
+        # SECURITY NOTE(review): eval() executes the text returned by the
+        # portal; presumably the reply is a bare integer id -- confirm,
+        # and consider parsing it without eval().
+        portalId = eval(response)
+        job.progressUrl = self.portal.jobProgressUrl % portalId
+        stackless.tasklet(self.jobWatcher)(job, portalId, run)
+        
+    def jobWatcher(self, job, portalId, run):
+        """Post the job's fields on each status change, then its output.
+
+        Loops while job.isAlive(), blocking on job.statusChanged between
+        posts.  Once the job is no longer alive, postJobOutput() is
+        called to register the output files.
+        """
+        url = self.portal.jobUpdateUrl % portalId
+        while job.isAlive():
+            job.statusChanged.wait()
+            fields = self.jobFields(job, run)
+            self.postStatusChange(url, fields)
+        self.postJobOutput(job, portalId)
+        return
+
+    def postJobOutput(self, job, portalId):
+        """Register each existing output file of 'job' with the portal.
+
+        Files listed in job.outputFiles which do not exist in
+        job.directory are skipped.  Each existing file is made
+        read-only for user, group and others before its name is posted
+        to the output-creation URL together with 'portalId'.
+        """
+        import stat
+
+        url = self.portal.outputCreateUrl
+        # in case the daemon and the web server are run as
+        # different users
+        worldReadable = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
+
+        for name in job.outputFiles:
+            pathname = os.path.join(job.directory, name)
+            if os.path.exists(pathname):
+                os.chmod(pathname, worldReadable)
+                self.postStatusChange(url, {'job': portalId, 'name': name})
+
+        return
+
+    def jobFields(self, job, run):
+        """Return the dictionary of form fields describing 'job'.
+
+        The job's status is lower-cased, and its URL is formed from
+        self.outputRootUrl, the job's subdirectory and a trailing slash.
+        """
+        jobUrl = self.outputRootUrl + job.subdir + "/"
+        return {
+            'run': run.id,
+            'task': job.task,
+            'status': job.status.lower(),
+            'url': jobUrl,
+            }
+
+    def postStatusChange(self, url, fields):
+        """URL-encode 'fields', POST them to 'url' and return the reply body."""
+        import urllib
+        body = urllib.urlencode(fields)
+        headers = {"Content-Type": "application/x-www-form-urlencoded",
+                   "Accept": "text/plain"}
+        return self.post(body, headers, url)
+
+    def postStatusChangeAndUploadOutput(self, url, fields, output):
+        """POST 'fields' plus an uploaded file as multipart/form-data.
+
+        'output' must be a readable file-like object: its contents are
+        copied into the request body by encodeMultipartFormData().
+        Returns the reply body.
+        """
+        # Based upon a recipe by Wade Leftwich.
+        # Tuple layout: (field name, filename, content type, content encoding, file object)
+        files = [('output', 'output.txt', 'application/x-gtar', 'gzip', output)]
+        body = self.encodeMultipartFormData(fields, files)
+        headers = {"Content-Type": "multipart/form-data; boundary=%s" % self.MULTIPART_BOUNDARY,
+                   "Content-Length": str(len(body)),
+                   "Accept": "text/plain"}
+        return self.post(body, headers, url)
+
+    def encodeMultipartFormData(self, fields, files):
+        """Encode form fields and files as a multipart/form-data body.
+
+        fields -- dictionary mapping field names to values; values are
+                  converted with str(), so non-string values (such as
+                  an integer run id) are accepted
+        files  -- sequence of (key, filename, contentType,
+                  contentEncoding, content) tuples, where 'content' is
+                  a readable file-like object
+
+        Returns the encoded body as a string.  Parts are separated by
+        self.MULTIPART_BOUNDARY and every line ends with CRLF.
+        """
+        import shutil
+        from StringIO import StringIO
+        stream = StringIO()
+        boundary = '--' + self.MULTIPART_BOUNDARY
+        def line(s=''): stream.write(s + '\r\n')
+        for key, value in fields.iteritems():
+            line(boundary)
+            line('Content-Disposition: form-data; name="%s"' % key)
+            line()
+            # str(): concatenating a non-string value with '\r\n' would
+            # raise TypeError.
+            line(str(value))
+        for (key, filename, contentType, contentEncoding, content) in files:
+            line(boundary)
+            line('Content-Disposition: form-data; name="%s"; filename="%s"' % (key, filename))
+            line('Content-Type: %s' % contentType)
+            line('Content-Encoding: %s' % contentEncoding)
+            line()
+            shutil.copyfileobj(content, stream)
+            line()
+        line(boundary + '--')
+        line()
+        return stream.getvalue()
+
+    def post(self, body, headers, url):
+        """POST 'body' with 'headers' to 'url' on the portal host.
+
+        The connection type is chosen by self.portal.scheme ("http" or
+        "https").  The request and the reply are both logged.  Returns
+        the reply body.  Raises ValueError for any other scheme.
+        """
+        import httplib
+        self.info.log("POST %s" % url)
+        scheme = self.portal.scheme
+        if scheme == "http":
+            conn = httplib.HTTPConnection(self.portal.host)
+        elif scheme == "https":
+            conn = httplib.HTTPSConnection(self.portal.host)
+        else:
+            # An 'assert' would vanish under "python -O" and leave 'conn'
+            # unbound.
+            raise ValueError("unsupported scheme: %r" % (scheme,))
+        try:
+            conn.request("POST", url, body, headers)
+            response = conn.getresponse()
+            data = response.read()
+        finally:
+            # Release the socket even when the request fails.
+            conn.close()
+        self.info.log("response %s" % data)
+        return data
+
+
+class URLForInputFile(object):
+    """Callable which maps an input filename to its URL for one run."""
+    def __init__(self, inputFileUrl, id):
+        # 'inputFileUrl' is a format string taking (run id, filename).
+        self.inputFileUrl = inputFileUrl
+        self.id = id
+    def __call__(self, inputFile):
+        # Map input filenames to URLs in the context of this run.
+        return self.inputFileUrl % (self.id, inputFile)
+
+
+
+class Clock(object):
+    """Periodically signals its 'tick' event from its own tasklet."""
+
+    def __init__(self, interval):
+        # 'interval' is passed to time.sleep(), i.e. it is in seconds.
+        self.interval = interval
+        self.tick = Event()
+        # Run __call__() as a tasklet.
+        stackless.tasklet(self)()
+
+    def __call__(self):
+        # Tasklet body: sleep, signal 'tick', yield to the other
+        # tasklets; repeat forever.
+        # NOTE(review): time.sleep() is a blocking call, so presumably no
+        # other tasklet runs while the clock sleeps -- confirm this is
+        # intended.
+        from time import sleep
+        while True:
+            sleep(self.interval)
+            self.tick.signal()
+            stackless.schedule()
+        return
+
+
+class WebPortal(Component):
+    """Pyre component holding the portal's address and derived URLs."""
+    
+    name = "web-portal"
+
+    import pyre.inventory as pyre
+    scheme   = pyre.str("scheme", validator=pyre.choice(["http", "https"]), default="http")
+    host     = pyre.str("host", default="localhost:8000")
+    urlRoot  = pyre.str("url-root", default="/portals/seismo/")
+
+    def _configure(self):
+        # URLs built on 'urlPrefix' are absolute (scheme://host/root...);
+        # URLs built on 'urlRoot' are paths only.  The path-only ones are
+        # those passed to PortalConnection.post(), which opens its own
+        # connection to 'host'.
+        self.urlPrefix           = '%s://%s%s' % (self.scheme, self.host, self.urlRoot)
+        # format arguments: (run id, input filename)
+        self.inputFileUrl        = self.urlPrefix + 'runs/%d/%s'
+        # runs
+        self.runsUrl             = self.urlPrefix + 'runs/list.py'
+        self.runStatusUrl        = self.urlRoot + 'runs/%d/status/'
+        # jobs
+        self.jobCreateUrl        = self.urlRoot + 'jobs/create/'
+        self.jobUpdateUrl        = self.urlRoot + 'jobs/%d/update/'
+        self.jobProgressUrl      = self.urlPrefix + 'jobs/%d/progress/'
+        # output
+        self.outputCreateUrl     = self.urlRoot + 'output/create/'
+
+
+class Daemon(Script):
+    """Pyre application which polls the web portal and runs simulations."""
+
+    name = "web-portal-daemon"
+
+    import pyre.inventory as pyre
+    portal = pyre.facility("portal", factory=WebPortal)
+    # time between clock ticks (see main())
+    sleepInterval = pyre.dimensional("sleep-interval", default=60*second)
+
+    outputRootPathname = pyre.str("output-root-pathname")
+    outputRootUrl = pyre.str("output-root-url")
+
+    # executables used by the SPECFEM and Mineos runs, respectively
+    specfemPathname = pyre.str("specfem-pathname", default="specfem3D.py")
+    mineosPathname = pyre.str("mineos-pathname", default="mineos.py")
+
+    # when true, main() uses a RemoteShellJobManager instead of a
+    # GlobusJobManager
+    remote = pyre.bool("remote", default=False)
+    remoteShellCommand = pyre.str("remote-shell-command")
+    remoteDownloadCommand = pyre.str("remote-download-command")
+    remoteUploadCommand = pyre.str("remote-upload-command")
+    remoteOutputRoot = pyre.str("remote-output-root")
+
+    # when true, SPECFEM jobs are given the "--scheduler.dry" argument
+    dry = pyre.bool("dry", default=False)
+
+
+    def main(self, *args, **kwds):
+        """Create the clock, job managers and portal connection, then
+        run the stackless scheduler until interrupted from the keyboard."""
+        self._info.activate()
+        self._info.log("~~~~~~~~~~ daemon started ~~~~~~~~~~")
+        # Dividing by 'second' turns the dimensional interval into a
+        # plain number of seconds.
+        clock = Clock(self.sleepInterval / second)
+        if self.remote:
+            gjm = RemoteShellJobManager(clock, self.outputRootPathname, self._info, self)
+        else:
+            gjm = GlobusJobManager(clock, self.outputRootPathname, self._info)
+        fjm = ForkJobManager(clock, self.outputRootPathname, self._info)
+        connection = PortalConnection(self.portal, self.outputRootUrl, clock, self._info)
+        # The daemon itself serves as the 'config' object for the runs.
+        connection.runSimulations(gjm, fjm, self)
+        try:
+            stackless.run()
+        except KeyboardInterrupt:
+            self._info.log("~~~~~~~~~~ daemon stopped ~~~~~~~~~~")
+        return
+
+
+def randomName():
+    """Return a random 16-character string of distinct digits and
+    lowercase letters."""
+    # Stolen from pyre.services.Pickler; perhaps this should be an
+    # official/"exported" Pyre function?
+    
+    alphabet = list("0123456789abcdefghijklmnopqrstuvwxyz")
+
+    import random
+    # Shuffling and truncating means no character can repeat in the key.
+    random.shuffle(alphabet)
+    key = "".join(alphabet)[0:16]
+
+    return key
+        
+
+def main(*args, **kwds):
+    """Create the Daemon application and run it with the given arguments."""
+    daemon = Daemon()
+    daemon.run(*args, **kwds)
+
+
+if __name__ == "__main__":
+    main()

Copied: cs/portal/trunk/northridge/backend/mineos.py (from rev 12359, cs/portal/trunk/mineos.py)
===================================================================
--- cs/portal/trunk/northridge/backend/mineos.py	                        (rev 0)
+++ cs/portal/trunk/northridge/backend/mineos.py	2008-07-02 00:46:55 UTC (rev 12366)
@@ -0,0 +1,404 @@
+#!/usr/bin/env python
+
+
+from pyre.applications import Script
+from pyre.units.SI import milli, hertz
+from pyre.units.length import km
+import os, sys, stat, popen2, time
+from os.path import basename, dirname, splitext
+
+mHz = milli*hertz
+
+# Assume this script is installed alongside the mineos binaries.
+prefix = dirname(dirname(__file__))
+
+
+# Permission bits (read-only for user, group and others) passed to
+# os.open() by this script.
+mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
+
+
+class Mode(object):
+    """One kind of normal mode that the Mineos programs can compute."""
+    def __init__(self, jcom, id, desc):
+        # jcom -- integer code written to the minos_bran/eigcon input files
+        # id   -- short name: used as a file-name prefix and as the name
+        #         of the MineosScript attribute which enables the mode
+        # desc -- human-readable description used in log messages
+        self.jcom = jcom
+        self.id = id
+        self.desc = desc
+
+
+# Table of all known modes, as (jcom, id, desc) triples.
+modes = [Mode(jcom, id, desc) for jcom, id, desc in [
+    (1, 'radial', 'radial'),
+    (2, 'toroidal', 'toroidal'),
+    (3, 'spheroidal', 'spheroidal'),
+    (4, 'ictoroidal', 'inner core toroidal'),
+    ]]
+# In Python 2 the list comprehension's loop variables leak into the
+# module namespace; remove them.
+del jcom, id, desc
+
+
+class Mineos(object):
+
+    def __init__(self, model, event, stations, progressUrl):
+        """Set up a driver for the Mineos programs.
+
+        model       -- model name written to the program input files
+        event       -- event name written to the program input files
+        stations    -- station database name; ".site" is appended to
+                       locate the station list
+        progressUrl -- URL to which progress is POSTed
+        """
+        # The programs are expected in "<prefix>/bin".
+        self.bin = prefix + "/bin"
+        self.model = model
+        self.event = event
+        self.stations = stations
+        self.progressUrl = progressUrl
+        # database names produced by eigcon(), consumed by green()
+        self.eigcon_out = []
+        self.green_out = "green"
+        return
+
+
+    def minos_bran(self,
+                   mode,
+                   eps, wgrav,
+                   lmin, lmax,
+                   wmin, wmax,
+                   nmin, nmax):
+        """Write the input file for 'minos_bran' and run the program.
+
+        The parameters are written, in a fixed order, to the file
+        "<mode.id>_mineos_bran.in", which then becomes the program's
+        standard input.  For the radial mode, lmin/lmax and nmin/nmax
+        are overridden.
+        """
+        if mode.id == 'radial':
+            # These are ignored.
+            lmin = 0
+            lmax = 0
+            # Misha Barmin says that there are no dispersion
+            # branches for radial modes; thus nmin, nmax have a
+            # different sense in this context.  He says to fix them as
+            # follows.
+            nmin = 0
+            nmax = 8000
+        minos_bran_in = mode.id + "_mineos_bran.in"
+        s = open(minos_bran_in, "w")
+        print >>s, self.model
+        print >>s, self.listing(mode)
+        print >>s, self.eigenfunctions(mode)
+        print >>s, eps, wgrav
+        print >>s, mode.jcom
+        print >>s, lmin, lmax, wmin, wmax, nmin, nmax
+        s.close()
+        self.run("minos_bran", stdin=minos_bran_in)
+        return
+
+
+    def eigcon(self, mode, max_depth):
+        """Write the input file for 'eigcon' and run the program.
+
+        The output database is named after mode.id; the name is
+        appended to self.eigcon_out for later use by green().
+        """
+        eigcon_in = mode.id + "_eigcon.in"
+        s = open(eigcon_in, "w")
+        print >>s, mode.jcom
+        print >>s, self.model
+        print >>s, max_depth
+        print >>s, self.listing(mode)
+        print >>s, self.eigenfunctions(mode)
+        dbname = mode.id #+ "_eigcon_out"
+        print >>s, dbname
+        s.close()
+        self.run("eigcon", stdin=eigcon_in)
+        self.eigcon_out.append(dbname)
+        return
+
+
+    def green(self, fmin, fmax, nsamples):
+        """Run the 'green' program, posting progress to the portal.
+
+        The names of the eigcon databases are written to "db_list".
+        The program's parameters are sent to its standard input, and a
+        copy is saved in "green.in".  The program's output is echoed
+        line by line; each line announcing a station advances the
+        progress tally, which is posted at most once per minute.
+        Terminates this script via checkStatus() unless the program
+        exits with status 0.
+        """
+        nStations = self.lineCount(self.stations + ".site")
+
+        # Write and *close* the database list before the program is
+        # started, so that its contents are certain to be on disk when
+        # 'green' opens the file.
+        dbList = open("db_list", "w")
+        try:
+            for dbname in self.eigcon_out:
+                print >>dbList, dbname
+        finally:
+            dbList.close()
+
+        green_in = open("green.in", "w")
+        argv = [os.path.join(self.bin, "green")]
+        child = popen2.Popen4(argv)
+        # Send the same parameters to the saved copy and to the child.
+        for s in [green_in, child.tochild]:
+            print >>s, self.stations
+            print >>s, "db_list"
+            print >>s, self.event
+            print >>s, fmin, fmax
+            print >>s, nsamples
+            print >>s, self.green_out
+            s.close()
+        stationTally = 0
+        self.postProgress(0.0)
+        lastPostTime = time.time()
+        for line in child.fromchild:
+            print line,
+            if line.startswith(" green: Station: "):
+                stationTally += 1
+                now = time.time()
+                # Throttle: post no more than once every 60 seconds.
+                if now - lastPostTime >= 60.0:
+                    self.postProgress(float(stationTally) / float(nStations))
+                    lastPostTime = now
+        status = child.wait()
+        self.checkStatus(status, argv)
+        return
+
+
+    def creat_origin(self):
+        """Run 'creat_origin' with the event and green-output names as arguments."""
+        self.run("creat_origin", [self.event, self.green_out])
+
+
+    def syndat(self, plane, datatype):
+        """Write "syndat.in" and run 'syndat' with it as standard input.
+
+        The output name written to the input file is "seismograms".
+        """
+        syndat_in = "syndat.in"
+        s = open(syndat_in, "w")
+        print >>s, self.event
+        print >>s, plane # error in documentation!
+        print >>s, self.green_out
+        print >>s, "seismograms" #"syndat_out"
+        print >>s, datatype
+        s.close()
+        self.run("syndat", stdin=syndat_in)
+        return
+
+
+    def cucss2sac(self, *args):
+        """Run 'cucss2sac' with the given command-line arguments."""
+        self.run("cucss2sac", list(args))
+        return
+
+
+    def listing(self, mode):
+        """Return the listing file name for 'mode': "<mode.id>_listing"."""
+        return mode.id + "_listing"
+
+
+    def eigenfunctions(self, mode):
+        """Return the eigenfunction file name for 'mode': "<mode.id>_eigenfunctions"."""
+        return mode.id + "_eigenfunctions"
+
+
+    def run(self, program, args=None, stdin=None, stdout=None, stderr=None):
+        """Run 'program' from self.bin and wait for it to finish.
+
+        program -- name of the executable inside self.bin
+        args    -- list of command-line arguments (default: none)
+        stdin   -- pathname to open as the program's standard input
+        stdout  -- pathname to open (creating it if necessary) as the
+                   program's standard output
+        stderr  -- likewise, for standard error
+
+        Terminates this script via checkStatus() unless the program
+        exits with status 0.
+        """
+        sys.stdout.flush()
+        sys.stderr.flush()
+        if args is None:
+            args = []
+        argv = [program] + args
+        pid = os.fork()
+        if pid:
+            # parent: wait for *this* child, not for whichever child
+            # happens to finish first
+            _, status = os.waitpid(pid, 0)
+            self.checkStatus(status, argv)
+            return
+
+        # child
+        executable = os.path.join(self.bin, argv[0])
+        redirections = [
+            (stdin, os.O_RDONLY, 0),
+            # O_CREAT is needed for the 'mode' argument to have any
+            # effect.
+            (stdout, os.O_CREAT | os.O_WRONLY, 1),
+            (stderr, os.O_CREAT | os.O_WRONLY, 2),
+            ]
+        try:
+            for pathname, flags, target in redirections:
+                if pathname is None:
+                    continue
+                fd = os.open(pathname, flags, mode)
+                if fd != target:
+                    os.dup2(fd, target)
+                    # Do not leak the extra descriptor into the program.
+                    os.close(fd)
+            os.execvp(executable, argv)
+        except Exception, e:
+            sys.stderr.write("%s: %s\n" % (executable, e))
+            sys.stderr.flush()
+        # Only reached when a redirection or the exec failed.  Leave
+        # immediately, so that the child never continues executing the
+        # parent's code.
+        os._exit(1)
+
+
+    def checkStatus(self, status, argv):
+        """Exit the script unless 'status' denotes a normal exit with code 0.
+
+        'status' is a wait()-style status word; 'argv' is the command
+        that was run.  On failure sys.exit() is called with a message
+        of the form "<argv[0]>: exit N", "<argv[0]>: signal N" or
+        "<argv[0]>: status N".
+        """
+        succeeded = False
+        if os.WIFSIGNALED(status):
+            description = "signal %d" % os.WTERMSIG(status)
+        elif os.WIFEXITED(status):
+            code = os.WEXITSTATUS(status)
+            description = "exit %d" % code
+            succeeded = (code == 0)
+        else:
+            description = "status %d" % status
+        message = "%s: %s" % (argv[0], description)
+        if not succeeded:
+            sys.exit(message)
+        return
+
+
+    def lineCount(self, filename):
+        """Return the number of lines in the file named 'filename'."""
+        s = open(filename, 'r')
+        try:
+            tally = 0
+            for line in s:
+                tally += 1
+        finally:
+            # Close explicitly rather than relying on garbage collection.
+            s.close()
+        return tally
+
+
+    def postProgress(self, progress):
+        """POST 'progress' to self.progressUrl and return the reply body.
+
+        'progress' is sent as the form field "progress", formatted with
+        two decimal places.  Raises ValueError when the URL's scheme is
+        neither "http" nor "https".
+        """
+        import httplib, urllib, urlparse
+        scheme, host, path = urlparse.urlparse(self.progressUrl)[0:3]
+        fields = { 'progress': "%.2f" % progress }
+        body = urllib.urlencode(fields)
+        headers = {"Content-Type": "application/x-www-form-urlencoded",
+                   "Accept": "text/plain"}
+        if scheme == "http":
+            conn = httplib.HTTPConnection(host)
+        elif scheme == "https":
+            conn = httplib.HTTPSConnection(host)
+        else:
+            # An 'assert' would vanish under "python -O" and leave 'conn'
+            # unbound.
+            raise ValueError("unsupported scheme: %r" % (scheme,))
+        try:
+            conn.request("POST", path, body, headers)
+            response = conn.getresponse()
+            data = response.read()
+        finally:
+            # Release the socket even when the request fails.
+            conn.close()
+        return data
+
+
+
+class MineosScript(Script):
+
+    name = "mineos"
+
+    import pyre.inventory as pyre
+
+    #------------------------------------------------------------------------
+    # common
+    
+    model = pyre.str("model")
+
+    # jcom
+    radial = pyre.bool("radial")
+    toroidal = pyre.bool("toroidal")
+    spheroidal = pyre.bool("spheroidal")
+    ictoroidal = pyre.bool("inner-core-toroidal")
+
+    event = pyre.str("event")
+
+    
+    #------------------------------------------------------------------------
+    # minos_bran
+    
+    eps = pyre.float("eps")
+    wgrav = pyre.dimensional("wgrav", default=1*hertz)
+
+    lmin = pyre.int("lmin")
+    lmax = pyre.int("lmax")
+    
+    wmin = pyre.dimensional("wmin", default=1*hertz)
+    wmax = pyre.dimensional("wmax", default=1*hertz)
+    
+    nmin = pyre.int("nmin")
+    nmax = pyre.int("nmax")
+
+
+    #------------------------------------------------------------------------
+    # eigcon
+
+    max_depth = pyre.dimensional("max-depth", default=1*km)
+    
+
+    #------------------------------------------------------------------------
+    # green
+
+    fmin = pyre.dimensional("fmin", default=1*hertz)
+    fmax = pyre.dimensional("fmax", default=1*hertz)
+
+    nsamples = pyre.int("nsamples")
+
+    stations = pyre.str("stations")
+
+    #------------------------------------------------------------------------
+    # syndat
+    
+    plane = pyre.int("plane", default=0) # use CMT tensor components as-is
+    datatype = pyre.int("datatype", default=0) # accelerograms in nm/s^2
+
+    #------------------------------------------------------------------------
+    # misc.
+
+    progressUrl = pyre.str("progress-url")
+
+
+    def main(self, *args, **kwds):
+        """Run the Mineos programs for the enabled modes and archive the result.
+
+        The input files are moved into a new "mineos" subdirectory,
+        where the programs are run; afterwards the directory is packed
+        into "mineos.tar.gz" and removed.  Does nothing when no mode is
+        enabled.
+        """
+        
+        self._info.activate()
+
+        # A mode is enabled by the boolean property named after its id.
+        enabledModes = []
+        for mode in modes:
+            if getattr(self, mode.id):
+                enabledModes.append(mode)
+
+        self._info.log("modes: " + ' '.join(["'%s'" % mode.desc for mode in enabledModes]))
+        if not enabledModes:
+            self._info.log("nothing to do")
+            return
+
+        jobdir = os.getcwd()
+        
+        archiveDirName = "mineos"
+        os.mkdir(archiveDirName)
+        inputFiles = [self.model, self.event] + [self.stations + suffix for suffix in [".site", ".sitechan"]]
+        for inputFile in inputFiles:
+            os.rename(inputFile, os.path.join(archiveDirName, inputFile))
+        os.chdir(archiveDirName)
+        
+        self._info.log("running Mineos program suite for model '%s'" % self.model)
+        self.runMineos(self.model, enabledModes)
+
+        # archive the output
+        os.chdir(jobdir)
+        self.spawn("/bin/tar", "cvzf", archiveDirName + ".tar.gz", archiveDirName)
+
+        # clean up
+        self.spawn("/bin/rm", "-rf", archiveDirName)
+
+        return
+
+
+    def runMineos(self, model, enabledModes):
+        """Run the Mineos programs in sequence in the current directory.
+
+        Order: minos_bran and eigcon (once per enabled mode), green,
+        creat_origin, syndat, and finally cucss2sac twice (SAC and
+        ASCII output).  Dimensional properties are divided by their
+        unit (mHz or km) before being handed to the programs.
+        """
+        mineos = Mineos(model, self.event, self.stations, self.progressUrl)
+
+        # minos_bran
+        for mode in enabledModes:
+            self._info.log("running program 'minos_bran' for '%s' mode" % mode.desc)
+            # convert to millihertz
+            wgrav = self.wgrav / mHz
+            wmin = self.wmin / mHz
+            wmax = self.wmax / mHz
+            mineos.minos_bran(mode, self.eps, wgrav,
+                              self.lmin, self.lmax,
+                              wmin, wmax,
+                              self.nmin, self.nmax)
+
+        # eigcon
+        for mode in enabledModes:
+            self._info.log("running program 'eigcon' for '%s' mode" % mode.desc)
+            # convert to kilometers
+            max_depth = self.max_depth / km
+            mineos.eigcon(mode, max_depth)
+
+        # green
+        self._info.log("running program 'green'")
+        # convert to millihertz
+        fmin = self.fmin / mHz
+        fmax = self.fmax / mHz
+        mineos.green(fmin, fmax, self.nsamples)
+
+        # creat_origin
+        self._info.log("running program 'creat_origin'")
+        mineos.creat_origin()
+
+        # syndat
+        self._info.log("running program 'syndat'")
+        mineos.syndat(self.plane, self.datatype)
+
+        # cucss2sac
+        # Make the origin and site tables available under the
+        # "seismograms" name passed to cucss2sac below.
+        os.symlink(mineos.green_out + ".origin", "seismograms.origin")
+        os.symlink(mineos.stations + ".site", "seismograms.site")
+        mineos.cucss2sac("seismograms", "output") # SAC
+        mineos.cucss2sac("-a", "seismograms", "output") # ASCII
+        
+        return
+
+
+    def spawn(self, *argv):
+        """Run the command 'argv', wait for it, and log its exit status.
+
+        Calls sys.exit() with the message "<argv[0]>: exit N" when the
+        status is nonzero.
+        """
+        command = ' '.join(argv)
+        self._info.log("spawning: %s" % command)
+        status = os.spawnvp(os.P_WAIT, argv[0], argv)
+        statusMsg = "%s: exit %d" % (argv[0], status)
+        self._info.log(statusMsg)
+        if status != 0:
+            # statusMsg already starts with the program name; do not
+            # prefix it a second time.
+            sys.exit(statusMsg)
+        return
+
+
+def main(*args, **kwds):
+    """Redirect stdout/stderr to "output_mineos.txt", then run MineosScript."""
+    if True:
+        # Redirect all output -- our journal output, and the output of
+        # our children -- to a file.
+        # NOTE(review): the file is opened without O_TRUNC or O_APPEND, so
+        # an existing file is overwritten from the start without being
+        # emptied first -- confirm this is intended.
+        fd = os.open("output_mineos.txt", os.O_CREAT | os.O_WRONLY, mode)
+        os.dup2(fd, 1)
+        os.dup2(fd, 2)
+        os.close(fd)
+    
+    mineos = MineosScript()
+    mineos.run(*args, **kwds)
+
+
+if __name__ == "__main__":
+    main()
+
+
+# end of file

Copied: cs/portal/trunk/northridge/backend/specfem3D.py (from rev 12365, seismo/3D/SPECFEM3D_GLOBE/trunk/pyspecfem3D)
===================================================================
--- cs/portal/trunk/northridge/backend/specfem3D.py	                        (rev 0)
+++ cs/portal/trunk/northridge/backend/specfem3D.py	2008-07-02 00:46:55 UTC (rev 12366)
@@ -0,0 +1,310 @@
+#!/usr/bin/env python
+
+
+from pyre.applications import Script
+from os.path import dirname, exists, isdir, join
+import os, sys, stat, shutil, tarfile
+from time import sleep
+
+
+SPECFEM3D_GLOBE = dirname(__file__)
+
+
+class Specfem3DGlobe(Script):
+
+
+    name = "Specfem3DGlobe"
+
+
+    import pyre.inventory
+    import pyre.schedulers
+    import pyre.launchers
+    
+    parFile        = pyre.inventory.str("par-file")
+    cmtSolution    = pyre.inventory.str("cmt-solution")
+    stations       = pyre.inventory.str("stations")
+    configureArgs  = pyre.inventory.str("configure-args", default="FC=mpif90")
+
+    scratchDir     = pyre.inventory.str("scratch-dir", default="/scratch")
+
+    nodes          = pyre.inventory.int("nodes", default=1)
+    scheduler      = pyre.schedulers.scheduler("scheduler", default="none")
+    job            = pyre.schedulers.job("job")
+    launchCommand  = pyre.inventory.str("launch-command", default="mpirun -np %(nodes)s")
+    context        = pyre.inventory.str(
+        name="context", default="login", validator=pyre.inventory.choice(["login", "launcher", "pre-compute", "post-compute"]))
+    progressUrl = pyre.inventory.str("progress-url")
+
+
+
+    def main(self, *args, **kwds):
+        """Dispatch to the handler for the context this script runs in.
+
+        The "context" property selects onLoginNode(), onLauncherNode(),
+        onPreCompute() or onPostCompute(); any other value does nothing.
+        """
+        handlers = {
+            "login": self.onLoginNode,
+            "launcher": self.onLauncherNode,
+            "pre-compute": self.onPreCompute,
+            "post-compute": self.onPostCompute,
+            }
+        handler = handlers.get(self.context)
+        if handler is not None:
+            handler(*args, **kwds)
+        return
+
+
+    def onLoginNode(self, *args, **kwds):
+        """Prepare and build the code, then schedule the job while
+        monitoring its progress.
+
+        After the build the process forks: the child schedules the job
+        and then processes the output files, while the parent posts
+        progress until the child has terminated.
+        """
+        # Redirect all output -- our journal output, and the output of
+        # our children -- to a file.
+        fd = os.open("output_build.txt", os.O_CREAT | os.O_WRONLY, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
+        os.dup2(fd, 1)
+        os.dup2(fd, 2)
+        os.close(fd)
+
+        self.prepareFiles()
+        self.build()
+        pid = os.fork()
+        if pid:
+            # parent
+            self.monitorProgress(pid)
+        else:
+            # child
+            self.schedule(*args, **kwds)
+            self.processOutputFiles()
+        return
+
+
+    def onLauncherNode(self, *args, **kwds):
+        """Launch, in order: this script in "pre-compute" context, the
+        mesher, the solver, and this script in "post-compute" context.
+
+        Every command is prefixed with the launch command (with the
+        node count substituted).  The script exits as soon as one of
+        the commands returns a nonzero status.
+        """
+
+        def launch(command):
+            # Run 'command' to completion; exit on a nonzero status.
+            status = os.spawnvp(os.P_WAIT, command[0], command)
+            if status != 0:
+                sys.exit("%s: exit %d" % (command[0], status))
+
+        launcher = (self.launchCommand % {'nodes': self.nodes}).split()
+
+        wd = os.getcwd()
+        myArgv = self.getArgv(*args, **kwds)
+        nodesArg = "--nodes=%d" % self.nodes
+        relaunch = launcher + [sys.executable, __file__] + myArgv
+
+        # launch ourselves
+        launch(relaunch + ["--context=pre-compute", nodesArg])
+
+        # launch the mesher
+        launch(launcher + [join(wd, "xmeshfem3D")])
+
+        # launch the solver
+        launch(launcher + [join(wd, "xspecfem3D")])
+
+        # launch ourselves
+        launch(relaunch + ["--context=post-compute", nodesArg])
+
+        return
+
+
+    def onPreCompute(self, *args, **kwds):
+        """Create the scratch directory (and any missing parents)."""
+        makedirs(self.scratchDir)
+
+    def onPostCompute(self, *args, **kwds):
+        """Remove the files in the scratch directory, then the directory.
+
+        This is best-effort cleanup: every OSError is ignored.
+        """
+        try:
+            files = os.listdir(self.scratchDir)
+            for file in files:
+                try:
+                    os.remove(join(self.scratchDir, file))
+                except OSError:
+                    pass
+        except OSError:
+            pass
+
+        try:
+            os.rmdir(self.scratchDir)
+        except OSError:
+            pass
+
+        return
+
+
+    def prepareFiles(self):
+        """Populate the DATA directory in the current working directory.
+
+        Creates "DATA" and "OUTPUT_FILES", copies the parameter file,
+        CMT solution and station list into "DATA", and symlinks every
+        other entry of the distribution's DATA directory which is not
+        already present.
+        """
+        
+        self.mkdir("DATA")
+        self.mkdir("OUTPUT_FILES")
+        
+        self.readAndCopyParFile()
+
+        shutil.copyfile(self.cmtSolution, "DATA/CMTSOLUTION")
+        shutil.copyfile(self.stations, "DATA/STATIONS")
+
+        dataSrc = join(SPECFEM3D_GLOBE, "DATA")
+        for name in os.listdir(dataSrc):
+            src = join(dataSrc, name)
+            dest = join("DATA", name)
+            # Files copied above take precedence over the distribution's.
+            if exists(dest):
+                continue
+            os.symlink(src, dest)
+        
+        return
+
+
+    def readAndCopyParFile(self):
+        """Copy the parameter file to "DATA/Par_file" and set self.nodes.
+
+        The file is copied line by line, except that the LOCAL_PATH
+        setting is replaced by one pointing at self.scratchDir.  The
+        values of NCHUNKS, NPROC_XI and NPROC_ETA are read on the way
+        (from lines of the form "NAME = VALUE"), and self.nodes is set
+        to their product.  Exits the script with an error message when
+        any of the three is missing.
+        """
+        required = ("NCHUNKS", "NPROC_XI", "NPROC_ETA")
+        counts = {}  # parameter name -> integer value found in the file
+        s = open(self.parFile, "r")
+        try:
+            d = open("DATA/Par_file", "w")
+            try:
+                for line in s:
+                    tokens = line.split()
+                    if len(tokens) >= 3:
+                        var = tokens[0]
+                        if var in required:
+                            # tokens[1] is the "=" sign
+                            counts[var] = int(tokens[2])
+                        elif var == "LOCAL_PATH":
+                            d.write("LOCAL_PATH = %s\n" % (self.scratchDir,))
+                            continue
+                    d.write(line)
+            finally:
+                d.close()
+        finally:
+            s.close()
+        missing = [name for name in required if name not in counts]
+        if missing:
+            sys.exit("%s: missing parameter(s): %s"
+                     % (self.parFile, ", ".join(missing)))
+        self.nodes = counts["NCHUNKS"] * counts["NPROC_XI"] * counts["NPROC_ETA"]
+        return
+
+
+    def build(self):
+        """Configure and build the code in the current directory.
+
+        Runs the distribution's "configure" script with the arguments
+        from the "configure-args" property, then "make".  Exits the
+        script when either command returns a nonzero status.
+        """
+        
+        # configure
+        configure = join(SPECFEM3D_GLOBE, "configure")
+        configureArgs = self.configureArgs.split()
+        argv = [configure] + configureArgs
+        print ' '.join(argv)
+        status = os.spawnvp(os.P_WAIT, argv[0], argv)
+        if status != 0:
+            sys.exit("%s: exit %d" % (argv[0], status))
+
+        # make
+        self.mkdir("obj")
+        argv = ['make']
+        print ' '.join(argv)
+        status = os.spawnvp(os.P_WAIT, argv[0], argv)
+        if status != 0:
+            sys.exit("%s: exit %d" % (argv[0], status))
+
+        return
+
+
+    def schedule(self, *args, **kwds):
+        """Schedule a job which re-runs this script in "launcher" context.
+
+        The job runs the current Python interpreter on this file, with
+        this script's own arguments plus the context and node count.
+        """
+        argv = self.getArgv(*args, **kwds)
+        
+        # initialize the job
+        job = self.job
+        job.nodes = self.nodes
+        job.executable = sys.executable
+        job.arguments = [__file__] + argv + ["--context=launcher", "--nodes=%d" % self.nodes]
+
+        # schedule
+        self.scheduler.schedule(job)
+
+        return
+
+
+    def mkdir(self, name):
+        """Create the directory 'name' unless it already exists."""
+        if not isdir(name):
+            os.mkdir(name)
+        return
+
+
+    def processOutputFiles(self):
+        """Archive OUTPUT_FILES and copy the mesher/solver logs alongside.
+
+        Every entry of "OUTPUT_FILES" is added to the gzipped tar
+        archive "specfem3dglobe.tar.gz".  "output_mesher.txt" and
+        "output_solver.txt" are also copied into the current directory
+        when they exist.
+        """
+
+        archiveOut = open("specfem3dglobe.tar.gz", 'w')
+        tgzOut = tarfile.open(archiveOut.name, 'w:gz', archiveOut)
+
+        for name in os.listdir("OUTPUT_FILES"):
+            pathname = join("OUTPUT_FILES", name)
+            # Members keep their "OUTPUT_FILES/" prefix inside the archive.
+            arcname = pathname
+            tgzOut.add(pathname, arcname)
+
+        tgzOut.close()
+        archiveOut.close()
+
+        for filename in ["output_mesher.txt", "output_solver.txt"]:
+            pathname = join("OUTPUT_FILES", filename)
+            if exists(pathname):
+                shutil.copyfile(pathname, filename)
+
+        return
+
+
+    def monitorProgress(self, pid):
+        """Post the solver's progress until the child 'pid' terminates.
+
+        The child is polled without blocking.  Progress is checked once
+        a minute and posted only when it differs from the value posted
+        last.
+        """
+        progress = None
+        wpid, status = os.waitpid(pid, os.WNOHANG)
+        # With WNOHANG, waitpid() reports pid 0 while the child is still
+        # running.
+        while wpid == 0:
+            newProgress = self.checkProgress()
+            if newProgress != progress:
+                progress = newProgress
+                self.postProgress(progress)
+            sleep(60)
+            wpid, status = os.waitpid(pid, os.WNOHANG)
+        return
+
+
+    def checkProgress(self):
+        """Return the solver's progress as a fraction, or None.
+
+        Reads "OUTPUT_FILES/output_solver.txt" and returns the
+        percentage on the last line starting with " We have done",
+        divided by 100.  Returns 0.0 when there is no such line, and
+        None when the file cannot be opened.
+        """
+        try:
+            s = open("OUTPUT_FILES/output_solver.txt", 'r')
+        except IOError:
+            return None
+        try:
+            progress = 0.0
+            for line in s:
+                if line.startswith(" We have done"):
+                    # the fourth word is the percentage
+                    progress = float(line.split()[3]) / 100.0
+        finally:
+            # This method is called once a minute; do not leave the
+            # file open between calls.
+            s.close()
+        return progress
+
+
+    def postProgress(self, progress):
+        """POST 'progress' to self.progressUrl and return the reply body.
+
+        'progress' is sent as the form field "progress", formatted with
+        two decimal places.  Raises ValueError when the URL's scheme is
+        neither "http" nor "https".
+        """
+        import httplib, urllib, urlparse
+        scheme, host, path = urlparse.urlparse(self.progressUrl)[0:3]
+        fields = { 'progress': "%.2f" % progress }
+        body = urllib.urlencode(fields)
+        headers = {"Content-Type": "application/x-www-form-urlencoded",
+                   "Accept": "text/plain"}
+        if scheme == "http":
+            conn = httplib.HTTPConnection(host)
+        elif scheme == "https":
+            conn = httplib.HTTPSConnection(host)
+        else:
+            # An 'assert' would vanish under "python -O" and leave 'conn'
+            # unbound.
+            raise ValueError("unsupported scheme: %r" % (scheme,))
+        try:
+            conn.request("POST", path, body, headers)
+            response = conn.getresponse()
+            data = response.read()
+        finally:
+            # Release the socket even when the request fails.
+            conn.close()
+        return data
+
+
+
+def makedirs(name, mode=0777):
+    """Like os.makedirs(), but multi-{thread,process} safe.
+
+    Creates the directory 'name' and any missing parents.  A "file
+    exists" error from mkdir() is ignored, so several processes may
+    create the same directory concurrently.  Any other OSError is
+    re-raised.
+    """
+    import os.path as path
+    from errno import EEXIST
+    from os import curdir, mkdir
+    head, tail = path.split(name)
+    if not tail:
+        head, tail = path.split(head)
+    if head and tail and not path.exists(head):
+        makedirs(head, mode)
+        if tail == curdir:           # xxx/newdir/. exists if xxx/newdir exists
+            return
+    try:
+        mkdir(name, mode)
+    except OSError, error:
+        # Somebody else may have created the directory in the meantime.
+        if getattr(error, 'errno', None) != EEXIST:
+            raise
+    return
+
+
+
+if __name__ == "__main__":
+    script = Specfem3DGlobe()
+    script.run()
+
+
+# end of file

Copied: cs/portal/trunk/northridge/backend/web-portal-daemon.cfg (from rev 12359, cs/portal/trunk/web-portal-daemon.cfg)
===================================================================
--- cs/portal/trunk/northridge/backend/web-portal-daemon.cfg	                        (rev 0)
+++ cs/portal/trunk/northridge/backend/web-portal-daemon.cfg	2008-07-02 00:46:55 UTC (rev 12366)
@@ -0,0 +1,21 @@
+
+[web-portal-daemon]
+sleep-interval = 5*second
+output-root-pathname = /home/portal/public_html/output/
+output-root-url = http://crust.geodynamics.org/~portal/output/
+mineos-pathname = /home/portal/opt/mineos/bin/mineos.py
+
+remote = True
+specfem-pathname = /projects/tg/CIG/SPECFEM3D_GLOBE-4.0.1/specfem3D.py
+remote-shell-command = gsissh tg-login.tacc.teragrid.org
+remote-output-root = /work/teragrid/tgXXXXXX/output
+remote-download-command = gsiscp %(source)s tg-login.tacc.teragrid.org:%(dest)s
+remote-upload-command = gsiscp tg-login.tacc.teragrid.org:%(source)s %(dest)s
+
+journal.device = file
+
+[web-portal-daemon.portal]
+host = crust.geodynamics.org
+scheme = https
+
+

Deleted: cs/portal/trunk/web-portal-daemon.cfg
===================================================================
--- cs/portal/trunk/web-portal-daemon.cfg	2008-07-01 23:52:59 UTC (rev 12365)
+++ cs/portal/trunk/web-portal-daemon.cfg	2008-07-02 00:46:55 UTC (rev 12366)
@@ -1,12 +0,0 @@
-
-[web-portal-daemon]
-sleep-interval = 5*second
-download-input-files = True
-output-root-pathname = /home/leif/public_html/portal/
-output-root-url = http://crust.geodynamics.org/~leif/portal/
-
-[web-portal-daemon.portal]
-#host = crust.geodynamics.org
-host = localhost:8000
-
-



More information about the cig-commits mailing list