[stacklessexamples] r191 committed - - Copied support to make 'settimeout' work from the asyncore stackless...

18 views
Skip to first unread message

stackles...@googlecode.com

unread,
Mar 2, 2012, 8:06:58 PM3/2/12
to stackless-exa...@googlegroups.com
Revision: 191
Author: richard.m.tew
Date: Fri Mar 2 17:06:04 2012
Log: - Copied support to make 'settimeout' work from the asyncore
stacklesssocket.py.
- Fixed disconnection while reading within 'recv' to appear as receiving an
empty string, as in the normal socket module.
http://code.google.com/p/stacklessexamples/source/detail?r=191

Modified:
/trunk/libraries/stacklesslib/stacklesslib/replacements/socket_pyuv.py

=======================================
--- /trunk/libraries/stacklesslib/stacklesslib/replacements/socket_pyuv.py	Thu Mar 1 18:08:21 2012
+++ /trunk/libraries/stacklesslib/stacklesslib/replacements/socket_pyuv.py	Fri Mar 2 17:06:04 2012
@@ -45,10 +45,12 @@
__all__ = stdsocket.__all__
def adopt_stdsocket_constants():
for k, v in stdsocket.__dict__.iteritems():
- if k in __all__:
+ if k in __all__ and k.upper() == k:
globals()[k] = v
elif k == "EBADF":
globals()[k] = v
+ globals()["error"] = stdsocket.error
+ globals()["timeout"] = stdsocket.timeout
adopt_stdsocket_constants()

_TCP_KEEPALIVE = 3
@@ -142,11 +144,11 @@
self._type = type
self._proto = proto
# Internal support.
+ if can_timeout():
+ self._timeout = stdsocket.getdefaulttimeout()
_socket_map[id(self)] = self
start_pumping()
- def accept(self): # TCP # COMPLETE
- """
- """
+ def accept(self): # TCP
def accept_result(_new_tcp_socket):
_new_tcp_socket._was_connected = True
# Emulate the standard 'socket.accept' return value.
@@ -169,14 +171,14 @@
# Otherwise, assume all is well and block on a channel.
sys.exc_clear()
# Block until there is an incoming connection.
- self._accept_channel.receive()
+ self._receive_with_timeout(self._accept_channel)
self._tcp_socket.accept(_new_tcp_socket._tcp_socket)
return accept_result(_new_tcp_socket)
finally:
self._accept_channel = None
- def bind(self, address): # TCP / UDP # COMPLETE
+ def bind(self, address): # TCP / UDP
self._socket.bind(address)
- def close(self): # TCP / UDP # COMPLETE
+ def close(self): # TCP / UDP
"""
Blocks until the close has completed.. correct behaviour?
"""
@@ -186,33 +188,34 @@
channel.send(None)
self._socket.close(close_callback)
channel.receive()
- def connect(self, address): # TCP # COMPLETE
+ def connect(self, address): # TCP
err = self.connect_ex(address)
if err:
raise stdsocket.error(err)
- def connect_ex(self, address): # COMPLETE
+ def connect_ex(self, address):
channel = stackless.channel()
channel.preference = 1
def connect_callback(_tcp_handle, err):
- channel.send(err)
+ if channel.balance < 0:
+ channel.send(err)
self._tcp_socket.connect(address, connect_callback)
- err = channel.receive()
+ err = self._receive_with_timeout(channel)
if err is None:
err = 0
self._connected = True
self._was_connected = True
return err
- def fileno(self):
- pass
- def getpeername(self): # TCP # COMPLETE
+ def fileno(self): # ?? HOW?
+ raise NotImplementedError("socket.fileno")
+ def getpeername(self): # TCP
return self._tcp_socket.getpeername()
- def getsockname(self): # TCP # COMPLETE
+ def getsockname(self): # TCP
return self._tcp_socket.getsockname()
- def ioctl(self, control, option):
- pass
+ def ioctl(self, control, option): # ?? HOW?
+ raise NotImplementedError("socket.ioctl")
def listen(self, backlog): # TCP
def listen_callback(_listen_tcp_socket, err):
- if self._accept_channel:
+ if self._accept_channel and self._accept_channel.balance < 0:
if err is None:
self._accept_channel.send(None)
else:
@@ -220,9 +223,13 @@
self._accept_channel.send_exception(stdsocket.error,
errno_map[err])
self._tcp_socket.listen(listen_callback, backlog)
self._listening = True
- def makefile(self, mode, bufsize):
- pass
- def recv(self, bufsize, flags=0):
+ def makefile(self, mode, bufsize): # ?? HOW?
+ raise NotImplementedError("socket.makefile")
+ def recv(self, bufsize, flags=0): # TCP?
+ """
+ TODO: Deal with the 'bufsize' constraint. Currently data returned
+ can be larger than this size.
+ """
if self.type != SOCK_DGRAM and not self._connected:
# Sockets which have never been connected do this.
if not self._was_connected:
@@ -233,57 +240,70 @@
if self.type == SOCK_STREAM:
def tcp_callback(redundant_handle, data, err):
self._tcp_socket.stop_read()
- if err is None:
- channel.send(data)
- else:
- channel.send_exception(stdsocket.error, errno_map[err])
+ if channel.balance < 0:
+ if err == pyuv.errno.UV_EOF:
+ err = None
+ data = ""
+ if err is None:
+ channel.send(data)
+ else:
+ channel.send_exception(stdsocket.error, errno_map[err])
self._tcp_socket.start_read(tcp_callback)
else:
- raise RuntimeError("No UDP handling yet")
- return channel.receive()
- def recvfrom(self, bufsize, flags=0):
+ raise NotImplementedError("socket.recvfrom/UDP")
+ return self._receive_with_timeout(channel)
+ def recvfrom(self, bufsize, flags=0): # UDP?
+ """
+ TODO: Deal with the 'bufsize' constraint. Currently data returned
+ can be larger than this size.
+ """
channel = stackless.channel()
channel.preference = 1
if self.type == SOCK_DGRAM:
def udp_callback(redundant_handle, address, data, err):
self._udp_socket.stop_recv()
- if err is None:
- channel.send((data, address))
- else:
- channel.send_exception(stdsocket.error, errno_map[err])
+ if channel.balance < 0:
+ if err == pyuv.errno.UV_EOF:
+ err = None
+ data = ""
+ if err is None:
+ channel.send((data, address))
+ else:
+ channel.send_exception(stdsocket.error, errno_map[err])
self._udp_socket.start_recv(udp_callback)
else:
- raise RuntimeError("No TCP handling yet")
- return channel.receive()
+ raise NotImplementedError("socket.recvfrom/TCP")
+ return self._receive_with_timeout(channel)
def recvfrom_into(self, buffer, nbytes, flags=0):
- pass
+ raise NotImplementedError("socket.recvfrom_into")
def recv_into(self, buffer, nbytes, flags=0):
- pass
- def send(self, string, flags=0): # TCP / UDP # COMPLETE
+ raise NotImplementedError("socket.recv_into")
+ def send(self, string, flags=0): # TCP / UDP
channel = stackless.channel()
channel.preference = 1
def write_callback(redundant_tcp_handle, err):
- if err is None:
- channel.send(None)
- else:
- # TODO: Really should be able to pass multiple arguments to the exception type..
- channel.send_exception(stdsocket.error, errno_map[err])
+ if channel.balance < 0:
+ if err is None:
+ channel.send(None)
+ else:
+ # TODO: Really should be able to pass multiple arguments to the exception type..
+ channel.send_exception(stdsocket.error, errno_map[err])
self._socket.write(string, write_callback)
- channel.receive()
+ self._receive_with_timeout(channel)
return len(string)
- def sendall(self, string, flags=0): # COMPLETE
+ def sendall(self, string, flags=0):
return self.send(string, flags)
def sendto(self, string, flags=0, address=None):
- pass
- def setblocking(self, flag): # COMPLETE
+ raise NotImplementedError("socket.sendto")
+ def setblocking(self, flag):
self._blocking = flag
- def settimeout(self, value): # COMPLETE
+ def settimeout(self, value):
if value and not can_timeout():
raise RuntimeError("This is a stackless socket - to have timeout support you need to provide a sleep function")
self._timeout = value
- def gettimeout(self): # COMPLETE
+ def gettimeout(self):
return self._timeout
- def getsockopt(self, level, optname, buflen=None): # COMPLETE
+ def getsockopt(self, level, optname, buflen=None):
if level == stdsocket.IPPROTO_TCP:
if optname == stdsocket.TCP_NODELAY:
return self._opt_nodelay
@@ -292,7 +312,7 @@
if level == socket.SOL_SOCKET:
if optname == socket.SO_KEEPALIVE:
return self._opt_keepalive
- def setsockopt(self, level, optname, value): # COMPLETE
+ def setsockopt(self, level, optname, value):
if level == stdsocket.IPPROTO_TCP:
if optname == stdsocket.TCP_NODELAY:
self._opt_nodelay = bool(value)
@@ -304,7 +324,7 @@
if optname == stdsocket.SO_KEEPALIVE:
self._opt_keepalive = bool(value)
self._tcp_socket.keep_alive(self._opt_keepalive,
self._opt_keepalive_delay)
- def shutdown(self, how): # TCP # COMPLETE
+ def shutdown(self, how): # TCP
if how != stdsocket.SHUT_WR:
raise RuntimeError("Not supported")
channel = stackless.channel()
@@ -324,5 +344,28 @@
def proto(self):
return self._proto
# Custom internal logic.
- def _is_non_blocking(self):
- return not self._blocking or self._timeout == 0.0
+ def _receive_with_timeout(self, channel):
+ if self._timeout is not None:
+ # Start a timing out process.
+ # a) Engage a pre-existing external tasklet to send an exception on our channel if it has a receiver, if we are still there when it times out.
+ # b) Launch a tasklet that does a sleep, and sends an exception if we are still waiting, when it is awoken.
+ # Block waiting for a send.
+ if _timeout_func is not None:
+ # You will want to use this if you are using sockets in a different thread from your sleep functionality.
+ _timeout_func(self._timeout, channel, (timeout, "timed out"))
+ elif _sleep_func is not None:
+ stackless.tasklet(self._manage_receive_with_timeout)(channel)
+ else:
+ raise NotImplementedError("should not be here")
+ try:
+ ret = channel.receive()
+ except BaseException, e:
+ raise e
+ return ret
+ else:
+ return channel.receive()
+ def _manage_receive_with_timeout(self, channel):
+ if channel.balance < 0:
+ _sleep_func(self._timeout)
+ if channel.balance < 0:
+ channel.send_exception(timeout, "timed out")

Reply all
Reply to author
Forward
0 new messages