[stacklessexamples] r195 committed - Except for two as yet unsupported features, the standard library unit ...

3 views
Skip to first unread message

stackles...@googlecode.com

unread,
Mar 7, 2012, 8:44:29 PM3/7/12
to stackless-exa...@googlegroups.com
Revision: 195
Author: richard.m.tew
Date: Wed Mar 7 17:43:16 2012
Log: Except for two as yet unsupported features, the standard library
unit tests mostly pass (with the monkey-patching in place).

Still to be done:
- Implement a new version of select.
- Implement support for the passing of pre-existing buffers and read
amounts to read/recv calls.
http://code.google.com/p/stacklessexamples/source/detail?r=195

Modified:
/trunk/libraries/stacklesslib_pyuv/stacklesslib/monkeypatch.py
/trunk/libraries/stacklesslib_pyuv/stacklesslib/replacements/socket_pyuv.py
/trunk/libraries/stacklesslib_pyuv/stacklesslib/test/teststdlibunittests.py

=======================================
--- /trunk/libraries/stacklesslib_pyuv/stacklesslib/monkeypatch.py Thu Mar
1 15:31:05 2012
+++ /trunk/libraries/stacklesslib_pyuv/stacklesslib/monkeypatch.py Wed Mar
7 17:43:16 2012
@@ -11,6 +11,10 @@
import stacklessio
except ImportError:
stacklessio = False
+try:
+ import pyuv
+except ImportError:
+ pyuv = None

@@ -20,7 +24,7 @@
patch_thread()
patch_threading()

- patch_select()
+ #patch_select()
patch_socket()

if autonomous:
@@ -66,11 +70,15 @@
sys.modules["_socket"] = _socket
else:
# Fallback on the generic 'stacklesssocket' module.
- from stacklesslib.replacements import socket_asyncore
+ if pyuv:
+ from stacklesslib.replacements import socket_pyuv as socket
+ else:
+ from stacklesslib.replacements import socket_asyncore as socket
+
socket._sleep_func = main.sleep
socket._schedule_func = lambda: main.sleep(0)
# If the user plans to pump themselves, disable auto-pumping.
- if not autononous:
+ if not pyuv and not autononous:
socket._manage_sockets_func = lambda: None
socket.install()

=======================================
---
/trunk/libraries/stacklesslib_pyuv/stacklesslib/replacements/socket_pyuv.py
Tue Mar 6 18:38:29 2012
+++
/trunk/libraries/stacklesslib_pyuv/stacklesslib/replacements/socket_pyuv.py
Wed Mar 7 17:43:16 2012
@@ -127,7 +127,9 @@
while len(_socket_map):
# Ensure the timeout is from the start of our run call.
timer.again()
+ #print >>sys.xstdout, "_pyuv_loop.run_once.call"
_pyuv_loop.run_once()
+ #print >>sys.xstdout, "_pyuv_loop.run_once.called"
_schedule_func()
finally:
_pumping = False
@@ -151,7 +153,6 @@

class _fakesocket(object):
# Optionally overriden variables.
- _accept_channel = None
_blocking = True
_connected = False
_listening = False
@@ -159,8 +160,10 @@
_opt_keepalive_delay = DEFAULT_KEEPALIVE_DELAY
_opt_nodelay = DEFAULT_NODELAY_FLAG
_opt_reuseaddr = DEFAULT_REUSE_FLAG
+ _rchannel = None
_timeout = None
_was_connected = False
+ _wchannel = None
# Official socket object functions.
def __init__(self, family=AF_INET, type=SOCK_STREAM, proto=0):
global _next_fileno
@@ -189,29 +192,25 @@
_new_tcp_socket._was_connected = True
# Emulate the standard 'socket.accept' return value.
return _new_tcp_socket, _new_tcp_socket.getpeername()
- self._accept_channel = stackless.channel()
- self._accept_channel.preference = 1
+ channel = self._get_rchannel()
+ # First actually try and do an accept.
+ _new_tcp_socket = _fakesocket()
try:
- # First actually try and do an accept.
- _new_tcp_socket = _fakesocket()
- try:
- self._tcp_socket.accept(_new_tcp_socket._tcp_socket)
- return accept_result(_new_tcp_socket)
- except pyuv.error.TCPError:
- # If listen has not been called yet.
- if not self._listening:
- raise stdsocket.error(EINVAL)
- # If the socket is set to non-blocking.
- if not self._blocking:
- raise stdsocket.error(EWOULDBLOCK, _EWOULDBLOCK_text)
- # Otherwise, assume all is well and block on a channel.
- sys.exc_clear()
- # Block until there is an incoming connection.
- self._receive_with_timeout(self._accept_channel)
self._tcp_socket.accept(_new_tcp_socket._tcp_socket)
return accept_result(_new_tcp_socket)
- finally:
- self._accept_channel = None
+ except pyuv.error.TCPError:
+ # If listen has not been called yet.
+ if not self._listening:
+ raise stdsocket.error(EINVAL)
+ # If the socket is set to non-blocking.
+ if not self._blocking or self._timeout == 0.0:
+ raise stdsocket.error(EWOULDBLOCK, _EWOULDBLOCK_text)
+ # Otherwise, assume all is well and block on a channel.
+ sys.exc_clear()
+ # Block until there is an incoming connection.
+ self._receive_with_timeout(channel)
+ self._tcp_socket.accept(_new_tcp_socket._tcp_socket)
+ return accept_result(_new_tcp_socket)
def bind(self, address): # TCP / UDP
# TODO: Should do some generic lookup?
address = self._resolve_address(address)
@@ -236,13 +235,16 @@
address = self._resolve_address(address)
err = self.connect_ex(address)
if err:
- raise stdsocket.error(err)
+ raise stdsocket.error(err, "")
def connect_ex(self, address):
- channel = stackless.channel()
- channel.preference = 1
+ channel = self._get_wchannel()
def connect_callback(_tcp_handle, err):
if channel.balance < 0:
- channel.send(err)
+ if err is not None:
+ err = _errno_map[err]
+ channel.send(err)
+ else:
+ channel.send(err)
self._tcp_socket.connect(address, connect_callback)
err = self._receive_with_timeout(channel)
if err is None:
@@ -261,13 +263,14 @@
def listen(self, backlog): # TCP
if backlog < 1:
raise RuntimeError("Not supported by libuv at this time")
+ channel = self._get_rchannel()
def listen_callback(_listen_tcp_socket, err):
- if self._accept_channel and self._accept_channel.balance < 0:
+ while channel.balance < 0:
if err is None:
- self._accept_channel.send(None)
+ channel.send(None)
else:
# TODO: Really should be able to pass multiple
arguments to the exception type..
- self._accept_channel.send_exception(stdsocket.error,
_errno_map[err])
+ channel.send_exception(stdsocket.error,
_errno_map[err])
self._tcp_socket.listen(listen_callback, backlog)
self._listening = True
def makefile(self, mode, bufsize): # ?? HOW?
@@ -282,8 +285,10 @@
if not self._was_connected:
raise stdsocket.error(ENOTCONN, 'Socket is not connected')

- channel = stackless.channel()
- channel.preference = 1
+ channel = self._get_rchannel()
+ if (not self._blocking or self._timeout == 0.0) and
channel.balance < 1:
+ raise stdsocket.error(EWOULDBLOCK, _EWOULDBLOCK_text)
+
if self.type == SOCK_STREAM:
def tcp_callback(redundant_handle, data, err):
self._tcp_socket.stop_read()
@@ -297,7 +302,14 @@
channel.send_exception(stdsocket.error,
_errno_map[err])
self._tcp_socket.start_read(tcp_callback)
else:
- raise NotImplementedError("socket.recvfrom/UDP")
+ def udp_callback(redundant_handle, address, data, err):
+ self._udp_socket.stop_recv()
+ if channel.balance < 0:
+ if err is None:
+ channel.send(data)
+ else:
+ channel.send_exception(stdsocket.error,
_errno_map[err])
+ self._udp_socket.start_recv(udp_callback)
return self._receive_with_timeout(channel)
def recvfrom(self, bufsize, flags=0): # UDP?
"""
@@ -305,11 +317,15 @@
can be larger than this size.
"""
if self.type == SOCK_DGRAM:
- channel = stackless.channel()
- channel.preference = 1
+ if bufsize < 0:
+ raise ValueError("negative buffersize in recvfrom")
+ channel = self._get_rchannel()
+ if (not self._blocking or self._timeout == 0.0) and
channel.balance < 1:
+ raise stdsocket.error(EWOULDBLOCK, _EWOULDBLOCK_text)
+
def udp_callback(redundant_handle, address, data, err):
self._udp_socket.stop_recv()
- if channel.balance < 0:
+ while channel.balance < 0:
if err == pyuv.errno.UV_EOF:
err = None
data = ""
@@ -321,21 +337,22 @@
return self._receive_with_timeout(channel)
else:
return self.recv(bufsize, flags), self.getpeername()
- def recvfrom_into(self, buffer, nbytes, flags=0):
+ def recvfrom_into(self, buffer, nbytes=0, flags=0):
raise NotImplementedError("socket.recvfrom_into")
- def recv_into(self, buffer, nbytes, flags=0):
+ def recv_into(self, buffer, nbytes=0, flags=0):
raise NotImplementedError("socket.recv_into")
def send(self, string, flags=0): # TCP / UDP
- channel = stackless.channel()
- channel.preference = 1
- def write_callback(redundant_tcp_handle, err):
- if channel.balance < 0:
+ channel = self._get_wchannel()
+ if (not self._blocking or self._timeout == 0.0) and
channel.balance < 1:
+ raise stdsocket.error(EWOULDBLOCK, _EWOULDBLOCK_text)
+ def send_callback(redundant_tcp_handle, err):
+ while channel.balance < 0:
if err is None:
channel.send(None)
else:
# TODO: Really should be able to pass multiple
arguments to the exception type..
channel.send_exception(stdsocket.error,
_errno_map[err])
- self._socket.write(string, write_callback)
+ self._socket.write(string, send_callback)
self._receive_with_timeout(channel)
return len(string)
def sendall(self, string, flags=0):
@@ -365,8 +382,10 @@

address = self._resolve_address(address)

- channel = stackless.channel()
- channel.preference = 1
+ channel = self._get_wchannel()
+ if (not self._blocking or self._timeout == 0.0) and
channel.balance < 1:
+ raise stdsocket.error(EWOULDBLOCK, _EWOULDBLOCK_text)
+
def send_callback(redundant_tcp_handle, err):
if channel.balance < 0:
if err is None:
@@ -413,14 +432,13 @@
elif optname == SO_REUSEADDR:
self._opt_reuseaddr = bool(value)
def shutdown(self, how): # TCP
- if how != stdsocket.SHUT_WR:
- raise RuntimeError("Not supported")
- channel = stackless.channel()
- channel.preference = 1
- def shutdown_callback(tcp_handle):
- channel.send(None)
- self._tcp_socket.shutdown(shutdown_callback)
- channel.receive()
+ if how == stdsocket.SHUT_WR:
+ channel = stackless.channel()
+ channel.preference = 1
+ def shutdown_callback(tcp_handle):
+ channel.send(None)
+ self._tcp_socket.shutdown(shutdown_callback)
+ channel.receive()
# Official socket object read-only properties.
@property
def family(self):
@@ -464,6 +482,16 @@
elif address[0] == "localhost":
address = ("127.0.0.1", address[1])
return address
+ def _get_rchannel(self):
+ if self._rchannel is None:
+ self._rchannel = stackless.channel()
+ self._rchannel.preference = 1
+ return self._rchannel
+ def _get_wchannel(self):
+ if self._wchannel is None:
+ self._wchannel = stackless.channel()
+ self._wchannel.preference = 1
+ return self._wchannel

def test():
#sock = _fakesocket()
=======================================
---
/trunk/libraries/stacklesslib_pyuv/stacklesslib/test/teststdlibunittests.py
Mon Dec 19 00:10:28 2011
+++
/trunk/libraries/stacklesslib_pyuv/stacklesslib/test/teststdlibunittests.py
Wed Mar 7 17:43:16 2012
@@ -9,14 +9,27 @@

from __future__ import absolute_import

+import sys
+#sys.path.append(r"D:\VCS\SVN\stacklessexamples\libraries\stacklesslib_pyuv")
+#sys.xstdout = sys.stdout
+
+try:
+ import pyuv
+except:
+ pyuv = None
+
+asyncore = None
+if not pyuv:
+ import asyncore
+
# Ruin wonderful PEP-8 ordering with pre-emptive monkey-patch.
import stacklesslib.magic
-stacklesslib.magic.monkeypatch()
+stacklesslib.magic.patch_all(autonomous=False)


-import asyncore
import traceback
import sys
+import unittest

import stackless

@@ -28,9 +41,47 @@

def run_unittests():
from test import test_socket
+ test_socket.GeneralModuleTests.test_sock_ioctl = unittest.skip("ioctl
not supportable")(test_socket.GeneralModuleTests.test_sock_ioctl)
+ test_socket.GeneralModuleTests.testListenBacklog0 =
unittest.skip("libuv pending issue
resolution")(test_socket.GeneralModuleTests.testListenBacklog0)
+ test_socket.NonBlockingTCPTests.testAccept =
unittest.skip("select.select not yet
supported")(test_socket.NonBlockingTCPTests.testAccept)
+ test_socket.TCPCloserTest.testClose = unittest.skip("select.select not
yet supported")(test_socket.TCPCloserTest.testClose)
+ test_socket.FileObjectClassTestCase.testFullRead =
unittest.skip("write+memoryview not yet
supported")(test_socket.FileObjectClassTestCase.testFullRead)
+ test_socket.FileObjectClassTestCase.testReadline =
unittest.skip("write+memoryview not yet
supported")(test_socket.FileObjectClassTestCase.testReadline)
+ test_socket.FileObjectClassTestCase.testReadlineAfterRead =
unittest.skip("write+memoryview not yet
supported")(test_socket.FileObjectClassTestCase.testReadlineAfterRead)
+ test_socket.FileObjectClassTestCase.testReadlineAfterReadNoNewline =
unittest.skip("write+memoryview not yet
supported")(test_socket.FileObjectClassTestCase.testReadlineAfterReadNoNewline)
+ test_socket.FileObjectClassTestCase.testSmallRead =
unittest.skip("write+memoryview not yet
supported")(test_socket.FileObjectClassTestCase.testSmallRead)
+ test_socket.FileObjectClassTestCase.testUnbufferedRead =
unittest.skip("write+memoryview not yet
supported")(test_socket.FileObjectClassTestCase.testUnbufferedRead)
+ test_socket.UnbufferedFileObjectClassTestCase.testUnbufferedReadline =
unittest.skip("write+memoryview not yet
supported")(test_socket.UnbufferedFileObjectClassTestCase.testUnbufferedReadline)
+ test_socket.NonBlockingTCPTests.testRecv =
unittest.skip("select.select not yet
supported")(test_socket.NonBlockingTCPTests.testRecv)
+ test_socket.BufferIOTest.testRecvFromIntoArray =
unittest.skip("recvfrom_into not yet
supported")(test_socket.BufferIOTest.testRecvFromIntoArray)
+ test_socket.BufferIOTest.testRecvFromIntoBytearray =
unittest.skip("recvfrom_into not yet
supported")(test_socket.BufferIOTest.testRecvFromIntoBytearray)
+ test_socket.BufferIOTest.testRecvFromIntoMemoryview =
unittest.skip("recvfrom_into not yet
supported")(test_socket.BufferIOTest.testRecvFromIntoMemoryview)
+ test_socket.BufferIOTest.testRecvIntoArray =
unittest.skip("recvfrom_into not yet
supported")(test_socket.BufferIOTest.testRecvIntoArray)
+ test_socket.BufferIOTest.testRecvIntoBytearray =
unittest.skip("recvfrom_into not yet
supported")(test_socket.BufferIOTest.testRecvIntoBytearray)
+ test_socket.BufferIOTest.testRecvIntoMemoryview =
unittest.skip("recvfrom_into not yet
supported")(test_socket.BufferIOTest.testRecvIntoMemoryview)
from test import test_urllib
from test import test_urllib2
from test import test_xmlrpc
+ test_xmlrpc.SimpleServerTestCase.test_dotted_attribute =
unittest.skip("select.select not yet
supported")(test_xmlrpc.SimpleServerTestCase.test_dotted_attribute)
+ test_xmlrpc.SimpleServerTestCase.test_introspection1 =
unittest.skip("select.select not yet
supported")(test_xmlrpc.SimpleServerTestCase.test_introspection1)
+ test_xmlrpc.SimpleServerTestCase.test_introspection2 =
unittest.skip("select.select not yet
supported")(test_xmlrpc.SimpleServerTestCase.test_introspection2)
+ test_xmlrpc.SimpleServerTestCase.test_introspection3 =
unittest.skip("select.select not yet
supported")(test_xmlrpc.SimpleServerTestCase.test_introspection3)
+ test_xmlrpc.SimpleServerTestCase.test_introspection4 =
unittest.skip("select.select not yet
supported")(test_xmlrpc.SimpleServerTestCase.test_introspection4)
+ test_xmlrpc.SimpleServerTestCase.test_multicall =
unittest.skip("select.select not yet
supported")(test_xmlrpc.SimpleServerTestCase.test_multicall)
+ test_xmlrpc.SimpleServerTestCase.test_non_existing_multicall =
unittest.skip("select.select not yet
supported")(test_xmlrpc.SimpleServerTestCase.test_non_existing_multicall)
+ test_xmlrpc.SimpleServerTestCase.test_nonascii =
unittest.skip("select.select not yet
supported")(test_xmlrpc.SimpleServerTestCase.test_nonascii)
+ test_xmlrpc.SimpleServerTestCase.test_simple1 =
unittest.skip("select.select not yet
supported")(test_xmlrpc.SimpleServerTestCase.test_simple1)
+ test_xmlrpc.KeepaliveServerTestCase1.test_two =
unittest.skip("select.select not yet
supported")(test_xmlrpc.KeepaliveServerTestCase1.test_two)
+ test_xmlrpc.KeepaliveServerTestCase2.test_close =
unittest.skip("select.select not yet
supported")(test_xmlrpc.KeepaliveServerTestCase2.test_close)
+ test_xmlrpc.KeepaliveServerTestCase2.test_transport =
unittest.skip("select.select not yet
supported")(test_xmlrpc.KeepaliveServerTestCase2.test_transport)
+ test_xmlrpc.GzipServerTestCase.test_bad_gzip_request =
unittest.skip("select.select not yet
supported")(test_xmlrpc.GzipServerTestCase.test_bad_gzip_request)
+ test_xmlrpc.GzipServerTestCase.test_gsip_response =
unittest.skip("select.select not yet
supported")(test_xmlrpc.GzipServerTestCase.test_gsip_response)
+ test_xmlrpc.GzipServerTestCase.test_gzip_request =
unittest.skip("select.select not yet
supported")(test_xmlrpc.GzipServerTestCase.test_gzip_request)
+ test_xmlrpc.MultiPathServerTestCase.test_path1 =
unittest.skip("select.select not yet
supported")(test_xmlrpc.MultiPathServerTestCase.test_path1)
+ test_xmlrpc.MultiPathServerTestCase.test_path2 =
unittest.skip("select.select not yet
supported")(test_xmlrpc.MultiPathServerTestCase.test_path2)
+ test_xmlrpc.FailingServerTestCase.test_basic =
unittest.skip("select.select not yet
supported")(test_xmlrpc.FailingServerTestCase.test_basic)
+ test_xmlrpc.FailingServerTestCase.test_fail_no_info =
unittest.skip("select.select not yet
supported")(test_xmlrpc.FailingServerTestCase.test_fail_no_info)
+ test_xmlrpc.FailingServerTestCase.test_fail_with_info =
unittest.skip("select.select not yet
supported")(test_xmlrpc.FailingServerTestCase.test_fail_with_info)

print "** run_unittests.test_socket"
test_socket.test_main()
@@ -60,13 +111,13 @@

try:
stacklesslib.main.mainloop.pump()
- asyncore.poll(0.05)
+ if asyncore is not None:
+ asyncore.poll(0.05)
except Exception, e:
- import asyncore
if isinstance(e, ReferenceError):
- print "run:EXCEPTION", str(e), asyncore.socket_map
+ print "run:EXCEPTION", str(e), asyncore.socket_map if
asyncore is not None else None
else:
- print "run:EXCEPTION", asyncore.socket_map
+ print "run:EXCEPTION", asyncore.socket_map if asyncore is
not None else None
traceback.print_exc()
sys.exc_clear()

Reply all
Reply to author
Forward
0 new messages