[cig-commits] r7768 - cs/portal/trunk

leif at geodynamics.org leif at geodynamics.org
Wed Aug 1 14:04:32 PDT 2007


Author: leif
Date: 2007-08-01 14:04:31 -0700 (Wed, 01 Aug 2007)
New Revision: 7768

Modified:
   cs/portal/trunk/daemon.py
Log:
Took steps toward actually running SPECFEM simulations via Globus.
Simulations currently fail because file staging is not yet implemented.
Pyre integration is also still needed (for settings such as the node
count and the argument list).


Modified: cs/portal/trunk/daemon.py
===================================================================
--- cs/portal/trunk/daemon.py	2007-08-01 03:51:42 UTC (rev 7767)
+++ cs/portal/trunk/daemon.py	2007-08-01 21:04:31 UTC (rev 7768)
@@ -8,52 +8,116 @@
 from pyre.units.time import second
 
 
-MULTIPART_BOUNDARY = '----------eArThQuAkE$'
+class GlobusJobManager(object):
 
+    def __init__(self, clock, info):
+        self.clock = clock
+        self.info = info
 
-def spawn(info, *argv):
-    command = ' '.join(argv)
-    info.log("spawning: %s" % command)
-    status = os.spawnvp(os.P_WAIT, argv[0], argv)
-    statusMsg = "%s: exit %d" % (argv[0], status)
-    info.log(statusMsg)
-    if status != 0:
-        raise Exception(statusMsg)
-    return
+    
+    def runJob(self, job):
+        stackless.tasklet(self.jobRunner)(job)
 
+    
+    def jobRunner(self, job):
+        try:
+            # (file_stage_in = ($(GLOBUSRUN_GASS_URL)/./tomato $(SCRATCH_DIRECTORY)/input))
+            id = self.globusrun(job.resSpec)
+            oldStatus = job.status
+            while job.isAlive():
+                self.clock.tick.wait()
+                status = self.globus_job_status(id)
+                if status != oldStatus:
+                    job.setStatus(status)
+                    oldStatus = status
+        except Exception, e:
+            self.info.log("error: %s: %s" % (e.__class__.__name__, e))
+            job.setStatus(job.STATUS_ERROR)
+        return
 
-def ospawn(info, *argv):
-    command = ' '.join(argv)
-    info.log("spawning: %s" % command)
 
-    child = Popen4(argv)
+    def spawn(self, *argv):
+        command = ' '.join(argv)
+        self.info.log("spawning: %s" % command)
+        status = os.spawnvp(os.P_WAIT, argv[0], argv)
+        statusMsg = "%s: exit %d" % (argv[0], status)
+        self.info.log(statusMsg)
+        if status != 0:
+            raise Exception(statusMsg)
+        return
 
-    child.tochild.close()
 
-    output = child.fromchild.readlines()
-    status = child.wait()
+    def ospawn(self, *argv):
+        command = ' '.join(argv)
+        self.info.log("spawning: %s" % command)
 
-    exitStatus = None
-    if (os.WIFSIGNALED(status)):
-        statusStr = "signal %d" % os.WTERMSIG(status)
-    elif (os.WIFEXITED(status)):
-        exitStatus = os.WEXITSTATUS(status)
-        statusStr = "exit %d" % exitStatus
-    else:
-        statusStr = "status %d" % status
-    statusMsg = "%s: %s" % (argv[0], statusStr)
+        child = Popen4(argv)
+
+        child.tochild.close()
+
+        output = child.fromchild.readlines()
+        status = child.wait()
+
+        exitStatus = None
+        if (os.WIFSIGNALED(status)):
+            statusStr = "signal %d" % os.WTERMSIG(status)
+        elif (os.WIFEXITED(status)):
+            exitStatus = os.WEXITSTATUS(status)
+            statusStr = "exit %d" % exitStatus
+        else:
+            statusStr = "status %d" % status
+        statusMsg = "%s: %s" % (argv[0], statusStr)
     
-    for line in output:
-        info.line("    " + line.rstrip())
-    info.log(statusMsg)
+        for line in output:
+            self.info.line("    " + line.rstrip())
+        self.info.log(statusMsg)
     
-    if exitStatus != 0:
-        raise Exception(statusMsg)
+        if exitStatus != 0:
+            raise Exception(statusMsg)
 
-    return output
+        return output
 
 
+    def globusrun(self, resSpec):
+        import tempfile
+        fd, rslName = tempfile.mkstemp(suffix=".rsl")
+        stream = os.fdopen(fd, "w")
+        try:
+            self.writeRsl(resSpec, stream)
+            stream.close()
+            if resSpec['jobType'] == "mpi":
+                resourceManager = "tg-login.tacc.teragrid.org/jobmanager-lsf"
+            else:
+                resourceManager = "tg-login.tacc.teragrid.org"
+            output = self.ospawn("globusrun", "-F", "-f", rslName, "-r", resourceManager)
+            id = None
+            for line in output:
+                if line.startswith("https://"):
+                    id = line.strip()
+            assert id
+        finally:
+            stream.close()
+            #os.unlink(rslName)
+        return id
 
+
+    def writeRsl(self, resSpec, stream):
+        print >>stream, '&'
+        for relation in resSpec.iteritems():
+            attribute, valueSequence = relation
+            if not isinstance(valueSequence, (tuple, list)):
+                valueSequence = [valueSequence]
+            valueSequence = ['"%s"' % value for value in valueSequence]
+            print >>stream, '(', attribute, '=', ' '.join(valueSequence), ')'
+        return
+
+
+    def globus_job_status(self, id):
+        output = self.ospawn("globus-job-status", id)
+        status = output[0].strip()
+        return status
+
+
 class Event(object):
     
     def __init__(self):
@@ -88,48 +152,24 @@
     statusCodes = [STATUS_PENDING, STATUS_ACTIVE, STATUS_DONE, STATUS_FAILED]
     deadCodes = [STATUS_DONE, STATUS_FAILED]
 
-    #@classmethod
-    def newJob(cls, info):
-        output = ospawn(info, "globusrun", "-F", "-f", "sleep.rsl", "-r", "tg-login.tacc.teragrid.org")
-        id = None
-        for line in output:
-            if line.startswith("https://"):
-                id = line.strip()
-        assert id
-        job = Job(id, info)
-        job.run()
-        return job
-    newJob = classmethod(newJob)
-        
-    def __init__(self, id, info):
-        self.id = id
-        self.info = info
+    def __init__(self, **resSpec):
+        self.resSpec = resSpec
         self.status = None
         self.statusChanged = Event()
+        self.inputFiles = []
 
-    def getStatus(self):
-        output = ospawn(self.info, "globus-job-status", self.id)
-        oldStatus = self.status
-        self.status = output[0].strip()
-        assert self.status in Job.statusCodes
-        if self.status != oldStatus:
-            self.statusChanged.signal()
-        return self.status
+    def setStatus(self, status):
+        assert status in self.statusCodes
+        self.status = status
+        self.statusChanged.signal()
 
     def isAlive(self):
-        if self.status in Job.deadCodes:
-            return False
-        return not self.getStatus() in Job.deadCodes
+        return not self.status in self.deadCodes
 
-    def run(self):
-        stackless.tasklet(self)()
+    def stageIn(self, filename):
+        self.inputFiles.append(filename)
 
-    def __call__(self):
-        while self.isAlive():
-            stackless.schedule()
-        return
 
-
 class Simulation(object):
     
     # status codes
@@ -140,29 +180,204 @@
     STATUS_FINISHING  = "SimStatusFinishing"
     STATUS_DONE       = "SimStatusDone"
     STATUS_ERROR      = "SimStatusError"
+    deadCodes = [STATUS_DONE, STATUS_ERROR]
 
-    def __init__(self, id, daemon, portal):
+    def __init__(self, id, info):
         self.id = id
-        self.daemon = daemon
+        self.info = info
+        self.status = None
+        self.statusChanged = Event()
+
+    def run(self, jm):
+        stackless.tasklet(self)(jm)
+
+    def __call__(self, jm):
+        try:
+            self.setStatus(self.STATUS_PREPARING)
+
+            # build
+            job = self.newBuildJob()
+            jm.runJob(job)
+            while job.isAlive():
+                job.statusChanged.wait()
+            
+            if job.status == job.STATUS_FAILED:
+                self.setStatus(self.STATUS_ERROR)
+                raise RuntimeError("build failed")
+
+            # run
+            job = self.newRunJob()
+            jm.runJob(job)
+            job.statusChanged.wait()
+            
+            if job.status == job.STATUS_PENDING:
+                self.setStatus(self.STATUS_PENDING)
+                job.statusChanged.wait()
+            
+            if job.status == job.STATUS_ACTIVE:
+                self.setStatus(self.STATUS_RUNNING)
+                job.statusChanged.wait()
+            
+            if job.status == job.STATUS_DONE:
+                self.setStatus(self.STATUS_FINISHING)
+            else:
+                assert job.status == job.STATUS_FAILED, "%s != %s" % (job.status, job.STATUS_FAILED)
+                self.setStatus(self.STATUS_ERROR)
+                raise RuntimeError("run failed")
+
+            # transfer files
+
+            self.setStatus(self.STATUS_DONE)
+
+        except Exception, e:
+            self.info.log("error: %s: %s" % (e.__class__.__name__, e))
+            self.setStatus(self.STATUS_ERROR)
+        
+        return
+
+    def setStatus(self, status):
+        self.status = status
+        self.statusChanged.signal()
+
+    def isAlive(self):
+        return not self.status in self.deadCodes
+
+    def newBuildJob(self):
+        return Job(
+            jobType = "single",
+            count = 1,
+            directory = "/work/teragrid/tg456271/SPECFEM3D_GLOBE/work",
+            executable = "/work/teragrid/tg456271/SPECFEM3D_GLOBE/xspecfem3D",
+            arguments = ["mysim.cfg", "--scheduler.dry"],
+            stdout = "stdout.mysim.test",
+            stderr = "stderr.mysim.test",
+            )
+
+    def newRunJob(self):
+        from os.path import join
+        
+        sf = "/work/teragrid/tg456271/SPECFEM3D_GLOBE"
+        pythonPath = ':'.join([
+            sf + "/merlin-1.3.egg",
+            sf,
+            sf + "/python/pythia-0.8.1.4-py2.4.egg",
+            sf + "/python/Cheetah-2.0rc8-py2.4-linux-x86_64.egg",
+            ])
+        nodes = 24
+
+        parameters = 'parameters.pml'
+        events = 'events.txt'
+        stations = 'stations.txt'
+        inputFiles = [parameters, events, stations]
+
+        simDir = "/work/teragrid/tg456271/simulations/mysim"
+        
+        pyreArgs = [
+            parameters,
+            '--output-dir=' + simDir,
+            '--solver.cmt-solution=' + events,
+            '--solver.stations=' + stations,
+            '--solver.seismogram-archive=' + join(simDir, 'output.tar.gz'),
+            ]
+        
+        job = Job(
+            jobType = "mpi",
+            count = nodes,
+            max_time = 60,
+            queue = "normal",
+            directory = "/work/teragrid/tg456271/SPECFEM3D_GLOBE/work",
+            executable = "/work/teragrid/tg456271/simulations/mysim/mpipyspecfem3D",
+            arguments = ["--pyre-start",
+                         pythonPath,
+                         "Specfem3DGlobe==4.0",
+                         "mpi:mpistart",
+                         "Specfem3DGlobe.Specfem:Specfem",
+                         "mysim.cfg",
+                         "--nodes=%d" % nodes,
+                         "--macros.nodes=%d" % nodes,
+                         "--macros.job.name=mysim",
+                         "--macros.job.id=123456",
+                         ] + pyreArgs,
+            stdout = "stdout.mysim.run",
+            stderr = "stderr.mysim.run",
+            )
+
+        for inputFile in inputFiles:
+            job.stageIn(inputFile)
+
+        return job
+
+
+class PortalConnection(object):
+
+    MULTIPART_BOUNDARY = '----------eArThQuAkE$'
+
+    def __init__(self, portal, clock, info):
         self.portal = portal
+        self.clock = clock
+        self.info = info
 
-    def postStatusChange(self, newStatus):
+    def runSimulations(self, jm):
+        stackless.tasklet(self.simulationFactory)(jm)
+
+    def simulationFactory(self, jm):
+        import urllib2
+        
+        simulations = {}
+        
+        while True:
+            self.clock.tick.wait()
+            
+            self.info.log("GET %s" % self.portal.simulationsUrl)
+            infile = urllib2.urlopen(self.portal.simulationsUrl)
+            self.info.log("OK")
+            simulationList = infile.read()
+            infile.close()
+            
+            simulationList = eval(simulationList)
+            
+            for sim in simulationList:
+                status = sim['status']
+                id = int(sim['id'])
+                if (status == Simulation.STATUS_NEW and
+                    not simulations.has_key(id)):
+                    self.info.log("new simulation %d" % id)
+                    newSimulation = Simulation(id, self.info)
+                    self.watchSimulation(newSimulation)
+                    simulations[id] = newSimulation
+                    newSimulation.run(jm)
+        
+        return
+
+    def watchSimulation(self, simulation):
+        stackless.tasklet(self.simulationWatcher)(simulation)
+        
+    def simulationWatcher(self, simulation):
+        while simulation.isAlive():
+            simulation.statusChanged.wait()
+            self.postStatusChange(simulation)
+        return
+
+    def inputFileURL(self, simulation, inputFile):
+        return self.portal.inputFileUrl % (simulation.id, inputFile)
+
+    def postStatusChange(self, simulation):
         import urllib
-        body = urllib.urlencode({'status': newStatus})
+        body = urllib.urlencode({'status': simulation.status})
         headers = {"Content-Type": "application/x-www-form-urlencoded",
                    "Accept": "text/plain"}
-        self.post(body, headers)
+        self.post(body, headers, simulation)
         return
 
-    def postStatusChangeAndUploadOutput(self, newStatus, output):
+    def postStatusChangeAndUploadOutput(self, simulation, output):
         # Based upon a recipe by Wade Leftwich.
-        fields = {'status': newStatus}
+        fields = {'status': simulation.status}
         files = [('output', 'output.txt', 'application/x-gtar', 'gzip', output)]
         body = self.encodeMultipartFormData(fields, files)
-        headers = {"Content-Type": "multipart/form-data; boundary=%s" % MULTIPART_BOUNDARY,
+        headers = {"Content-Type": "multipart/form-data; boundary=%s" % self.MULTIPART_BOUNDARY,
                    "Content-Length": str(len(body)),
                    "Accept": "text/plain"}
-        self.post(body, headers)
+        self.post(body, headers, simulation)
         return
 
     def encodeMultipartFormData(self, fields, files):
@@ -171,112 +386,50 @@
         stream = StringIO()
         def line(s=''): stream.write(s + '\r\n')
         for key, value in fields.iteritems():
-            line('--' + MULTIPART_BOUNDARY)
+            line('--' + self.MULTIPART_BOUNDARY)
             line('Content-Disposition: form-data; name="%s"' % key)
             line()
             line(value)
         for (key, filename, contentType, contentEncoding, content) in files:
-            line('--' + MULTIPART_BOUNDARY)
+            line('--' + self.MULTIPART_BOUNDARY)
             line('Content-Disposition: form-data; name="%s"; filename="%s"' % (key, filename))
             line('Content-Type: %s' % contentType)
             line('Content-Encoding: %s' % contentEncoding)
             line()
             shutil.copyfileobj(content, stream)
             line()
-        line('--' + MULTIPART_BOUNDARY + '--')
+        line('--' + self.MULTIPART_BOUNDARY + '--')
         line()
         return stream.getvalue()
 
-    def post(self, body, headers):
-        self.daemon._info.log("POST %d" % self.id)
+    def post(self, body, headers, simulation):
+        self.info.log("POST %d" % simulation.id)
         import httplib
         conn = httplib.HTTPConnection(self.portal.host)
-        conn.request("POST", (self.portal.simStatusUrl % self.id), body, headers)
+        conn.request("POST", (self.portal.simStatusUrl % simulation.id), body, headers)
         response = conn.getresponse()
         data = response.read()
         conn.close()
-        self.daemon._info.log("response %s" % data)
+        self.info.log("response %s" % data)
         return
 
-    def run(self):
-        stackless.tasklet(self)()
 
-    def __call__(self):
-        try:
-            self.postStatusChange(Simulation.STATUS_PREPARING)
-            job = Job.newJob(self.daemon._info)
-            job.statusChanged.wait()
-            
-            if job.status == Job.STATUS_PENDING:
-                self.postStatusChange(Simulation.STATUS_PENDING)
-                job.statusChanged.wait()
-            
-            if job.status == Job.STATUS_ACTIVE:
-                self.postStatusChange(Simulation.STATUS_RUNNING)
-                job.statusChanged.wait()
-            
-            if job.status == Job.STATUS_DONE:
-                #self.postStatusChange(Simulation.STATUS_FINISHING)
-                self.postStatusChange(Simulation.STATUS_DONE)
-            else:
-                assert job.status == Job.STATUS_FAILED, "%s != %s" % (job.status, Job.STATUS_FAILED)
-                self.postStatusChange(Simulation.STATUS_ERROR)
-                
-        except Exception, e:
-            self.daemon._info.log("error: %s: %s" % (e.__class__.__name__, e))
-            self.postStatusChange(Simulation.STATUS_ERROR)
-        
-        return
+class Clock(object):
 
-
-class Sleeper(object):
-
     def __init__(self, interval):
         self.interval = interval
+        self.tick = Event()
+        stackless.tasklet(self)()
 
     def __call__(self):
         from time import sleep
         while True:
             sleep(self.interval)
+            self.tick.signal()
             stackless.schedule()
         return
 
 
-class SimulationFactory(object):
-
-    def __init__(self, daemon):
-        self.daemon = daemon
-        self.portal = daemon.portal
-
-    def __call__(self):
-        import urllib2
-        
-        simulations = {}
-        
-        while True:
-            stackless.schedule()
-            
-            self.daemon._info.log("GET %s" % self.portal.simulationsUrl)
-            infile = urllib2.urlopen(self.portal.simulationsUrl)
-            self.daemon._info.log("OK")
-            simulationList = infile.read()
-            infile.close()
-            
-            simulationList = eval(simulationList)
-            
-            for sim in simulationList:
-                status = sim['status']
-                id = int(sim['id'])
-                if (status == Simulation.STATUS_NEW and
-                    not simulations.has_key(id)):
-                    self.daemon._info.log("new simulation %d" % id)
-                    newSimulation = Simulation(id, self.daemon, self.portal)
-                    simulations[id] = newSimulation
-                    newSimulation.run()
-        
-        return
-
-
 class WebPortal(Component):
     
     name = "web-portal"
@@ -303,8 +456,10 @@
     def main(self, *args, **kwds):
         self._info.activate()
         self._info.log("~~~~~~~~~~ daemon started ~~~~~~~~~~")
-        stackless.tasklet(Sleeper(self.sleepInterval / second))()
-        stackless.tasklet(SimulationFactory(self))()
+        clock = Clock(self.sleepInterval / second)
+        jm = GlobusJobManager(clock, self._info)
+        connection = PortalConnection(self.portal, clock, self._info)
+        connection.runSimulations(jm)
         try:
             stackless.run()
         except KeyboardInterrupt:



More information about the cig-commits mailing list