[cig-commits] r7914 - cs/portal/trunk

leif at geodynamics.org
Wed Aug 29 19:50:16 PDT 2007


Author: leif
Date: 2007-08-29 19:50:15 -0700 (Wed, 29 Aug 2007)
New Revision: 7914

Modified:
   cs/portal/trunk/daemon.py
   cs/portal/trunk/web-portal-daemon.cfg
Log:
Enhancements corresponding to r1742 & r1749 in the other repository:

Introduced 'Run', 'Job', and 'OutputFile'.  'Run' status takes the
place of 'Simulation' status; there can be multiple runs per
simulation.  'Job' provides web users with detailed job info; there
can be multiple jobs per run: 'build', 'run', and -- maybe someday --
'postprocessing'.  With the addition of the generic 'OutputFile'
mechanism, 'stdout' and 'stderr' are now accessible via the web portal
(plus whatever else the daemon decides to produce in addition to the
seismogram tarball -- perhaps output_{mesher,solver}.txt).
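
On the portal side this just means composing the job's base URL with
each OutputFile name.  A minimal sketch, assuming the same URL layout
the daemon uses (output-root-url + random job subdirectory + "/" +
file name); the helper name is hypothetical, not part of this commit:

    def outputFileUrl(outputRootUrl, subdir, name):
        # e.g. outputFileUrl("http://crust.geodynamics.org/~leif/portal/",
        #                    "4a9c03be17d5f286", "stdout.txt")
        #   -> "http://crust.geodynamics.org/~leif/portal/4a9c03be17d5f286/stdout.txt"
        return outputRootUrl + subdir + "/" + name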

The daemon now creates an output directory for each job and puts the
job output files there.  The job directories are created under a root
directory which is presumed to be served by a web server at the URL
indicated in the .cfg file.  The daemon generates random names for
the job directories, providing a modicum of security/privacy through
obscurity (it is assumed that directory indexes are not enabled for
the root directory/URL).
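
A condensed sketch of that naming scheme (the actual commit splits it
between randomName() and GlobusJobManager.createSubdirectoryForJob(),
shown in the diff below; the combined helper here is hypothetical):

    import os, random

    def makeJobDirectory(root):
        # Shuffle the 36-character alphabet and keep the first 16
        # characters, retrying in the unlikely event of a collision.
        alphabet = list("0123456789abcdefghijklmnopqrstuvwxyz")
        while True:
            random.shuffle(alphabet)
            name = "".join(alphabet)[:16]
            path = os.path.join(root, name)
            if not os.path.exists(path):
                os.mkdir(path)
                return name, path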

Updated the example .cfg file with the settings I've been using for
debugging.


Modified: cs/portal/trunk/daemon.py
===================================================================
--- cs/portal/trunk/daemon.py	2007-08-29 23:50:51 UTC (rev 7913)
+++ cs/portal/trunk/daemon.py	2007-08-30 02:50:15 UTC (rev 7914)
@@ -22,17 +22,21 @@
 class GassServer(object):
 
     # @classmethod
-    def startUp(cls):
+    def startUp(cls, directory):
         argv = ["globus-gass-server", "-r", "-w", "-c"]
+        savedWd = os.getcwd()
+        os.chdir(directory)
         child = Popen4(argv)
+        os.chdir(savedWd)
         child.tochild.close()
         url = child.fromchild.readline().strip()
-        return GassServer(child, url)
+        return GassServer(child, url, directory)
     startUp = classmethod(startUp)
 
-    def __init__(self, child, url):
+    def __init__(self, child, url, directory):
         self.child = child
         self.url = url
+        self.directory = directory
 
     def shutDown(self):
         argv = ["globus-gass-server-shutdown", self.url]
@@ -43,20 +47,22 @@
 
 class GlobusJobManager(object):
 
-    def __init__(self, clock, info, downloadInputFiles=False):
+    def __init__(self, clock, root, info, downloadInputFiles=False):
         self.clock = clock
+        self.root = root
         self.info = info
         self.downloadInputFiles = downloadInputFiles
 
     
     def runJob(self, job):
+        self.createSubdirectoryForJob(job)
         stackless.tasklet(self.jobRunner)(job)
 
     
     def jobRunner(self, job):
+
+        gassServer = GassServer.startUp(job.directory)
         
-        gassServer = GassServer.startUp()
-        
         try:
             resSpec = self.resSpec(job, gassServer)
             id = self.globusrun(resSpec)
@@ -66,9 +72,7 @@
                 self.clock.tick.wait()
                 status = self.getJobStatus(id)
                 if status != oldStatus:
-                    # Filter (e.g.) "UNKNOWN JOB STATE 64".
-                    if not status.startswith("UNKNOWN "):
-                        job.setStatus(status)
+                    job.setStatus(status)
                     oldStatus = status
         
         except Exception, e:
@@ -185,7 +189,8 @@
         file_stage_in = []
         for url, inputFile in job.inputFileURLs:
             if self.downloadInputFiles:
-                self.download(url, inputFile)
+                pathname = os.path.join(job.directory, inputFile)
+                self.download(url, pathname)
                 url = gassServer.url + "/./" + inputFile
             file_stage_in.append([url, RSLUnquoted("$(SCRATCH_DIRECTORY) # " + '"/' + inputFile + '"')])
         resSpec["file_stage_in"] = file_stage_in
@@ -197,8 +202,10 @@
         if file_stage_out:
             resSpec["file_stage_out"] = file_stage_out
         
-        resSpec["stdout"] = gassServer.url + "/./" + resSpec["stdout"]
-        resSpec["stderr"] = gassServer.url + "/./" + resSpec["stderr"]
+        resSpec["stdout"] = gassServer.url + "/./stdout.txt"
+        resSpec["stderr"] = gassServer.url + "/./stderr.txt"
+
+        job.outputFiles.extend(["stdout.txt", "stderr.txt"])
         
         return resSpec
 
@@ -215,6 +222,18 @@
         return
 
 
+    def createSubdirectoryForJob(self, job):
+        while True:
+            dirName = randomName()
+            dirPath = os.path.join(self.root, dirName)
+            if not os.path.exists(dirPath):
+                os.mkdir(dirPath)
+                break
+        job.subdir = dirName
+        job.directory = dirPath
+        return
+
+
 class Event(object):
     
     def __init__(self):
@@ -242,6 +261,7 @@
 class Job(object):
 
     # status codes
+    STATUS_NEW      = "NEW" # pseudo
     STATUS_PENDING  = "PENDING"
     STATUS_ACTIVE   = "ACTIVE"
     STATUS_DONE     = "DONE"
@@ -249,15 +269,19 @@
     statusCodes = [STATUS_PENDING, STATUS_ACTIVE, STATUS_DONE, STATUS_FAILED]
     deadCodes = [STATUS_DONE, STATUS_FAILED]
 
-    def __init__(self, **resSpec):
+    def __init__(self, task, **resSpec):
+        self.task = task
         self.resSpec = resSpec
-        self.status = None
+        self.status = self.STATUS_NEW
         self.statusChanged = Event()
         self.inputFileURLs = []
         self.outputFiles = []
+        self.subdir = None
+        self.directory = None
 
     def setStatus(self, status):
-        assert status in self.statusCodes, "unknown status: %s" % status
+        if False: # Violated by, e.g., "UNKNOWN JOB STATE 64"
+            assert status in self.statusCodes, "unknown status: %s" % status
         self.status = status
         self.statusChanged.signal()
 
@@ -265,42 +289,41 @@
         return not self.status in self.deadCodes
 
 
-class Simulation(object):
+class Run(object):
     
     # status codes
-    STATUS_NEW        = "SimStatusNew"
-    STATUS_PREPARING  = "SimStatusPreparing"
-    STATUS_PENDING    = "SimStatusPending"
-    STATUS_RUNNING    = "SimStatusRunning"
-    STATUS_FINISHING  = "SimStatusFinishing"
-    STATUS_DONE       = "SimStatusDone"
-    STATUS_ERROR      = "SimStatusError"
+    STATUS_NEW        = "new"
+    STATUS_PREPARING  = "preparing"
+    STATUS_PENDING    = "pending"
+    STATUS_RUNNING    = "running"
+    #STATUS_FINISHING  = "finishing"
+    STATUS_DONE       = "done"
+    STATUS_ERROR      = "error"
     deadCodes = [STATUS_DONE, STATUS_ERROR]
 
-    def __init__(self, id, nodes, info):
+    def __init__(self, id, simulation, info):
         self.id = id
-        self.nodes = nodes
+        self.simulation = simulation
         self.info = info
-        self.status = None
+
+        self.job = None
+        self.status = self.STATUS_NEW
         self.statusChanged = Event()
 
-        self.parameters = 'parameters.pml'
-        self.events = 'events.txt'
-        self.stations = 'stations.txt'
-        self.inputFiles = [self.parameters, self.events, self.stations]
         self.inputFileURLs = []
         self.outputFiles = ["seismograms.tar.gz"]
 
-    def run(self, jm):
+    def go(self, jm):
         stackless.tasklet(self)(jm)
 
     def __call__(self, jm):
         try:
-            self.setStatus(self.STATUS_PREPARING)
-
             # build
             job = self.newBuildJob()
+            self.job = job
             jm.runJob(job)
+            
+            self.setStatus(self.STATUS_PREPARING)
             while job.isAlive():
                 job.statusChanged.wait()
             
@@ -310,26 +333,21 @@
 
             # run
             job = self.newRunJob()
+            self.job = job
             jm.runJob(job)
-            job.statusChanged.wait()
             
-            if job.status == job.STATUS_PENDING:
-                self.setStatus(self.STATUS_PENDING)
+            self.setStatus(self.STATUS_PENDING)
+            while job.isAlive():
                 job.statusChanged.wait()
-            
-            if job.status == job.STATUS_ACTIVE:
-                self.setStatus(self.STATUS_RUNNING)
-                job.statusChanged.wait()
-            
-            if job.status == job.STATUS_DONE:
-                self.setStatus(self.STATUS_FINISHING)
-            else:
-                assert job.status == job.STATUS_FAILED, "%s != %s" % (job.status, job.STATUS_FAILED)
+                if job.status == job.STATUS_ACTIVE:
+                    self.setStatus(self.STATUS_RUNNING)
+            # while
+
+            if job.status == job.STATUS_FAILED:
                 self.setStatus(self.STATUS_ERROR)
                 raise RuntimeError("run failed")
 
-            # transfer files
-
+            self.job = None
             self.setStatus(self.STATUS_DONE)
 
         except Exception, e:
@@ -348,12 +366,11 @@
     def newBuildJob(self):
         self.buildDir = "/work/teragrid/tg456271/portal/%d" % self.id
         job = Job(
+            "build",
             jobType = "single",
             count = 1,
             executable = "/work/teragrid/tg456271/sf3dgp/xspecfem3D",
-            arguments = [self.parameters, "--scheduler.dry", "--output-dir=" + self.buildDir],
-            stdout = "build-stdout",
-            stderr = "build-stderr",
+            arguments = [self.simulation.parameters, "--scheduler.dry", "--output-dir=" + self.buildDir],
             )
         job.inputFileURLs = self.inputFileURLs
         return job
@@ -373,16 +390,17 @@
         #simDir = "/work/teragrid/tg456271/portal/%d" % self.id
         
         pyreArgs = [
-            self.parameters,
+            self.simulation.parameters,
             '--output-dir=' + simDir,
-            '--solver.cmt-solution=' + self.events,
-            '--solver.stations=' + self.stations,
+            '--solver.cmt-solution=' + self.simulation.events,
+            '--solver.stations=' + self.simulation.stations,
             ]
         
         job = Job(
+            "run",
             jobType = "mpi",
-            count = self.nodes,
-            max_time = 60,
+            count = self.simulation.nodes,
+            maxTime = 12*60,
             queue = "normal",
             executable = self.buildDir + "/mpipyspecfem3D",
             arguments = ["--pyre-start",
@@ -390,13 +408,11 @@
                          "Specfem3DGlobe==4.0",
                          "mpi:mpistart",
                          "Specfem3DGlobe.Specfem:Specfem",
-                         "--nodes=%d" % self.nodes,
-                         "--macros.nodes=%d" % self.nodes,
+                         "--nodes=%d" % self.simulation.nodes,
+                         "--macros.nodes=%d" % self.simulation.nodes,
                          "--macros.job.name=mysim",
-                         "--macros.job.id=123456",
+                         "--macros.job.id=%d" % self.id,
                          ] + pyreArgs,
-            stdout = "run-stdout",
-            stderr = "run-stderr",
             )
 
         job.inputFileURLs = self.inputFileURLs
@@ -405,81 +421,145 @@
         return job
 
 
+class Simulation(object):
+    def __init__(self, id, nodes):
+        self.id = id
+        self.nodes = nodes
+        self.parameters = 'parameters.pml'
+        self.events = 'events.txt'
+        self.stations = 'stations.txt'
+        self.inputFiles = [self.parameters, self.events, self.stations]
+
+
 class PortalConnection(object):
 
     MULTIPART_BOUNDARY = '----------eArThQuAkE$'
 
-    def __init__(self, portal, clock, info):
+    def __init__(self, portal, outputRootUrl, clock, info):
         self.portal = portal
+        self.outputRootUrl = outputRootUrl
         self.clock = clock
         self.info = info
 
     def runSimulations(self, jm):
-        stackless.tasklet(self.simulationFactory)(jm)
+        stackless.tasklet(self.runFactory)(jm)
 
-    def simulationFactory(self, jm):
+    def runFactory(self, jm):
         import urllib2
         
-        simulations = {}
+        runs = {}
         
         while True:
             self.clock.tick.wait()
             
-            self.info.log("GET %s" % self.portal.simulationsUrl)
-            infile = urllib2.urlopen(self.portal.simulationsUrl)
+            self.info.log("GET %s" % self.portal.runsUrl)
+            infile = urllib2.urlopen(self.portal.runsUrl)
             self.info.log("OK")
-            simulationList = infile.read()
+            runList = infile.read()
             infile.close()
             
-            simulationList = eval(simulationList)
+            runList = eval(runList)
             
-            for sim in simulationList:
-                status = sim['status']
-                id = int(sim['id'])
-                nodes = int(sim['nodes'])
-                if (status == Simulation.STATUS_NEW and
-                    not simulations.has_key(id)):
-                    self.info.log("new simulation %d" % id)
-                    newSimulation = Simulation(id, nodes, self.info)
-                    for inputFile in newSimulation.inputFiles:
-                        url = self.inputFileURL(newSimulation, inputFile)
-                        newSimulation.inputFileURLs.append((url, inputFile))
-                    self.watchSimulation(newSimulation)
-                    simulations[id] = newSimulation
-                    newSimulation.run(jm)
+            for run in runList:
+                id = int(run['id'])
+                status = run['status']
+                simId = run['simulation']
+                nodes = int(run['nodes'])
+                if (status in [Run.STATUS_NEW, ""] and
+                    not runs.has_key(id)):
+                    self.info.log("new run %d" % id)
+                    simulation = Simulation(simId, nodes)
+                    newRun = Run(id, simulation, self.info)
+                    for inputFile in simulation.inputFiles:
+                        url = self.inputFileURL(simulation, inputFile)
+                        newRun.inputFileURLs.append((url, inputFile))
+                    self.watchRun(newRun)
+                    runs[id] = newRun
+                    newRun.go(jm)
         
         return
 
-    def watchSimulation(self, simulation):
-        stackless.tasklet(self.simulationWatcher)(simulation)
+    def watchRun(self, run):
+        stackless.tasklet(self.runWatcher)(run)
         
-    def simulationWatcher(self, simulation):
-        while simulation.isAlive():
-            simulation.statusChanged.wait()
-            self.postStatusChange(simulation)
+    def runWatcher(self, run):
+        url = self.portal.runStatusUrl % run.id
+        job = run.job
+        while run.isAlive():
+            run.statusChanged.wait()
+            fields = {'status': run.status}
+            self.postStatusChange(url, fields)
+            if run.job != job:
+                job = run.job
+                if job is not None:
+                    self.watchJob(job, run)
         return
 
+    def watchJob(self, job, run):
+        url = self.portal.jobCreateUrl
+        fields = self.jobFields(job, run)
+        response = self.postStatusChange(url, fields)
+        portalId = eval(response)
+        stackless.tasklet(self.jobWatcher)(job, portalId, run)
+        
+    def jobWatcher(self, job, portalId, run):
+        url = self.portal.jobUpdateUrl % portalId
+        while job.isAlive():
+            job.statusChanged.wait()
+            fields = self.jobFields(job, run)
+            self.postStatusChange(url, fields)
+        self.postJobOutput(job, portalId)
+        return
+
+    def postJobOutput(self, job, portalId):
+        url = self.portal.outputCreateUrl
+        
+        for outputFile in job.outputFiles:
+            
+            pathname = os.path.join(job.directory, outputFile)
+            if not os.path.exists(pathname):
+                continue
+            
+            # in case the daemon and the web server are run as
+            # different users
+            import stat
+            os.chmod(pathname, stat.S_IRGRP | stat.S_IROTH)
+            
+            fields = {
+                'job': portalId,
+                'name': outputFile,
+                }
+            self.postStatusChange(url, fields)
+        
+        return
+
+    def jobFields(self, job, run):
+        fields = {
+            'run': run.id,
+            'task': job.task,
+            'status': job.status.lower(),
+            'url': self.outputRootUrl + job.subdir + "/",
+            }
+        return fields
+
     def inputFileURL(self, simulation, inputFile):
         return self.portal.inputFileUrl % (simulation.id, inputFile)
 
-    def postStatusChange(self, simulation):
+    def postStatusChange(self, url, fields):
         import urllib
-        body = urllib.urlencode({'status': simulation.status})
+        body = urllib.urlencode(fields)
         headers = {"Content-Type": "application/x-www-form-urlencoded",
                    "Accept": "text/plain"}
-        self.post(body, headers, simulation)
-        return
+        return self.post(body, headers, url)
 
-    def postStatusChangeAndUploadOutput(self, simulation, output):
+    def postStatusChangeAndUploadOutput(self, url, fields, output):
         # Based upon a recipe by Wade Leftwich.
-        fields = {'status': simulation.status}
         files = [('output', 'output.txt', 'application/x-gtar', 'gzip', output)]
         body = self.encodeMultipartFormData(fields, files)
         headers = {"Content-Type": "multipart/form-data; boundary=%s" % self.MULTIPART_BOUNDARY,
                    "Content-Length": str(len(body)),
                    "Accept": "text/plain"}
-        self.post(body, headers, simulation)
-        return
+        return self.post(body, headers, url)
 
     def encodeMultipartFormData(self, fields, files):
         import shutil
@@ -503,16 +583,16 @@
         line()
         return stream.getvalue()
 
-    def post(self, body, headers, simulation):
-        self.info.log("POST %d" % simulation.id)
+    def post(self, body, headers, url):
+        self.info.log("POST %s" % url)
         import httplib
         conn = httplib.HTTPConnection(self.portal.host)
-        conn.request("POST", (self.portal.simStatusUrl % simulation.id), body, headers)
+        conn.request("POST", url, body, headers)
         response = conn.getresponse()
         data = response.read()
         conn.close()
         self.info.log("response %s" % data)
-        return
+        return data
 
 
 class Clock(object):
@@ -541,9 +621,15 @@
 
     def _configure(self):
         self.urlPrefix           = 'http://%s%s' % (self.host, self.urlRoot)
-        self.simulationsUrl      = self.urlPrefix + 'simulations/list.py'
         self.inputFileUrl        = self.urlPrefix + 'simulations/%d/%s'
-        self.simStatusUrl        = self.urlRoot + 'simulations/%d/status/'
+        # runs
+        self.runsUrl             = self.urlPrefix + 'runs/list.py'
+        self.runStatusUrl        = self.urlRoot + 'runs/%d/status/'
+        # jobs
+        self.jobCreateUrl        = self.urlRoot + 'jobs/create/'
+        self.jobUpdateUrl        = self.urlRoot + 'jobs/%d/update/'
+        # output
+        self.outputCreateUrl     = self.urlRoot + 'output/create/'
 
 
 class Daemon(Script):
@@ -562,13 +648,16 @@
 stage-in input files directly from a publicly-accessible portal via
 HTTP.""")
 
+    outputRootPathname = pyre.str("output-root-pathname")
+    outputRootUrl = pyre.str("output-root-url")
 
+
     def main(self, *args, **kwds):
         self._info.activate()
         self._info.log("~~~~~~~~~~ daemon started ~~~~~~~~~~")
         clock = Clock(self.sleepInterval / second)
-        jm = GlobusJobManager(clock, self._info, self.downloadInputFiles)
-        connection = PortalConnection(self.portal, clock, self._info)
+        jm = GlobusJobManager(clock, self.outputRootPathname, self._info, self.downloadInputFiles)
+        connection = PortalConnection(self.portal, self.outputRootUrl, clock, self._info)
         connection.runSimulations(jm)
         try:
             stackless.run()
@@ -577,6 +666,19 @@
         return
 
 
+def randomName():
+    # Stolen from pyre.services.Pickler; perhaps this should be an
+    # official/"exported" Pyre function?
+    
+    alphabet = list("0123456789abcdefghijklmnopqrstuvwxyz")
+
+    import random
+    random.shuffle(alphabet)
+    key = "".join(alphabet)[0:16]
+
+    return key
+        
+
 def main(*args, **kwds):
     daemon = Daemon()
     daemon.run(*args, **kwds)

Modified: cs/portal/trunk/web-portal-daemon.cfg
===================================================================
--- cs/portal/trunk/web-portal-daemon.cfg	2007-08-29 23:50:51 UTC (rev 7913)
+++ cs/portal/trunk/web-portal-daemon.cfg	2007-08-30 02:50:15 UTC (rev 7914)
@@ -1,7 +1,12 @@
 
 [web-portal-daemon]
 sleep-interval = 5*second
+download-input-files = True
+output-root-pathname = /home/leif/public_html/portal/
+output-root-url = http://crust.geodynamics.org/~leif/portal/
 
 [web-portal-daemon.portal]
-host = crust.geodynamics.org
+#host = crust.geodynamics.org
+host = localhost:8000
 
+
