Aiohttp websocket close behaviour

1,647 views
Skip to first unread message

samuel.osc...@gmail.com

unread,
May 11, 2018, 8:31:26 AM5/11/18
to aio-libs
I'm building simple websocket applications to try to get a good understanding of how websockets work in aiohttp. However, I have come across some non-intuitive behaviour relating to the closing of websockets. Does anyone have any pointers?

My first attempt, was just a simple non-interactive websocket server, and a client that listens.

# server.py

# Just sends "hello_1", "hello_2" etc.to the client

from aiohttp import web

import asyncio
import itertools


async def websocket_handler(request):

    ws = web.WebSocketResponse()
    await ws.prepare(request)

    for i in itertools.count():
        if ws.closed:
            break
        await asyncio.sleep(1)
        message = "hello_" + str(i)
        print(f"sending: {message}")
        await ws.send_str(message)
        print(f"sent: {message}")

    print('websocket connection closed')

    return ws

app = web.Application()
app.router.add_get('/ws', websocket_handler)
web.run_app(app, host="127.0.0.1", port=8080)

# wsclient.py

# Just prints out the messages, and attempts to close the connection when it receives a message with content "hello_5"

import asyncio
import aiohttp


async def main():
    session = aiohttp.ClientSession()
    async with session.ws_connect('http://localhost:8080/ws') as ws:

        async for msg in ws:
            if msg.type == aiohttp.WSMsgType.TEXT:
                if msg.data == 'hello_5':
                    print("closing")
                    await ws.close()
                    break
                else:
                    print(msg.data)
            elif msg.type == aiohttp.WSMsgType.CLOSED:
                break
            elif msg.type == aiohttp.WSMsgType.ERROR:
                break

loop = asyncio.get_event_loop()
# Blocking call which returns when the hello_world() coroutine is done
loop.run_until_complete(main())
loop.close()



However, when running the server and then the client, I get the following:

$ python server.py 
======== Running on http://127.0.0.1:8080 ========
(Press CTRL+C to quit)
sending: hello_0
sent: hello_0
sending: hello_1
sent: hello_1
sending: hello_2
sent: hello_2
sending: hello_3
sent: hello_3
sending: hello_4
sent: hello_4
sending: hello_5
sent: hello_5
sending: hello_6
sent: hello_6
sending: hello_7
sent: hello_7


(The server output carries on forever in this sequence)

$ python wsclient.py 
hello_0
hello_1
hello_2
hello_3
hello_4
closing

(The client then hangs forever, until the server is killed, at which point you have:

$ python wsclient.py 
hello_0
hello_1
hello_2
hello_3
hello_4
closing
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f54cf840e80>

This is very unexpected behaviour. I would have expected the server to raise an error on attempting to send a message down a closed websocket connection. I would have expected ws.closed to also be True at that point. So one way or another, I would have expected the loop to be broken out of.

Can anyone point me in the right direction here?

Andrew Svetlov

unread,
May 11, 2018, 8:37:11 AM5/11/18
to samuel.osc...@gmail.com, aio-libs
The server should take a chance to read and process WS closing frame.
Usually, it is done automatically by `ws.receive()` or `async for msg in ws: ...` but you never read a message from a peer, this is a problem.

In theory `await ws.send_()` can try to peek closing message but
1) it is not implemented
2) implementation is complex, I see too many edge cases

--
You received this message because you are subscribed to the Google Groups "aio-libs" group.
To unsubscribe from this group and stop receiving emails from it, send an email to aio-libs+u...@googlegroups.com.
To post to this group, send email to aio-...@googlegroups.com.
Visit this group at https://groups.google.com/group/aio-libs.
To view this discussion on the web visit https://groups.google.com/d/msgid/aio-libs/0e126288-3433-435c-ac6a-a3a479b52649%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
--
Thanks,
Andrew Svetlov

samuel.osc...@gmail.com

unread,
May 11, 2018, 9:05:18 AM5/11/18
to aio-libs
Thanks for the reply.

I tried the following, to make sure incoming messages are processed:

# server.py

import aiohttp
from aiohttp import web

import asyncio
import itertools


async def websocket_handler(request):

    ws = web.WebSocketResponse()
    await ws.prepare(request)

    incoming = asyncio.ensure_future(process_incoming_messages(ws))

    for i in itertools.count():
        if ws.closed:
            break
        await asyncio.sleep(1)
        message = "hello_" + str(i)
        print(f"sending: {message}")
        await ws.send_str(message)
        print(f"sent: {message}")

    print('websocket connection closed')

    await incoming

    return ws


async def process_incoming_messages(ws):
    async for msg in ws:
        pass

app = web.Application()
app.router.add_get('/ws', websocket_handler)
web.run_app(app, host="127.0.0.1", port=8080)


However, the behavior is still a little strange.

The server does stop counting at 5, but it never seems to reach the print('websocket connection closed') , although no exception is logged either.

$ python server.py 
======== Running on http://127.0.0.1:8080 ========
(Press CTRL+C to quit)
sending: hello_0
sent: hello_0
sending: hello_1
sent: hello_1
sending: hello_2
sent: hello_2
sending: hello_3
sent: hello_3
sending: hello_4
sent: hello_4
sending: hello_5
sent: hello_5

The client displays the following error:

$ python wsclient.py                                                                                                                         
hello_0
hello_1
hello_2
hello_3
hello_4
closing
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f47eb443e80>

samuel.osc...@gmail.com

unread,
May 11, 2018, 9:12:09 AM5/11/18
to aio-libs
Adding the following reveals that an exception is occurring:

# server.py

import aiohttp
from aiohttp import web

import asyncio
import itertools


async def websocket_handler(request):

    ws = web.WebSocketResponse()
    await ws.prepare(request)

    incoming = asyncio.ensure_future(process_incoming_messages(ws))

    try:
        for i in itertools.count():
            if ws.closed:
                break
            await asyncio.sleep(1)
            message = "hello_" + str(i)
            print(f"sending: {message}")
            await ws.send_str(message)
            print(f"sent: {message}")
    except Exception as e:
        print(e.__class__)

    print('websocket connection closed')

    await incoming

    return ws


async def process_incoming_messages(ws):
    async for msg in ws:
        pass

app = web.Application()
app.router.add_get('/ws', websocket_handler)
web.run_app(app, host="127.0.0.1", port=8080)

$ python server.py 
======== Running on http://127.0.0.1:8080 ========
(Press CTRL+C to quit)
sending: hello_0
sent: hello_0
sending: hello_1
sent: hello_1
sending: hello_2
sent: hello_2
sending: hello_3
sent: hello_3
sending: hello_4
sent: hello_4
sending: hello_5
sent: hello_5
<class 'concurrent.futures._base.CancelledError'>
websocket connection closed


Is this the expected behavior?

The client still errors though:

$ python wsclient.py 
hello_0
hello_1
hello_2
hello_3
hello_4
closing
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f8960aa8e80>



Gustavo Carneiro

unread,
May 11, 2018, 9:33:08 AM5/11/18
to samuel.osc...@gmail.com, aio-libs
Yes, it is normal that aiohttp cancels the task that calls websocket_handler as soon as the client prematurely disconnects.

Cancelling the task that waits websocket_handler causes a asyncio.CancelledError exception to be raised in any of the await constructs: could be either await asyncio.sleep(1)
 or await ws.send_str(message).  This exception is not something wrong with your code, merely a signal that asyncio uses to communicate to a task that it is about to be cancelled.  Unless you have a good reason to do otherwise, you shouldn't catch this exception.

In my code I normally use this pattern, seems to work well in my limited testing (but the code is not yet in production so, caveat emptor):

async def websocket_handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    try:
        async for msg in ws:
              ...do stuff with incoming messages
    finally:
        await asyncio.shield(cleanup_code())

Where cleanup_code() is whatever code you need to run after the websocket disconnects.  I have found (by trial and error) that the task was being cancelled sometimes while cleanup_code() was still yet to complete, and so the asyncio.shield() was needed.


--
You received this message because you are subscribed to the Google Groups "aio-libs" group.
To unsubscribe from this group and stop receiving emails from it, send an email to aio-libs+u...@googlegroups.com.
To post to this group, send email to aio-...@googlegroups.com.
Visit this group at https://groups.google.com/group/aio-libs.

For more options, visit https://groups.google.com/d/optout.


--
Gustavo J. A. M. Carneiro
Gambit Research
"The universe is always one step beyond logic." -- Frank Herbert

samuel.osc...@gmail.com

unread,
May 11, 2018, 9:51:11 AM5/11/18
to aio-libs
Thanks Gustavo. That's really helpful.

I have adapted to use your pattern, but I'm still getting an error from the client:

$ python wsclient.py 
hello_0
hello_1
hello_2
hello_3
hello_4
closing
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f2f49e22e10>

Gustavo Carneiro

unread,
May 11, 2018, 10:08:40 AM5/11/18
to samuel.osc...@gmail.com, aio-libs
Ah, but that's the client, not the server.  Anyway, obviously, you forgot to close the aiohttp session.  Try the context manager, as stated in the fine manual:

async def main():
    async with aiohttp.ClientSession() as session:
         ....client code...



For more options, visit https://groups.google.com/d/optout.

samuel.osc...@gmail.com

unread,
May 11, 2018, 10:14:20 AM5/11/18
to aio-libs
Ah, the documentation websocket client example is inaccurate then: https://aiohttp.readthedocs.io/en/stable/client_quickstart.html#websockets

session = aiohttp.ClientSession()
async with session.ws_connect('http://example.org/ws') as ws:

    async for msg in ws:
        if msg.type == aiohttp.WSMsgType.TEXT:
            if msg.data == 'close cmd':
                await ws.close()
                break
            else:
                await ws.send_str(msg.data + '/answer')
        elif msg.type == aiohttp.WSMsgType.CLOSED:
            break
        elif msg.type == aiohttp.WSMsgType.ERROR:
            break

Maybe I'll post an issue on github for that.

aa...@tigmera.com

unread,
Jun 5, 2018, 5:24:04 PM6/5/18
to aio-libs
On Friday, May 11, 2018 at 6:33:08 AM UTC-7, Gustavo Carneiro wrote:
In my code I normally use this pattern, seems to work well in my limited testing (but the code is not yet in production so, caveat emptor):

async def websocket_handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    try:
        async for msg in ws:
              ...do stuff with incoming messages
    finally:
        await asyncio.shield(cleanup_code())

Where cleanup_code() is whatever code you need to run after the websocket disconnects.  I have found (by trial and error) that the task was being cancelled sometimes while cleanup_code() was still yet to complete, and so the asyncio.shield() was needed.

Ahh!!   I just wanted to say thank you very much for this pattern Gustavo!   This issue was really confusing me.

=aaron= 
Reply all
Reply to author
Forward
0 new messages