Channels: launching a custom co-routine from SyncConsumer


Dan Merillat

Aug 14, 2019, 8:00:15 PM
to Django users

My application is 99% ORM and database work, so it uses SyncConsumer.
However, one minor part needs a timer, and I cannot get it to work for the life of me.

In an AsyncConsumer, I can use asyncio.ensure_future(self.coroutine()) to start a timer.

In a SyncConsumer, nothing I've tried works:

class TestConsumer(SyncConsumer):

    async def timer_task(self):
        print(f'timer task: {self.counter}')
        await asyncio.sleep(1)
        self.counter += 1
        print(f'timer expired: {self.counter}')

    def connect(self):
        self.counter = 0

        self.timer = asyncio.ensure_future(self.timer_task())
        # ERROR:daphne.server:Exception inside application: There is no current event loop in thread 'ThreadPoolExecutor-0_20'.

        self.timer = async_to_sync(asyncio.ensure_future)(self.timer_task())
        # ERROR:daphne.server:Exception inside application: You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly.

I thought SyncConsumer was async under the hood, but I have no idea how to leverage that.
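
The first error above can be reproduced without Channels. The following is a minimal sketch using only the standard library; the names worker and errors are illustrative and not part of Channels or Daphne. It assumes that a SyncConsumer handler runs in a plain worker thread with no event loop of its own, which is why asyncio.ensure_future() has no loop to attach the coroutine to.

```python
import asyncio
import threading

errors = []  # collects the exception message raised in the worker thread


async def timer_task():
    await asyncio.sleep(0)


def worker():
    # This thread has no event loop, similar to the thread pool thread
    # named in the Daphne error message.
    coro = timer_task()
    try:
        asyncio.ensure_future(coro)
    except RuntimeError as exc:
        errors.append(str(exc))
    finally:
        coro.close()  # avoid a "coroutine was never awaited" warning


thread = threading.Thread(target=worker)
thread.start()
thread.join()
print(errors)
```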

Andrew Godwin

Aug 15, 2019, 11:11:22 PM
to django...@googlegroups.com
SyncConsumer isn't async - it runs inside a separate synchronous thread. You'll need to get the event loop from the other thread and use call_soon_threadsafe instead!

Andrew
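
A minimal sketch of the pattern described in this reply, using only the standard library. It does not show how to obtain the loop inside Channels; here the loop is simply passed to the worker thread, and sync_handler, timer_task, and seen are illustrative names. The worker thread stands in for a SyncConsumer handler and hands the coroutine to the loop running in the other thread.

```python
import asyncio
import threading

seen = {}  # records which thread ran which piece of code


async def timer_task(delay):
    # Runs on the event loop thread, not on the worker thread.
    await asyncio.sleep(delay)
    seen["timer"] = threading.get_ident()


def sync_handler(loop):
    # Runs in a worker thread with no running loop of its own, so the
    # task is scheduled on the loop owned by the other thread.
    seen["handler"] = threading.get_ident()
    loop.call_soon_threadsafe(loop.create_task, timer_task(0.01))


async def main():
    loop = asyncio.get_running_loop()
    seen["loop"] = threading.get_ident()
    await loop.run_in_executor(None, sync_handler, loop)
    await asyncio.sleep(0.2)  # keep the loop alive until the timer fires


asyncio.run(main())
print(seen["timer"] == seen["loop"], seen["handler"] != seen["loop"])
```

If the calling thread needs the result or wants to cancel the work later, asyncio.run_coroutine_threadsafe(coro, loop) is a standard-library alternative that returns a concurrent.futures.Future.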


Dan Merillat

Aug 28, 2019, 10:26:25 AM
to django...@googlegroups.com
On 8/15/19 11:10 PM, Andrew Godwin wrote:
> SyncConsumer isn't async - it runs inside a separate synchronous thread.
> You'll need to get the event loop from the other thread and use
> call_soon_threadsafe instead!

Sorry for the delay in getting back to you, it took me a while to go
through the different layers to find the appropriate event loop. Thank
you for the pointer, it got me headed down what I hope is the right path.

SyncConsumer is based on AsyncConsumer, with dispatch() ultimately
wrapped in SyncToAsync via DatabaseSyncToAsync. SyncConsumer should
have a guaranteed SyncToAsync instance with threadlocal on the executor
thread, because it is running inside a SyncToAsync to begin with. Is
that correct?

The following "works" as part of my sync consumer but I'm not positive
what the sharp edges are going to be:

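import asyncio
from asgiref.sync import SyncToAsync

def start_coroutine(self, coroutine):
    # Look up the event loop that SyncToAsync recorded for this worker thread.
    loop = getattr(SyncToAsync.threadlocal, "main_event_loop", None)
    if not (loop and loop.is_running()):
        # Fallback: nothing runs this new loop, so the task will not
        # execute until something starts it.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    loop.call_soon_threadsafe(loop.create_task, coroutine())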

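Where coroutine is any coroutine function, such as self.timer_task; it is
called here to create the coroutine object that create_task schedules.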

The documentation at
https://channels.readthedocs.io/en/latest/topics/consumers.html covers
the opposite scenario, calling synchronous functions such as the Django
ORM from an AsyncConsumer, but would really benefit from an example of
something like an idle timeout implementation for a SyncConsumer.
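
As a rough illustration of what such an idle timeout could look like, here is a sketch built only on the standard library. IdleTimer is a hypothetical helper, not a Channels API, and the sketch assumes the consumer already holds a reference to the running event loop (how to get it inside Channels is the open question above). The timer handle is only touched on the loop thread, so reset() can be called from any synchronous handler.

```python
import asyncio


class IdleTimer:
    """Hypothetical helper: calls on_timeout after `timeout` idle seconds."""

    def __init__(self, loop, timeout, on_timeout):
        self._loop = loop
        self._timeout = timeout
        self._on_timeout = on_timeout
        self._handle = None  # only accessed on the event loop thread

    def reset(self):
        # Safe to call from any thread, e.g. on every received message.
        self._loop.call_soon_threadsafe(self._restart)

    def cancel(self):
        # Safe to call from any thread, e.g. on disconnect.
        self._loop.call_soon_threadsafe(self._cancel)

    def _cancel(self):
        if self._handle is not None:
            self._handle.cancel()
            self._handle = None

    def _restart(self):
        self._cancel()
        self._handle = self._loop.call_later(self._timeout, self._on_timeout)


fired = []


async def main():
    loop = asyncio.get_running_loop()
    timer = IdleTimer(loop, 0.2, lambda: fired.append("idle"))
    # run_in_executor stands in for activity arriving in a sync handler.
    await loop.run_in_executor(None, timer.reset)
    await asyncio.sleep(0.1)
    await loop.run_in_executor(None, timer.reset)  # activity restarts the timer
    await asyncio.sleep(0.1)
    early = list(fired)  # only 0.1 s since the last reset
    await asyncio.sleep(0.3)
    return early


early = asyncio.run(main())
print(early, fired)
```

In a consumer, reset() would be called from each message handler and cancel() from the disconnect handler; the on_timeout callback runs on the loop thread, so any ORM work it triggers would still need to go back through a synchronous thread.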

Ezequias Rocha

Aug 30, 2019, 8:46:47 AM
to Django users
Sorry for my delay as well. My problem was that I was looking at the wrong PostgreSQL sequence. My fault, totally my fault.

Django rocks!

Sincerely
Ezequias Rocha