Modified:
/trunk/examples/networking/stacklesssocket.py
=======================================
--- /trunk/examples/networking/stacklesssocket.py Thu Nov 11 17:26:11 2010
+++ /trunk/examples/networking/stacklesssocket.py Thu Jan 6 18:59:06 2011
@@ -8,6 +8,11 @@
#
# Remaining work:
#
+# = Test suite that verifies that emulated behaviour is correct.
+# = When closing the socket, pending senders are sent ECONNRESET.
+# This behaviour was determined by opening a server socket, connecting
+# a client and then closing the server. The client then did a send
+# and got ECONNRESET.
# = Asyncore does not add that much to this module. In fact, its
# limitations and differences between implementations in different Python
# versions just complicate things.
@@ -33,7 +38,7 @@
#
import stackless
-import asyncore, weakref, time, select, types
+import asyncore, weakref, time, select, types, sys, gc
from collections import deque
# If you pump the scheduler and wish to prevent the scheduler from staying
@@ -57,7 +62,8 @@
import socket as stdsocket # We need the "socket" name for the function we
export.
try:
from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, \
- ENOTCONN, ESHUTDOWN, EINTR, EISCONN, EBADF, ECONNABORTED
+ ENOTCONN, ESHUTDOWN, EINTR, EISCONN, EBADF, ECONNABORTED, \
+ ECONNREFUSED
except Exception:
# Fallback on hard-coded PS3 constants.
EALREADY = 37
@@ -70,6 +76,7 @@
EISCONN = 56
EBADF = 9
ECONNABORTED = 53
+ ECONNREFUSED = 61
# If we are to masquerade as the socket module, we need to provide the
constants.
if "__all__" in stdsocket.__dict__:
@@ -170,6 +177,17 @@
accept.__doc__ = _socketobject_old.accept.__doc__
+def make_blocking_socket(family=AF_INET, type=SOCK_STREAM, proto=0):
+ """
+ Sometimes you may want to create a normal Python socket, even when
+ monkey-patching is in effect. One use case might be when you are
trying to
+ do socket operations on the last runnable tasklet, if these socket
+ operations are on small writes on a non-connected UDP socket then you
+ might as well just use a blocking socket, as the effect of blocking
+ is negligible.
+ """
+ _sock = _realsocket_old(family, type, proto)
+ return _socketobject_old(_sock=_sock)
def install(pi=None):
@@ -185,6 +203,65 @@
stdsocket._realsocket = _realsocket_old
stdsocket.socket = stdsocket.SocketType = stdsocket._socketobject =
_socketobject_old
+READY_TO_SCHEDULE_TAG = "_SET_ASIDE"
+
+def ready_to_schedule(flag):
+ """
+ There may be cases where it is desirable to have socket operations
happen before
+ an application starts up its framework, which would then poll
asyncore. This
+ function is intended to allow all sockets to be switched between
working
+ "stacklessly" or working directly on their underlying socket objects
in a
+ blocking manner.
+
+ Note that sockets created while this is in effect lack attribute
values that
+ asyncore or this module may have set if the sockets were created in a
full
+ monkey patched manner.
+ """
+
+ def reroute_wrapper(funcName):
+ def reroute_call(self, *args, **kwargs):
+ if READY_TO_SCHEDULE_TAG not in _fakesocket.__dict__:
+ return
+ return getattr(self.socket, funcName)(*args, **kwargs)
+ return reroute_call
+
+ def update_method_referrers(methodName, oldClassMethod,
newClassMethod):
+ """
+ The instance methods we need to update are stored in slots on
instances of
+ socket._socketobject (actually our replacement subclass
_socketobject_new).
+ """
+ for referrer1 in gc.get_referrers(oldClassMethod):
+ if isinstance(referrer1, types.MethodType):
+ for referrer2 in gc.get_referrers(referrer1):
+ if isinstance(referrer2, _socketobject_new):
+ setattr(referrer2, methodName,
types.MethodType(newClassMethod, referrer1.im_self, referrer1.im_class))
+
+ # Guard against removal if not in place.
+ if flag:
+ if READY_TO_SCHEDULE_TAG not in _fakesocket.__dict__:
+ return
+ del _fakesocket.__dict__[READY_TO_SCHEDULE_TAG]
+ else:
+ _fakesocket.__dict__[READY_TO_SCHEDULE_TAG] = None
+ # sys.__stdout__.write("READY_TO_SCHEDULE %s\n" % flag)
+
+ # Play switcheroo with the attributes to get direct socket usage, or
normal socket usage.
+ for attributeName in dir(_realsocket_old):
+ if not attributeName.startswith("_"):
+ storageAttributeName = attributeName +"_SET_ASIDE"
+ if flag:
+ storedValue =
_fakesocket.__dict__.pop(storageAttributeName, None)
+ if storedValue is not None:
+ rerouteValue = _fakesocket.__dict__[attributeName]
+ # sys.__stdout__.write("___ RESTORING %s (AS %s)
(WAS %s)\n" % (attributeName, storedValue, rerouteValue))
+ _fakesocket.__dict__[attributeName] = storedValue
+ update_method_referrers(attributeName, rerouteValue,
storedValue)
+ else:
+ if attributeName in _fakesocket.__dict__:
+ # sys.__stdout__.write("___ STORING %s = %s\n" %
(attributeName, _fakesocket.__dict__[attributeName]))
+ _fakesocket.__dict__[storageAttributeName] =
_fakesocket.__dict__[attributeName]
+ _fakesocket.__dict__[attributeName] =
reroute_wrapper(attributeName)
+
class _fakesocket(asyncore.dispatcher):
connectChannel = None
@@ -434,23 +511,23 @@
self.connected = False
self.accepting = False
- self.writeQueue = []
# Clear out all the channels with relevant errors.
while self.acceptChannel and self.acceptChannel.balance < 0:
- self.acceptChannel.send_exception(error, 9, 'Bad file
descriptor')
+ self.acceptChannel.send_exception(stdsocket.error, EBADF, 'Bad
file descriptor')
while self.connectChannel and self.connectChannel.balance < 0:
- self.connectChannel.send_exception(error, 10061, 'Connection
refused')
- self._clear_read_queue()
-
- def _clear_read_queue(self, *args):
- for t in self.readQueue:
+ self.connectChannel.send_exception(stdsocket.error,
ECONNREFUSED, 'Connection refused')
+ self._clear_queue(self.writeQueue, stdsocket.error, ECONNRESET)
+ self._clear_queue(self.readQueue)
+
+ def _clear_queue(self, queue, *args):
+ # Notify every waiting entry in `queue`, then empty the queue.
+ # Each entry's first element (t[0]) is used as a channel. When extra
+ # positional arguments are given they are passed to send_exception();
+ # with none, an empty string is sent instead.
+ for t in queue:
+ # Only channels with a negative balance are notified; presumably
+ # that means a tasklet is blocked receiving on it -- TODO confirm.
if t[0].balance < 0:
if len(args):
t[0].send_exception(*args)
else:
t[0].send("")
- self.readQueue = []
+ # NOTE(review): needs `queue` to provide clear(); presumably a deque,
+ # since a Python 2 list has no clear() method -- confirm.
+ queue.clear()
# asyncore doesn't support this. Why not?
def fileno(self):
@@ -503,6 +580,11 @@
# Asyncore says its done but self.readBuffer may be non-empty
# so can't close yet. Do nothing and let 'recv' trigger the close.
def handle_close(self):
+ # These do not interfere with ongoing reads, but should prevent
+ # sends and the like from going through.
+ self.connected = False
+ self.accepting = False
+
# This also gets called in the case that a non-blocking connect gets
# back to us with a no. If we don't reject the connect, then all
# connect calls that do not connect will block indefinitely.
@@ -595,10 +677,12 @@
try:
result = self.socket.send(data, flags)
return result
- except socket.error, why:
+ except stdsocket.error, why:
if why.args[0] == EWOULDBLOCK:
return 0
elif why.args[0] in (ECONNRESET, ENOTCONN, ESHUTDOWN,
ECONNABORTED):
+ # Ensure the sender appears to have directly
received this exception.
+ channel.send_exception(why.__class__, *why.args)
self.handle_close()
return 0
else:
@@ -767,6 +851,10 @@
finally:
uninstall()
+ if "notready" in sys.argv:
+ sys.argv.remove("notready")
+ ready_to_schedule(False)
+
if len(sys.argv) == 2:
if sys.argv[1] == "client":
print "client started"