[cig-commits] r15624 - in cs/pythia/trunk: journal/devices pyre/applications pyre/services

leif at geodynamics.org leif at geodynamics.org
Fri Aug 28 20:07:43 PDT 2009


Author: leif
Date: 2009-08-28 20:07:43 -0700 (Fri, 28 Aug 2009)
New Revision: 15624

Modified:
   cs/pythia/trunk/journal/devices/TCPDevice.py
   cs/pythia/trunk/pyre/applications/Daemon.py
   cs/pythia/trunk/pyre/applications/Shell.py
   cs/pythia/trunk/pyre/services/TCPService.py
   cs/pythia/trunk/pyre/services/TCPSession.py
Log:
Keep pyre.services connections alive, handling multiple requests over
the same client connection.  Bomb with an exception (instead of
failing silently) when a remote journal connection fails.  In
pyre.applications.Daemon, don't close the standard I/O descriptors in
debug mode, so that exception stack traces go to the terminal.


Modified: cs/pythia/trunk/journal/devices/TCPDevice.py
===================================================================
--- cs/pythia/trunk/journal/devices/TCPDevice.py	2009-08-28 23:32:34 UTC (rev 15623)
+++ cs/pythia/trunk/journal/devices/TCPDevice.py	2009-08-29 03:07:43 UTC (rev 15624)
@@ -19,22 +19,16 @@
 
 
     def record(self, entry):
-        import pyre.ipc
-        connection = pyre.ipc.connection('tcp')
 
-        # attempt to connect
-        # if refused, just drop the entry for now
-        try:
-            connection.connect((self.host, self.port))
-        except connection.ConnectionError:
+        if self._connection is None:
             return
-
+        
         import journal
         request = journal.request(command="record", args=[self.renderer.render(entry)])
 
         try:
-            self._marshaller.send(request, connection)
-            result = self._marshaller.receive(connection)
+            self._marshaller.send(request, self._connection)
+            result = self._marshaller.receive(self._connection)
         except self._marshaller.RequestError:
             return
 
@@ -54,6 +48,11 @@
         self._marshaller = journal.pickler()
         self._marshaller.key = key
 
+        import pyre.ipc
+        self._connection = pyre.ipc.connection('tcp')
+
+        self._connection.connect((self.host, self.port))
+
         return
 
 

Modified: cs/pythia/trunk/pyre/applications/Daemon.py
===================================================================
--- cs/pythia/trunk/pyre/applications/Daemon.py	2009-08-28 23:32:34 UTC (rev 15623)
+++ cs/pythia/trunk/pyre/applications/Daemon.py	2009-08-29 03:07:43 UTC (rev 15624)
@@ -30,7 +30,7 @@
         if not spawn:
             print " ** daemon %r in debug mode" % self.name
             import os
-            self.daemon(os.getpid())
+            self.daemon(os.getpid(), spawn=False)
             return
             
         import pyre.util
@@ -63,7 +63,7 @@
         return
         
 
-    def daemon(self, pid):
+    def daemon(self, pid, spawn=True):
         import os
 
         # change the working directory to my home directory
@@ -80,11 +80,12 @@
         # build a journal configuration file
         # self.configureJournal()
 
-        # close all ties with the parent process
-        os.close(2)
-        os.close(1)
-        os.close(0)
-
+        if spawn:
+            # close all ties with the parent process
+            os.close(2)
+            os.close(1)
+            os.close(0)
+        
         # launch the application
         self.main(*self.args, **self.kwds)
 

Modified: cs/pythia/trunk/pyre/applications/Shell.py
===================================================================
--- cs/pythia/trunk/pyre/applications/Shell.py	2009-08-28 23:32:34 UTC (rev 15623)
+++ cs/pythia/trunk/pyre/applications/Shell.py	2009-08-29 03:07:43 UTC (rev 15624)
@@ -15,6 +15,7 @@
 
 
 try:
+    # XXX: This default is annoying when one is debugging CGI apps.
     import IPython.ultraTB
     defaultExceptHook = "ultraTB"
 except ImportError:

Modified: cs/pythia/trunk/pyre/services/TCPService.py
===================================================================
--- cs/pythia/trunk/pyre/services/TCPService.py	2009-08-28 23:32:34 UTC (rev 15623)
+++ cs/pythia/trunk/pyre/services/TCPService.py	2009-08-29 03:07:43 UTC (rev 15624)
@@ -24,14 +24,26 @@
         if not self.validateConnection(address):
             return True
 
+        self._info.log("new connection from [%d@%s]" % (address[1], address[0]))
+
+        # Create a closure to remember 'address'.
+        def handler(selector, socket):
+            return self.onRequest(selector, socket, address)
+        
+        selector.notifyOnReadReady(socket, handler)
+
+        return True
+
+
+    def onRequest(self, selector, socket, address):
+
         try:
             request = self.marshaller.receive(socket)
         except ValueError, msg:
-            self._debug.log("bad request: %s" % msg)
-            return True
+            return self.badRequest(socket, address, msg)
         except self.marshaller.RequestError, msg:
-            self._info.log(msg)
-            return True
+            self.requestError(socket, address, msg)
+            return False
 
         self._info.log("request from [%d@%s]: command=%r, args=%r" % (
             address[1], address[0], request.command, request.args))
@@ -43,11 +55,36 @@
         try:
             self.marshaller.send(result, socket)
         except self.marshaller.RequestError, msg:
-            self._debug.log(msg)
+            self.requestError(socket, address, msg)
+            return False
 
         return True
 
 
+    def badRequest(self, socket, address, msg):
+        """Notify the receiver that a client sent a bad request."""
+        
+        # Subclasses can override this to send an error to the client,
+        # if their protocol supports it.  The default action is to
+        # close the connection because clients otherwise hang while
+        # waiting for the result on the socket.
+        
+        self.badConnection(socket, address, "bad request: %s" % msg)
+        return False
+
+
+    def requestError(self, socket, address, msg):
+        self.badConnection(socket, address, "error: %s" % msg)
+        return
+
+
+    def badConnection(self, socket, address, msg):
+        self._info.log("closing connection from [%d@%s]: %s" % (
+            address[1], address[0], msg))
+        socket.close()
+        return
+
+
     def __init__(self, name=None):
         Service.__init__(self, name)
         return

Modified: cs/pythia/trunk/pyre/services/TCPSession.py
===================================================================
--- cs/pythia/trunk/pyre/services/TCPSession.py	2009-08-28 23:32:34 UTC (rev 15623)
+++ cs/pythia/trunk/pyre/services/TCPSession.py	2009-08-29 03:07:43 UTC (rev 15624)
@@ -28,11 +28,6 @@
         import pyre.services
         request = pyre.services.request(command, args)
 
-        try:
-            self._connect()
-        except self._connection.ConnectionError, error:
-            raise self.RequestError(str(error))
-
         self._info.log("sending request: command=%r" % command)
         self.marshaller.send(request, self._connection)
         self._info.log("request sent")
@@ -46,6 +41,15 @@
         return
 
 
+    def _init(self):
+        try:
+            self._connect()
+        except self._connection.ConnectionError, error:
+            raise self.RequestError(str(error))
+
+        return
+
+
 # version
 __id__ = "$Id: TCPSession.py,v 1.2 2005/03/14 22:55:35 aivazis Exp $"
 



More information about the CIG-COMMITS mailing list