[stacklessexamples] r172 committed - Various fixes and supporting logic:...

1 view
Skip to first unread message

stackles...@googlecode.com

unread,
Jan 6, 2011, 9:59:53 PM1/6/11
to stackless-exa...@googlegroups.com
Revision: 172
Author: richar...@gmail.com
Date: Thu Jan 6 18:59:06 2011
Log: Various fixes and supporting logic:
- When a socket was closed, the blocked senders would not be notified by
having the appropriate exception raised. Now they should be.
- When an error happens in the wrapped write-related calls, the local
exception catching related logic errored and prevented proper handling of
the error. This was because of a reference to the
non-existent 'socket.error'. Also the caught error should have been raised
into the blocked channel.
- Added a method to create a normal socket (make_blocking_socket). This is
often useful if you want to do socket operations before your application
has started up and is pumping asyncore.
- Added a method to convert all existing blocking sockets over to
non-blocking stackless compatible ones (ready_to_schedule). However, it
remains to be seen if this is even possible. Socket state may be such that the
conversion logic cannot hope to convert it.
Still passes the Python 2.7 unit tests as before. This work was done on
CCP time, and is being contributed back because CCP are good like that.
Thanks CCP!
http://code.google.com/p/stacklessexamples/source/detail?r=172

Modified:
/trunk/examples/networking/stacklesssocket.py

=======================================
--- /trunk/examples/networking/stacklesssocket.py Thu Nov 11 17:26:11 2010
+++ /trunk/examples/networking/stacklesssocket.py Thu Jan 6 18:59:06 2011
@@ -8,6 +8,11 @@
#
# Remaining work:
#
+# = Test suite that verifies that emulated behaviour is correct.
+# = When closing the socket, pending senders are sent ECONNRESET.
+# This was obtained by opening a server socket, connecting a
+# client and then closing the server. Then the client did a
+# send and got ECONNRESET.
# = Asyncore does not add that much to this module. In fact, its
# limitations and differences between implementations in different Python
# versions just complicate things.
@@ -33,7 +38,7 @@
#

import stackless
-import asyncore, weakref, time, select, types
+import asyncore, weakref, time, select, types, sys, gc
from collections import deque

# If you pump the scheduler and wish to prevent the scheduler from staying
@@ -57,7 +62,8 @@
import socket as stdsocket # We need the "socket" name for the function we
export.
try:
from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, \
- ENOTCONN, ESHUTDOWN, EINTR, EISCONN, EBADF, ECONNABORTED
+ ENOTCONN, ESHUTDOWN, EINTR, EISCONN, EBADF, ECONNABORTED, \
+ ECONNREFUSED
except Exception:
# Fallback on hard-coded PS3 constants.
EALREADY = 37
@@ -70,6 +76,7 @@
EISCONN = 56
EBADF = 9
ECONNABORTED = 53
+ ECONNREFUSED = 61

# If we are to masquerade as the socket module, we need to provide the
constants.
if "__all__" in stdsocket.__dict__:
@@ -170,6 +177,17 @@

accept.__doc__ = _socketobject_old.accept.__doc__

+def make_blocking_socket(family=AF_INET, type=SOCK_STREAM, proto=0):
+ """
+ Sometimes you may want to create a normal Python socket, even when
+ monkey-patching is in effect. One use case might be when you are
trying to
+ do socket operations on the last runnable tasklet, if these socket
+ operations are on small writes on a non-connected UDP socket then you
+ might as well just use a blocking socket, as the effect of blocking
+ is negligible.
+ """
+ _sock = _realsocket_old(family, type, proto)
+ return _socketobject_old(_sock=_sock)


def install(pi=None):
@@ -185,6 +203,65 @@
stdsocket._realsocket = _realsocket_old
stdsocket.socket = stdsocket.SocketType = stdsocket._socketobject =
_socketobject_old

+READY_TO_SCHEDULE_TAG = "_SET_ASIDE"
+
+def ready_to_schedule(flag):
+ """
+ There may be cases where it is desirable to have socket operations
happen before
+ an application starts up its framework, which would then poll
asyncore. This
+ function is intended to allow all sockets to be switched between
working
+ "stacklessly" or working directly on their underlying socket objects
in a
+ blocking manner.
+
+ Note that sockets created while this is in effect lack attribute
values that
+ asyncore or this module may have set if the sockets were created in a
full
+ monkey patched manner.
+ """
+
+ def reroute_wrapper(funcName):
+ def reroute_call(self, *args, **kwargs):
+ if READY_TO_SCHEDULE_TAG not in _fakesocket.__dict__:
+ return
+ return getattr(self.socket, funcName)(*args, **kwargs)
+ return reroute_call
+
+ def update_method_referrers(methodName, oldClassMethod,
newClassMethod):
+ """
+ The instance methods we need to update are stored in slots on
instances of
+ socket._socketobject (actually our replacement subclass
_socketobject_new).
+ """
+ for referrer1 in gc.get_referrers(oldClassMethod):
+ if isinstance(referrer1, types.MethodType):
+ for referrer2 in gc.get_referrers(referrer1):
+ if isinstance(referrer2, _socketobject_new):
+ setattr(referrer2, methodName,
types.MethodType(newClassMethod, referrer1.im_self, referrer1.im_class))
+
+ # Guard against removal if not in place.
+ if flag:
+ if READY_TO_SCHEDULE_TAG not in _fakesocket.__dict__:
+ return
+ del _fakesocket.__dict__[READY_TO_SCHEDULE_TAG]
+ else:
+ _fakesocket.__dict__[READY_TO_SCHEDULE_TAG] = None
+ # sys.__stdout__.write("READY_TO_SCHEDULE %s\n" % flag)
+
+ # Play switcheroo with the attributes to get direct socket usage, or
normal socket usage.
+ for attributeName in dir(_realsocket_old):
+ if not attributeName.startswith("_"):
+ storageAttributeName = attributeName +"_SET_ASIDE"
+ if flag:
+ storedValue =
_fakesocket.__dict__.pop(storageAttributeName, None)
+ if storedValue is not None:
+ rerouteValue = _fakesocket.__dict__[attributeName]
+ # sys.__stdout__.write("___ RESTORING %s (AS %s)
(WAS %s)\n" % (attributeName, storedValue, rerouteValue))
+ _fakesocket.__dict__[attributeName] = storedValue
+ update_method_referrers(attributeName, rerouteValue,
storedValue)
+ else:
+ if attributeName in _fakesocket.__dict__:
+ # sys.__stdout__.write("___ STORING %s = %s\n" %
(attributeName, _fakesocket.__dict__[attributeName]))
+ _fakesocket.__dict__[storageAttributeName] =
_fakesocket.__dict__[attributeName]
+ _fakesocket.__dict__[attributeName] =
reroute_wrapper(attributeName)
+

class _fakesocket(asyncore.dispatcher):
connectChannel = None
@@ -434,23 +511,23 @@

self.connected = False
self.accepting = False
- self.writeQueue = []

# Clear out all the channels with relevant errors.
while self.acceptChannel and self.acceptChannel.balance < 0:
- self.acceptChannel.send_exception(error, 9, 'Bad file
descriptor')
+ self.acceptChannel.send_exception(stdsocket.error, EBADF, 'Bad
file descriptor')
while self.connectChannel and self.connectChannel.balance < 0:
- self.connectChannel.send_exception(error, 10061, 'Connection
refused')
- self._clear_read_queue()
-
- def _clear_read_queue(self, *args):
- for t in self.readQueue:
+ self.connectChannel.send_exception(stdsocket.error,
ECONNREFUSED, 'Connection refused')
+ self._clear_queue(self.writeQueue, stdsocket.error, ECONNRESET)
+ self._clear_queue(self.readQueue)
+
+ def _clear_queue(self, queue, *args):
+ for t in queue:
if t[0].balance < 0:
if len(args):
t[0].send_exception(*args)
else:
t[0].send("")
- self.readQueue = []
+ queue.clear()

# asyncore doesn't support this. Why not?
def fileno(self):
@@ -503,6 +580,11 @@
# Asyncore says its done but self.readBuffer may be non-empty
# so can't close yet. Do nothing and let 'recv' trigger the close.
def handle_close(self):
+ # These do not interfere with ongoing reads, but should prevent
+ # sends and the like from going through.
+ self.connected = False
+ self.accepting = False
+
# This also gets called in the case that a non-blocking connect
gets
# back to us with a no. If we don't reject the connect, then all
# connect calls that do not connect will block indefinitely.
@@ -595,10 +677,12 @@
try:
result = self.socket.send(data, flags)
return result
- except socket.error, why:
+ except stdsocket.error, why:
if why.args[0] == EWOULDBLOCK:
return 0
elif why.args[0] in (ECONNRESET, ENOTCONN, ESHUTDOWN,
ECONNABORTED):
+ # Ensure the sender appears to have directly
received this exception.
+ channel.send_exception(why.__class__, *why.args)
self.handle_close()
return 0
else:
@@ -767,6 +851,10 @@
finally:
uninstall()

+ if "notready" in sys.argv:
+ sys.argv.remove("notready")
+ ready_to_schedule(False)
+
if len(sys.argv) == 2:
if sys.argv[1] == "client":
print "client started"

Reply all
Reply to author
Forward
0 new messages