Can't get asyncio consumer example to run with existing event_loop


Mikah Barnett

Nov 1, 2019, 9:49:32 AM
to Pika
I'm trying to use the example here to get a consumer running with my async app.  I will present some code and what I've tried below, but the goal is to hook the consumer into an existing `asyncio` loop.  I just can't seem to get it to happen.

Here's how I'm starting the consumer:
async def run_amqp_listener():
    listener = ReconnectingConsumer(CONFIG.Configuration['AMQP'])
    listener.run()  # ends up calling ioloop.run_forever(), see traceback below


async def main():
    listen_task = asyncio.create_task(run_amqp_listener())
    # other_tasks: the app's remaining awaitables (not shown)
    await asyncio.gather(listen_task, other_tasks)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

If I just run it like that, I get:
  File "/home/pi/mesh_blue/lib/python3.7/site-packages/mesh_edge/gateway/listener.py", line 377, in run
    self._consumer.run()
  File "/home/pi/mesh_blue/lib/python3.7/site-packages/mesh_edge/gateway/listener.py", line 339, in run
    self._connection.ioloop.run_forever()
  File "/usr/local/lib/python3.7/asyncio/base_events.py", line 521, in run_forever
    raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running
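
The same error can be reproduced without Pika. A minimal sketch, using only the standard library: a coroutine is by definition executing inside a running loop, so asking that loop to `run_forever()` again is rejected. The function names here are illustrative and not part of the listener code.

```python
import asyncio


async def start_loop_again():
    # Inside a coroutine the loop is already running, so asking it to
    # run_forever() a second time is rejected by asyncio.
    loop = asyncio.get_running_loop()
    try:
        loop.run_forever()
    except RuntimeError as exc:
        return str(exc)
    return None


def reproduce():
    # Returns the error message raised by the nested run_forever() call.
    return asyncio.run(start_loop_again())


if __name__ == "__main__":
    print(reproduce())
```

This suggests the traceback comes from calling a blocking `run()` from inside a coroutine, not from a second loop being created.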


(I think) I understand that problem: the listener is trying to create a new loop, but we already have a loop.  So, how do I hook into the existing one instead?  I found the `custom_ioloop` argument in the initializer, so I tried that:
def connect(self):
    LOGGER.info('Connecting to %s', self._url)
    return AsyncioConnection(
        parameters=pika.URLParameters(self._url),
        on_open_callback=self.on_connection_open,
        on_open_error_callback=self.on_connection_open_error,
        on_close_callback=self.on_connection_closed,
        custom_ioloop=asyncio.get_running_loop())

Then I modified the `run()` method to use:
self._connection.ioloop.create_future()

But this just calls `connect()` over and over.  I've also tried a number of other variations, but they only produce RuntimeErrors or tight-loop calls to `connect()`.

Any guidance is greatly appreciated!
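
On the `create_future()` attempt above, one possible explanation for the repeated `connect()` calls: `create_future()` returns a pending future immediately instead of blocking the way `run_forever()` does, so `run()` returns right away. If the surrounding code retries whenever `run()` returns (an assumption, since the rest of the listener is not shown here), it would reconnect in a tight loop. A minimal sketch of the difference, using only the standard library; the `call_later` timer is a hypothetical stand-in for a connection's close callback.

```python
import asyncio


async def wait_for_close():
    loop = asyncio.get_running_loop()

    # create_future() only builds a pending Future; it does not block.
    closed = loop.create_future()
    was_pending = not closed.done()

    # Hypothetical stand-in for a close callback firing a little later.
    loop.call_later(0.05, closed.set_result, "closed")

    # Awaiting the future is what suspends this coroutine while the
    # rest of the loop keeps running.
    result = await closed
    return was_pending, result


if __name__ == "__main__":
    print(asyncio.run(wait_for_close()))
```

Awaiting such a future requires the calling method to be a coroutine, which the synchronous `run()` in the example is not.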



Luke Bakken

Nov 1, 2019, 11:29:49 AM
to Pika
Hi Mikah,

The easiest way for me to help would be for you to provide a complete code sample that I can run to see the same issue you're seeing. I don't have much experience with asyncio and don't have time to try to guess what could be causing what you see.

A GitHub repository I can clone and run from would be ideal and would expedite this process.

Thanks!
Luke

Mikah Barnett

Nov 1, 2019, 11:59:30 AM
to Pika
Thanks for having a look, Luke.  I can't give you the whole repo, but this exact `main.py` combined with the example above reproduces the error `This event loop is already running`:

import asyncio
from listener import ReconnectingExampleConsumer

async def Random():
    while True:
        await asyncio.sleep(2)
        print('[*] Random!')

async def Listener():
    uri = 'amqp://guest:guest@localhost:5672/%2F'
    listener = ReconnectingExampleConsumer(uri)
    listener.run()

async def main():
    random_task = Random()
    listen_task = Listener()
    await asyncio.gather(random_task, listen_task)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Mikah Barnett

Nov 1, 2019, 12:03:22 PM
to Pika
In case it makes it easier, I did indeed create a repo with just this in it: https://github.com/MikahB/PikaAsync



Luke Bakken

Nov 1, 2019, 12:05:58 PM
to Pika
Hi Mikah,

Yes indeed that makes it easier. I will have a look today.

Too many times people don't send exactly the same code they are talking about and it ends up wasting everyone's time. Thanks for providing a repo.

Mikah Barnett

Nov 1, 2019, 12:34:41 PM
to Pika
You know, I think I got it working (on the minimum reproducible example) just by not calling create_task(listen_task) in main().  Instead, if I just call Listener() it seems to work okay.  Still getting errors on my full version with that fix, but I think they're a separate issue.

So, don't waste any time on it yet Luke - this may just be an elaborate, decently-documented non-issue. :)

I will post back when I either have a fix for sure, or don't.

MB

Mikah Barnett

Nov 1, 2019, 2:03:52 PM
to Pika
I think my celebration was premature.  Taking out the create task and gather lets it run, but it doesn't appear the Consumer is getting bound or actually consuming.

So, still stuck. :(
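
A likely reason the consumer never binds in that variant: calling a coroutine function such as `Listener()` without `await`, `create_task`, or `gather` only builds a coroutine object, and its body never executes. A minimal sketch, assuming that is what "just call Listener()" meant; the `listener` coroutine below is a hypothetical stand-in that records whether it ran.

```python
import asyncio


async def listener(calls):
    # Stand-in for the real consumer start-up.
    calls.append("listener ran")


async def main():
    calls = []

    # Calling a coroutine function only creates a coroutine object;
    # nothing inside it runs until it is awaited or wrapped in a task.
    coro = listener(calls)
    await asyncio.sleep(0)
    ran_without_await = list(calls)

    # Awaiting the coroutine is what actually executes its body.
    await coro
    return ran_without_await, list(calls)


if __name__ == "__main__":
    print(asyncio.run(main()))
```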

Mikah Barnett

Nov 1, 2019, 7:05:12 PM
to Pika
Holy shit, I got it working!!  After trying probably literally 50 different things, it was as easy as this:

import asyncio
from listener import ReconnectingExampleConsumer

async def Random():
    while True:
        await asyncio.sleep(2)
        print('[*] Random!')

async def Listener():
    uri = 'amqp://guest:guest@localhost:5672/%2F'
    listener = ReconnectingExampleConsumer(uri)
    listener.run()

async def main():
    random_task = asyncio.create_task(Random())
    listen_task = asyncio.create_task(Listener())
    await asyncio.gather(random_task)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())


The part that was killing me is that even though listen_task isn't used (we don't gather it), you MUST set the result of create_task(Listener()) to a variable, I guess to hold it in memory, or it gives you errors about the loop being closed before it was finished.  But, with the above, it seems to work now.  What a crazy trick to make it go.
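
The need to keep the variable is consistent with the `asyncio.create_task` documentation, which notes that the event loop holds only weak references to tasks, so a task with no other reference may be garbage-collected before it finishes. A minimal sketch of the pattern the documentation recommends, with a hypothetical `worker` coroutine standing in for `Listener()`; whether this fully explains the Pika behavior above is not established in this thread.

```python
import asyncio


async def worker(results):
    # Stand-in for a long-running background job such as a consumer.
    await asyncio.sleep(0.01)
    results.append("done")


async def main():
    results = []
    background_tasks = set()

    # Keep a strong reference so the task cannot be garbage-collected
    # before it finishes; the loop itself only holds a weak reference.
    task = asyncio.create_task(worker(results))
    background_tasks.add(task)
    # Drop the reference automatically once the task completes.
    task.add_done_callback(background_tasks.discard)

    # Wait for everything still tracked before leaving main().
    await asyncio.gather(*background_tasks)
    return results, len(background_tasks)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Holding tasks in a set scales to any number of background jobs, where a single named variable such as `listen_task` covers only one.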

Luke Bakken

Nov 2, 2019, 2:57:32 PM
to Pika
Hi Mikah,

I was just about to look at this, and you have it solved. I'll see if I can explain that behavior.

Thanks,
Luke