Cancellation of asyncio.wait()

Victor Stinner

Jan 29, 2015, 4:06:21 AM
to python-tulip
Hi,

While trying to write an example that cancels create_connection(), I saw
that asyncio.wait() doesn't cancel the tasks it waits on when it is itself
cancelled.

In the following example, the result of fut is never used, so asyncio
emits a warning (exception never retrieved):
---
import asyncio

def func():
    fut = asyncio.Future()
    fs = [fut]
    fut2 = loop.create_task(asyncio.wait(fs))
    fut2.cancel()
    fut.set_exception(ValueError("never catched"))
    try:
        yield from fut2
    except asyncio.CancelledError:
        pass
    # nobody cares of fut result?

loop = asyncio.get_event_loop()
loop.run_until_complete(func())
loop.close()
---

Is it correct that wait() doesn't cancel waited tasks?
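
The same observation can be sketched in the newer async/await syntax, which is an assumption on my side since the example above uses `yield from`. The name `wait_cancel_demo` and the one-hour sleep are only illustrative. The sketch cancels a task wrapping asyncio.wait() and then checks whether the waited task is still pending:

```python
import asyncio


async def wait_cancel_demo():
    # Hypothetical child task that would run for a long time if left alone.
    child = asyncio.ensure_future(asyncio.sleep(3600))
    # Wrap wait() in its own task so that it can be cancelled on its own.
    waiter = asyncio.ensure_future(asyncio.wait([child]))
    await asyncio.sleep(0)  # let the waiter start waiting on the child
    waiter.cancel()
    try:
        await waiter
    except asyncio.CancelledError:
        pass
    # Record whether the child survived the cancellation of wait().
    still_pending = not child.done()
    # Clean up explicitly so the demo does not leave a pending task behind.
    child.cancel()
    try:
        await child
    except asyncio.CancelledError:
        pass
    return waiter.cancelled(), still_pending


waiter_cancelled, child_still_pending = asyncio.run(wait_cancel_demo())
```

If the behaviour described in this message holds, both values come back true: the wait() task is cancelled while the child keeps running until something else cancels it.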

wait_for() was modified recently to cancel the waited task when
wait_for() is cancelled:
http://bugs.python.org/issue23219

asyncio.gather() does cancel tasks when it is cancelled:
https://docs.python.org/dev/library/asyncio-task.html#asyncio.gather
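
For comparison, here is a rough sketch of how the wait_for() and gather() behaviour can be checked, again assuming the async/await syntax. `child_cancelled_after` and `compare` are hypothetical helper names, and the timeout value is arbitrary:

```python
import asyncio


async def child_cancelled_after(make_wrapper):
    # Hypothetical helper: cancel the wrapper and report whether the
    # wrapped child task ended up cancelled as well.
    child = asyncio.ensure_future(asyncio.sleep(3600))
    wrapper = asyncio.ensure_future(make_wrapper(child))
    await asyncio.sleep(0)  # let the wrapper start waiting on the child
    wrapper.cancel()
    try:
        await wrapper
    except asyncio.CancelledError:
        pass
    return child.cancelled()


async def compare():
    return {
        "wait_for": await child_cancelled_after(
            lambda c: asyncio.wait_for(c, timeout=60)),
        "gather": await child_cancelled_after(
            lambda c: asyncio.gather(c)),
    }


results = asyncio.run(compare())
```

With both wrappers the child should report `cancelled()` as true, which is the contrast with wait() that this message is about.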

If wait() must not cancel tasks, it should be better explained in the
documentation and asyncio code should be audited to ensure that tasks
are explicitly cancelled.

For example, replace:

yield from tasks.wait(fs, loop=self)

with:

try:
    yield from tasks.wait(fs, loop=self)
except CancelledError:
    for fut in fs:
        fut.cancel()

I only found one method calling wait(): create_connection().
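
Outside of asyncio itself, the same pattern could be wrapped in a small helper. This is only a sketch: `wait_cancelling` is a hypothetical name, not an asyncio API, it uses async/await rather than `yield from`, and unlike the snippet above it re-raises CancelledError so that the caller still sees the cancellation (my assumption about what most callers want):

```python
import asyncio


async def wait_cancelling(fs, **kwargs):
    # Hypothetical helper, not part of asyncio: behaves like asyncio.wait(),
    # but cancels the waited futures if the wait itself is cancelled.
    fs = list(fs)
    try:
        return await asyncio.wait(fs, **kwargs)
    except asyncio.CancelledError:
        for fut in fs:
            fut.cancel()
        raise  # propagate the cancellation to the caller


async def helper_demo():
    child = asyncio.ensure_future(asyncio.sleep(3600))
    waiter = asyncio.ensure_future(wait_cancelling([child]))
    await asyncio.sleep(0)  # let the waiter start waiting on the child
    waiter.cancel()
    try:
        await waiter
    except asyncio.CancelledError:
        pass
    await asyncio.sleep(0)  # give the child a chance to process its cancel
    return child.cancelled()


child_was_cancelled = asyncio.run(helper_demo())
```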

Victor

Victor Stinner

Apr 3, 2015, 11:17:08 AM
to python-tulip
Hum, I didn't get any reply to this message :-( I opened an issue to
at least document the behaviour:
https://bugs.python.org/issue23859

By the way, while discussing with Natim on IRC, we found a bug in the
websockets project, which uses wait() on a queue.get() task. If the
wait() is cancelled, you can lose an item from the queue if the get
task is not cancelled. See my fix:

diff --git a/websockets/protocol.py b/websockets/protocol.py
index 7e4a94e..5353b74 100644
--- a/websockets/protocol.py
+++ b/websockets/protocol.py
@@ -166,9 +166,13 @@ class WebSocketCommonProtocol(asyncio.StreamReaderProtocol):

         # Wait for a message until the connection is closed
         next_message = asyncio.async(self.messages.get(), loop=self._loop)
-        done, pending = yield from asyncio.wait(
-            [next_message, self.worker],
-            loop=self._loop, return_when=asyncio.FIRST_COMPLETED)
+        try:
+            done, pending = yield from asyncio.wait(
+                [next_message, self.worker],
+                loop=self._loop, return_when=asyncio.FIRST_COMPLETED)
+        except:
+            next_message.cancel()
+            raise
         if next_message in done:
             return next_message.result()
         else:

The full recv() method:
https://github.com/aaugustin/websockets/blob/7d8191699a6d647c1b45e3e11681c5987437e5b5/websockets/protocol.py#L149
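
To make the lost item concrete, here is a minimal standalone sketch. It does not use the websockets code: `lost_item_demo` and its `cancel_get_task` flag are hypothetical, and it uses async/await. It cancels a wait() on a queue.get() task, puts one item in the queue, and reports whether the item is still in the queue afterwards:

```python
import asyncio


async def lost_item_demo(cancel_get_task):
    # Hypothetical reproduction, independent of the websockets code.
    queue = asyncio.Queue()
    get_task = asyncio.ensure_future(queue.get())
    waiter = asyncio.ensure_future(asyncio.wait([get_task]))
    await asyncio.sleep(0)  # let both tasks start
    waiter.cancel()
    try:
        await waiter
    except asyncio.CancelledError:
        if cancel_get_task:
            # Same idea as the fix above: do not leave the getter behind.
            get_task.cancel()
    queue.put_nowait("message")
    # Give a surviving get_task the chance to consume the item.
    await asyncio.sleep(0)
    await asyncio.sleep(0)
    return not queue.empty()


item_kept_without_cancel = asyncio.run(lost_item_demo(False))
item_kept_with_cancel = asyncio.run(lost_item_demo(True))
```

Without the cancel, the orphaned get task takes the item and nobody reads its result, so the queue ends up empty. With the cancel, the item stays in the queue for the next reader.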

Victor

Rémy Hubscher

Apr 8, 2015, 6:15:48 AM
to python...@googlegroups.com, victor....@gmail.com
I have seen a similar behavior with aioredis:

See my message here: https://groups.google.com/forum/#!topic/python-tulip/_J6BedIpu5I