Raise concurrent.futures._base.CancelledError when i test django channels websocket

246 views
Skip to first unread message

nima

unread,
Jun 11, 2019, 2:08:00 PM6/11/19
to Django users
Hi. I'm using Django channels in server side for a speech to text application. When I test websocket with jmeter(+120 user or thread in 1 second), about half of requests fail. Here is my code:

class Consumer(AsyncWebsocketConsumer):
        def __init__(self, *args, **kwargs):
                super().__init__(*args, **kwargs)
                self.session = aiohttp.ClientSession()
                # some other initialization

        async def connect(self):
                await self.accept()

        async def received(self, text_data=None, bytes_data=None):
                self.data = bytes_data
                async with self.session:
                      await self.send(bytes_data=(await self._fetch(self.session)).encode())

        async def _fetch(self, client):
                async with client.post(url=self.url,
                                                      headers=self.headers,
                                                      params=self.querystrings,
                                                      data=self.data) as resp:
                        return await resp.text()



for 100-120 requests in 1 sec(i.e. 100-120 users in jmeter) it works well. But for +120 requests some of them failed. Each request sends 319523 bytes to websocket and then websocket sends them to an api and send back response. Here is the exception that raise for +120 requests:



[2019-06-11 07:20:29 +0000] [31064] [ERROR] Exception in ASGI application

Traceback (most recent call last):

File "/usr/lib/python3.5/asyncio/tasks.py", line 241, in _step

result = coro.throw(exc)

File "/usr/local/lib/python3.5/dist-packages/websockets/protocol.py", line 674, in transfer_data

message = yield from self.read_message()

File "/usr/local/lib/python3.5/dist-packages/websockets/protocol.py", line 742, in read_message

frame = yield from self.read_data_frame(max_size=self.max_size)

File "/usr/local/lib/python3.5/dist-packages/websockets/protocol.py", line 815, in read_data_frame

frame = yield from self.read_frame(max_size)

File "/usr/local/lib/python3.5/dist-packages/websockets/protocol.py", line 884, in read_frame

extensions=self.extensions,

File "/usr/local/lib/python3.5/dist-packages/websockets/framing.py", line 99, in read

data = yield from reader(2)

File "/usr/lib/python3.5/asyncio/streams.py", line 669, in readexactly

yield from self._wait_for_data('readexactly')

File "/usr/lib/python3.5/asyncio/streams.py", line 459, in _wait_for_data

yield from self._waiter

File "/usr/lib/python3.5/asyncio/futures.py", line 380, in __iter__

yield self # This tells Task to wait for completion.

File "/usr/lib/python3.5/asyncio/tasks.py", line 304, in _wakeup

future.result()

File "/usr/lib/python3.5/asyncio/futures.py", line 285, in result

raise CancelledError

concurrent.futures._base.CancelledError


The above exception was the direct cause of the following exception:


Traceback (most recent call last):

File "/usr/local/lib/python3.5/dist-packages/uvicorn/protocols/websockets/websockets_impl.py", line 146, in run_asgi

result = await self.app(self.scope, self.asgi_receive, self.asgi_send)

File "/usr/local/lib/python3.5/dist-packages/uvicorn/middleware/asgi2.py", line 7, in __call__

await instance(receive, send)

File "/usr/local/lib/python3.5/dist-packages/channels/sessions.py", line 183, in __call__

return await self.inner(receive, self.send)

File "/usr/local/lib/python3.5/dist-packages/channels/middleware.py", line 41, in coroutine_call

await inner_instance(receive, send)

File "/usr/local/lib/python3.5/dist-packages/channels/consumer.py", line 59, in __call__

[receive, self.channel_receive], self.dispatch

File "/usr/local/lib/python3.5/dist-packages/channels/utils.py", line 52, in await_many_dispatch

await dispatch(result)

File "/usr/local/lib/python3.5/dist-packages/channels/consumer.py", line 73, in dispatch

await handler(message)

File "/usr/local/lib/python3.5/dist-packages/channels/generic/websocket.py", line 198, in websocket_receive

await self.receive(bytes_data=message["bytes"])

File "/home/nima/stt/stt_project/stt_app/consumer.py", line 257, in receive

await self.send(bytes_data=(await self._fetch(self.session)).encode())

File "/usr/local/lib/python3.5/dist-packages/channels/generic/websocket.py", line 213, in send

await super().send({"type": "websocket.send", "bytes": bytes_data})

File "/usr/local/lib/python3.5/dist-packages/channels/consumer.py", line 81, in send

await self.base_send(message)

File "/usr/local/lib/python3.5/dist-packages/channels/sessions.py", line 236, in send

return await self.real_send(message)

File "/usr/local/lib/python3.5/dist-packages/uvicorn/protocols/websockets/websockets_impl.py", line 204, in asgi_send

await self.send(data)

File "/usr/local/lib/python3.5/dist-packages/websockets/protocol.py", line 462, in send

yield from self.ensure_open()

File "/usr/local/lib/python3.5/dist-packages/websockets/protocol.py", line 646, in ensure_open

) from self.transfer_data_exc

websockets.exceptions.ConnectionClosed: WebSocket connection is closed: code = 1006 (connection closed abnormally [internal]), no reason


I run asgi application with uvicorn

Reply all
Reply to author
Forward
0 new messages