Django Channels 2 Group_send to channel layer causes runtime error: "got Future attached to a different loop"


Luke Hebert

Feb 16, 2018, 4:08:55 PM
to Django users
Trying to call the "alert_receive" consumer method

import json
from channels.consumer import SyncConsumer
from channels.exceptions import StopConsumer
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer

class ClientAlertPtrConsumer(SyncConsumer):

    def websocket_connect(self, event):
        async_to_sync(self.channel_layer.group_add)("AlertReceivers", self.channel_name)
        self.send({
            "type": "websocket.accept",
        })

    def websocket_receive(self, event):
        content = json.dumps("received")
        async_to_sync(self.channel_layer.group_send)("AlertReceivers", {"type": "alert.receive"})

    def websocket_disconnect(self, event):
        async_to_sync(self.channel_layer.group_discard)("AlertReceivers", self.channel_name)
        raise StopConsumer

    def alert_receive(self, event):  # Get a message on the alert.receive type
        content = json.dumps("Group send working")
        self.send({
            "type": "websocket.send",  # then send a message out on websocket protocol to whatever the 'self' channel is.
            "text": content,
        })

from outside this consumer class, using the approach from the documentation, like this:

from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
channel_layer = get_channel_layer()

async_to_sync(channel_layer.group_send)("AlertReceivers", {"type": "alert.receive"})

and I get the following traceback, which passes through the concurrent.futures module:

Traceback (most recent call last):
  File "/usr/lib/python3.5/code.py", line 91, in runcode
    exec(code, self.locals)
  File "<console>", line 1, in <module>
  File "/usr/local/lib/python3.5/dist-packages/asgiref/sync.py", line 49, in __call__
    return call_result.result()
  File "/usr/lib/python3.5/concurrent/futures/_base.py", line 398, in result
    return self.__get_result()
  File "/usr/lib/python3.5/concurrent/futures/_base.py", line 357, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.5/dist-packages/asgiref/sync.py", line 63, in main_wrap
    result = await self.awaitable(*args, **kwargs)
  File "/usr/local/lib/python3.5/dist-packages/channels_redis/core.py", line 313, in group_send
    pool = await self.connection(self.consistent_hash(group))
  File "/usr/local/lib/python3.5/dist-packages/channels_redis/core.py", line 377, in connection
    self.pools[index] = await aioredis.create_redis_pool(**self.hosts[index])
  File "/usr/local/lib/python3.5/dist-packages/aioredis/commands/__init__.py", line 197, in create_redis_pool
    loop=loop)
  File "/usr/local/lib/python3.5/dist-packages/aioredis/pool.py", line 59, in create_pool
    await pool.wait_closed()
  File "/usr/local/lib/python3.5/dist-packages/aioredis/pool.py", line 177, in wait_closed
    await asyncio.shield(self._close_waiter, loop=self._loop)
  File "/usr/lib/python3.5/asyncio/futures.py", line 361, in __iter__
    yield self  # This tells Task to wait for completion.
RuntimeError: Task <Task pending coro=<AsyncToSync.main_wrap() running at /usr/local/lib/python3.5/dist-packages/asgiref/sync.py:63> cb=[_run_until_complete_cb() at /usr/lib/python3.5/asyncio/base_events.py:164]> got Future <Future pending> attached to a different loop



Note that the call

async_to_sync(self.channel_layer.group_send)("AlertReceivers", {"type": "alert.receive"})

within the websocket_receive method of the consumer class works well: it sends the event out onto the Redis-based channel layer, and the JSON-encoded "Group send working" WebSocket message then reaches the client.
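
For readers unfamiliar with the final RuntimeError in the traceback above: asyncio raises it whenever a task running on one event loop awaits a Future that belongs to another loop. The following is a minimal standard-library sketch of that situation only. It does not use asgiref, channels_redis, or aioredis, and it makes no claim about where the mismatch arises inside those libraries; the function name is made up for illustration.

```python
import asyncio


def await_future_from_other_loop():
    """Await a Future bound to loop_a from a task that runs on loop_b."""
    loop_a = asyncio.new_event_loop()
    loop_b = asyncio.new_event_loop()
    try:
        # This Future is attached to loop_a and is never resolved.
        foreign_future = loop_a.create_future()

        async def waiter():
            # The task wrapping this coroutine runs on loop_b, so asyncio
            # rejects the await instead of waiting forever.
            await foreign_future

        try:
            loop_b.run_until_complete(waiter())
        except RuntimeError as exc:
            return str(exc)
        return None
    finally:
        loop_a.close()
        loop_b.close()


message = await_future_from_other_loop()
print(message)
```

The printed message ends with "attached to a different loop", the same wording as the last line of the traceback.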


Luke Hebert

Feb 16, 2018, 4:08:55 PM
to Django users
Trying to call a consumer method inside a SyncConsumer class

def alert_receive(self, event):                 # Get a message of the alert.receive type off of channel_layers
        content = json.dumps("Group send working")
        self.send({
            "type":"websocket.send",                # then send a message out on websocket protocol to whatever the 'self' channel is. 
            "text":content,
            })

from a python script outside of this consumer class using the approach from the docs:

from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)("AlertReceivers", {"type": "alert.receive"})

The last line there raises the "got Future attached to a different loop" RuntimeError.
Note that if I make the same call as
async_to_sync(self.channel_layer.group_send)("AlertReceivers", {"type": "alert.receive"})
from within the consumer class, it works well: the alert.receive event gets onto the channel layer and the JSON-encoded "Group send working" message goes out to all member channels of the group, just as it should when sent from outside the consumer.
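
As background on why an event of type "alert.receive" ends up in the alert_receive method: Channels consumers pick the handler by replacing the dots in the message type with underscores. The sketch below imitates that lookup in plain Python so it runs without Channels installed. DemoConsumer, handler_name_for, and the sent list are stand-ins invented for illustration, not part of the Channels API.

```python
import json


def handler_name_for(message):
    """Turn a message type such as "alert.receive" into a method name."""
    if "type" not in message:
        raise ValueError("Incoming message has no 'type' attribute")
    name = message["type"].replace(".", "_")
    if name.startswith("_"):
        raise ValueError("Malformed type in message (leading underscore)")
    return name


class DemoConsumer:
    """Hypothetical stand-in for a consumer; records what it would send."""

    def __init__(self):
        self.sent = []

    def dispatch(self, message):
        # Look up the handler method by its derived name and call it.
        handler = getattr(self, handler_name_for(message), None)
        if handler is None:
            raise ValueError("No handler for message type %s" % message["type"])
        handler(message)

    def alert_receive(self, event):
        # Mirrors the alert_receive method from the post above.
        self.sent.append({
            "type": "websocket.send",
            "text": json.dumps("Group send working"),
        })


consumer = DemoConsumer()
consumer.dispatch({"type": "alert.receive"})
print(consumer.sent)
```

In the real setup, group_send delivers the same event dictionary to every channel in "AlertReceivers", and each consumer instance performs this lookup on its own copy.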

Andrew Godwin

Feb 16, 2018, 4:11:07 PM
to django...@googlegroups.com
Can you update your versions of `asgiref` and `channels_redis` please? I fixed those bugs a couple of days ago.

Andrew
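
Before and after upgrading, it can help to confirm which versions are actually installed in the environment that runs the code. A small sketch, assuming Python 3.8 or newer for importlib.metadata (the traceback above comes from Python 3.5, where `pip show asgiref channels_redis` gives the same information); the helper name is made up, and the thread does not state which releases contain the fix.

```python
from importlib import metadata


def installed_versions(names=("asgiref", "channels_redis")):
    """Return a dict of package name -> installed version, or None if absent."""
    versions = {}
    for name in names:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            # Package is not installed in this environment.
            versions[name] = None
    return versions


versions = installed_versions()
print(versions)
```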


raed

Mar 22, 2023, 8:53:15 AM
to Django users
How did you test it? Did you use a specific command to execute it?