Modified:
/trunk/libraries/stacklesslib/stacklesslib/replacements/socket_pyuv.py
=======================================
--- /trunk/libraries/stacklesslib/stacklesslib/replacements/socket_pyuv.py
Fri Mar 2 18:15:38 2012
+++ /trunk/libraries/stacklesslib/stacklesslib/replacements/socket_pyuv.py
Tue Mar 6 18:38:29 2012
@@ -2,25 +2,15 @@
Stackless compatible socket module (pyuv based).
Author: Richard Tew <richar...@gmail.com>
-This is licensed under the MIT open source license.
+See the COPYING file for the open source license.
Feel free to email me with any questions, comments, or suggestions for
improvement.
-Implementation notes:
-- libuv is rather arcane in some of its design decisions and this forces
the
- code in this module to be even more arcane, in order for it to work:
- - loop.run_once does some strange arbitrary calculations to determine
- whether it should block forever waiting for events. There is no way
for
- the caller to pass in a max timeout.
-- 'errno' error codes. There are special Winsock variants which are
expected
- on Windows. In the case of EWOULDBLOCK, this is okay as this value is
the
- correct value (the Winsock value) on Windows. But in the case of EINVAL,
- the value is not that of WSAEINVAL. So we need special casing..
-- os.strerror(). This does not return useful messages on my computer.
- So messages are hard-coded for now.
-
Todo list:
+- TODO: When libuv supports run_once direct timeout, remove timer..
+- TODO: When libuv supports ipv6, remove getaddrinfo hack..
+- TODO: setsockopt/getsockopt SO_REUSEADDR is a test-passing sham..
- TODO: Exceptions raised out of pyuv are custom ones that present custom
libuv error codes. We have a mapping of some of these to Windows
error codes below, but the exceptions need to be caught and
@@ -28,6 +18,7 @@
"""
import errno
+import random
import socket as stdsocket # We need the "socket" name for the function we
export.
import sys
import weakref
@@ -43,7 +34,7 @@
import stackless
__all__ = stdsocket.__all__
-def adopt_stdsocket_constants():
+def _adopt_stdsocket_constants():
for k, v in stdsocket.__dict__.iteritems():
if k in __all__ and k.upper() == k:
globals()[k] = v
@@ -51,18 +42,22 @@
globals()[k] = v
globals()["error"] = stdsocket.error
globals()["timeout"] = stdsocket.timeout
-adopt_stdsocket_constants()
+_adopt_stdsocket_constants()
_TCP_KEEPALIVE = 3
-def getsockopt_default(level, optname):
+def _getsockopt_default(level, optname):
s = stdsocket.socket()
return s.getsockopt(level, optname)
-DEFAULT_KEEPALIVE_FLAG = getsockopt_default(stdsocket.SOL_SOCKET,
stdsocket.SO_KEEPALIVE)
-DEFAULT_NODELAY_FLAG = getsockopt_default(stdsocket.IPPROTO_TCP,
stdsocket.TCP_NODELAY)
-DEFAULT_KEEPALIVE_DELAY = getsockopt_default(stdsocket.IPPROTO_TCP,
_TCP_KEEPALIVE)
+DEFAULT_NODELAY_FLAG = _getsockopt_default(stdsocket.IPPROTO_TCP,
stdsocket.TCP_NODELAY)
+DEFAULT_KEEPALIVE_FLAG = _getsockopt_default(stdsocket.SOL_SOCKET,
stdsocket.SO_KEEPALIVE)
+DEFAULT_KEEPALIVE_DELAY = _getsockopt_default(stdsocket.IPPROTO_TCP,
_TCP_KEEPALIVE)
+DEFAULT_REUSE_FLAG = _getsockopt_default(stdsocket.SOL_SOCKET,
stdsocket.SO_REUSEADDR)
+
+_fileobject = stdsocket._fileobject
# uv -> windows compatibility: Map UV errnos to Windows errnos.
-def convert_uv_errnos():
+# TODO: Evaluate whether this is actually useful (do not believe it is
currently used).
+def _convert_uv_errnos():
d = {}
for k, v in pyuv.errno.__dict__.iteritems():
if k.startswith("UV_E"):
@@ -73,9 +68,44 @@
# Unix, or special cases. Ignore for now.
pass
return d
-_errno_map = convert_uv_errnos()
+_errno_map = _convert_uv_errnos()
_EWOULDBLOCK_text = "The socket operation could not complete without
blocking"
+
+def socket(*args, **kwargs):
+ import sys
+ if "socket" in sys.modules and sys.modules["socket"] is not stdsocket:
+ raise RuntimeError("Use 'socket_pyuv.install' instead of replacing
the 'socket' module")
+ raise RuntimeError("'socket_pyuv.socket' erroneously called")
+
+# Store the original socket module objects, so we can unclobber what we
+# monkey-patched if we need to.
+_realsocket_old = stdsocket._realsocket
+_getaddrinfo_old = stdsocket.getaddrinfo
+#_socketobject_old = stdsocket._socketobject
+
+# libuv workaround: no IPv6 support. Ignore IPv6 addresses until there is.
+def _getaddrinfo(*args):
+ for ret in _getaddrinfo_old(*args):
+ if len(ret[-1]) == 2:
+ yield ret
+
+def install(poll_interval=None):
+ """ Call this to add this module's monkey-patching to the standard
library. """
+ global _poll_interval
+ if stdsocket._realsocket is socket:
+ raise StandardError("Still installed")
+ stdsocket._realsocket = _fakesocket
+ stdsocket.getaddrinfo = _getaddrinfo
+ if poll_interval is not None:
+ _poll_interval = poll_interval
+
+def uninstall():
+ """ Call this to remove this module's monkey-patching of the standard
library. """
+ stdsocket._realsocket = _realsocket_old
+ stdsocket.getaddrinfo = _getaddrinfo_old
+
+
_poll_interval = 0.05
_pumping = False
_pyuv_loop = pyuv.Loop.default_loop()
@@ -117,7 +147,9 @@
return _sleep_func is not None or _timeout_func is not None
-class socket(object):
+_next_fileno = 10101000
+
+class _fakesocket(object):
# Optionally overridden variables.
_accept_channel = None
_blocking = True
@@ -126,10 +158,12 @@
_opt_keepalive = DEFAULT_KEEPALIVE_FLAG
_opt_keepalive_delay = DEFAULT_KEEPALIVE_DELAY
_opt_nodelay = DEFAULT_NODELAY_FLAG
+ _opt_reuseaddr = DEFAULT_REUSE_FLAG
_timeout = None
_was_connected = False
# Official socket object functions.
def __init__(self, family=AF_INET, type=SOCK_STREAM, proto=0):
+ global _next_fileno
global _pyuv_loop
global _socket_map
# Make underlying pyuv "socket" object.
@@ -144,6 +178,8 @@
self._type = type
self._proto = proto
# Internal support.
+ self._fileno = _next_fileno
+ _next_fileno += 1
if can_timeout():
self._timeout = stdsocket.getdefaulttimeout()
_socket_map[id(self)] = self
@@ -157,7 +193,7 @@
self._accept_channel.preference = 1
try:
# First actually try and do an accept.
- _new_tcp_socket = socket()
+ _new_tcp_socket = _fakesocket()
try:
self._tcp_socket.accept(_new_tcp_socket._tcp_socket)
return accept_result(_new_tcp_socket)
@@ -177,7 +213,15 @@
finally:
self._accept_channel = None
def bind(self, address): # TCP / UDP
- self._socket.bind(address)
+ # TODO: Should do some generic lookup?
+ address = self._resolve_address(address)
+ # pyuv workaround: the standard library socket function raises
OverflowError, not ValueError.
+ try:
+ self._socket.bind(address)
+ except ValueError, e:
+ if e.message == "port must be between 0 and 65536":
+ raise OverflowError("getsockaddrarg: port must be
0-65535.")
+ raise e
def close(self): # TCP / UDP
"""
Blocks until the close has completed.. correct behaviour?
@@ -189,6 +233,7 @@
self._socket.close(close_callback)
channel.receive()
def connect(self, address): # TCP
+ address = self._resolve_address(address)
err = self.connect_ex(address)
if err:
raise stdsocket.error(err)
@@ -205,22 +250,24 @@
self._connected = True
self._was_connected = True
return err
- def fileno(self): # ?? HOW?
- raise NotImplementedError("socket.fileno")
+ def fileno(self):
+ return self._fileno
def getpeername(self): # TCP
return self._tcp_socket.getpeername()
def getsockname(self): # TCP
- return self._tcp_socket.getsockname()
+ return self._socket.getsockname()
def ioctl(self, control, option): # ?? HOW?
raise NotImplementedError("socket.ioctl")
def listen(self, backlog): # TCP
+ if backlog < 1:
+ raise RuntimeError("Not supported by libuv at this time")
def listen_callback(_listen_tcp_socket, err):
if self._accept_channel and self._accept_channel.balance < 0:
if err is None:
self._accept_channel.send(None)
else:
# TODO: Really should be able to pass multiple
arguments to the exception type..
- self._accept_channel.send_exception(stdsocket.error,
errno_map[err])
+ self._accept_channel.send_exception(stdsocket.error,
_errno_map[err])
self._tcp_socket.listen(listen_callback, backlog)
self._listening = True
def makefile(self, mode, bufsize): # ?? HOW?
@@ -247,7 +294,7 @@
if err is None:
channel.send(data)
else:
- channel.send_exception(stdsocket.error,
errno_map[err])
+ channel.send_exception(stdsocket.error,
_errno_map[err])
self._tcp_socket.start_read(tcp_callback)
else:
raise NotImplementedError("socket.recvfrom/UDP")
@@ -257,9 +304,9 @@
TODO: Deal with the 'bufsize' constraint. Currently data returned
can be larger than this size.
"""
- channel = stackless.channel()
- channel.preference = 1
if self.type == SOCK_DGRAM:
+ channel = stackless.channel()
+ channel.preference = 1
def udp_callback(redundant_handle, address, data, err):
self._udp_socket.stop_recv()
if channel.balance < 0:
@@ -269,11 +316,11 @@
if err is None:
channel.send((data, address))
else:
- channel.send_exception(stdsocket.error,
errno_map[err])
+ channel.send_exception(stdsocket.error,
_errno_map[err])
self._udp_socket.start_recv(udp_callback)
+ return self._receive_with_timeout(channel)
else:
- raise NotImplementedError("socket.recvfrom/TCP")
- return self._receive_with_timeout(channel)
+ return self.recv(bufsize, flags), self.getpeername()
def recvfrom_into(self, buffer, nbytes, flags=0):
raise NotImplementedError("socket.recvfrom_into")
def recv_into(self, buffer, nbytes, flags=0):
@@ -287,14 +334,49 @@
channel.send(None)
else:
# TODO: Really should be able to pass multiple
arguments to the exception type..
- channel.send_exception(stdsocket.error, errno_map[err])
+ channel.send_exception(stdsocket.error,
_errno_map[err])
self._socket.write(string, write_callback)
self._receive_with_timeout(channel)
return len(string)
def sendall(self, string, flags=0):
return self.send(string, flags)
- def sendto(self, string, flags=0, address=None):
- raise NotImplementedError("socket.sendto")
+ def sendto(self, string, *args):
+ if type(string) is unicode:
+ # TODO: Either..
+ # a) Actually do the conversion.
+ # b) Move conversion down into pyuv.
+ raise UnicodeEncodeError("ascii", string, 0, 1, "ouch")
+ elif type(string) is complex:
+ raise TypeError("must be string or buffer, not complex")
+
+ # Handle weird Python stdlib argument ordering.
+ if len(args) == 2:
+ flags, address = args
+ elif len(args) == 1:
+ flags = 0
+ address = args[0]
+ else:
+ raise TypeError("sendto() takes 2 or 3 arguments (%d given)" %
(1 + len(args)))
+
+ if type(flags) is not int:
+ raise TypeError("an integer is required")
+ if address is None:
+ raise TypeError("getsockaddrarg: AF_INET address must be
tuple, not NoneType")
+
+ address = self._resolve_address(address)
+
+ channel = stackless.channel()
+ channel.preference = 1
+ def send_callback(redundant_tcp_handle, err):
+ if channel.balance < 0:
+ if err is None:
+ channel.send(None)
+ else:
+ # TODO: Really should be able to pass multiple
arguments to the exception type..
+ channel.send_exception(stdsocket.error,
_errno_map[err])
+ self._udp_socket.send(address, string, send_callback)
+ self._receive_with_timeout(channel)
+ return len(string)
def setblocking(self, flag):
self._blocking = flag
def settimeout(self, value):
@@ -302,6 +384,8 @@
raise RuntimeError("This is a stackless socket - to have
timeout support you need to provide a sleep function")
self._timeout = value
def gettimeout(self):
+ if self._timeout is None:
+ return stdsocket.getdefaulttimeout()
return self._timeout
def getsockopt(self, level, optname, buflen=None):
if level == stdsocket.IPPROTO_TCP:
@@ -309,9 +393,11 @@
return self._opt_nodelay
elif optname == _TCP_KEEPALIVE:
return self._opt_keepalive_delay
- if level == socket.SOL_SOCKET:
- if optname == socket.SO_KEEPALIVE:
+ if level == SOL_SOCKET:
+ if optname == SO_KEEPALIVE:
return self._opt_keepalive
+ elif optname == SO_REUSEADDR:
+ return self._opt_reuseaddr
def setsockopt(self, level, optname, value):
if level == stdsocket.IPPROTO_TCP:
if optname == stdsocket.TCP_NODELAY:
@@ -324,6 +410,8 @@
if optname == stdsocket.SO_KEEPALIVE:
self._opt_keepalive = bool(value)
self._tcp_socket.keep_alive(self._opt_keepalive,
self._opt_keepalive_delay)
+ elif optname == SO_REUSEADDR:
+ self._opt_reuseaddr = bool(value)
def shutdown(self, how): # TCP
if how != stdsocket.SHUT_WR:
raise RuntimeError("Not supported")
@@ -369,3 +457,19 @@
_sleep_func(self._timeout)
if channel.balance < 0:
channel.send_exception(timeout, "timed out")
+ @classmethod
+ def _resolve_address(klass, address):
+ if address[0] == "":
+ address = ("0.0.0.0", address[1])
+ elif address[0] == "localhost":
+ address = ("127.0.0.1", address[1])
+ return address
+
+def test():
+ #sock = _fakesocket()
+ #reuse = sock.getsockopt(SOL_SOCKET, SO_REUSEADDR)
+ #print reuse
+ s = _fakesocket()
+
+if __name__ == "__main__":
+ test()