Issue with publishing messages very fast


Matt Sanderson

Aug 23, 2022, 8:38:29 PM
to Pika
Hello, so I am not sure if this is a bug or I am just not using this correctly, but I'd like some feedback on something...

So I have taken the simple example code from here to set up an async publisher.
Other than needing to remove the `connection.close()` call (otherwise my code falls over), this works fine when sending a single message.
The problems start when I want to send large numbers of messages.

If I do a simple change like:

```py
def on_channel_open(channel):
    i = 0
    while True:
        channel.basic_publish('test_exchange',
                              'test_routing_key',
                              'message body value',
                              pika.BasicProperties(content_type='text/plain',
                                                   delivery_mode=1))
        i += 1
        if i % 100_000 == 0:
            print(f'Sent {i} messages so far...')
```

When I run it, it sits there saying it has sent messages but never actually delivers any to the queue (and yes, I have the queue set up so that a non-continuous set of messages does get delivered).
If I Ctrl+C the process, the loop stops, and only then do messages start being published to the queue.

This is a somewhat simplified version of the problem I have in my actual code, but it does demonstrate the issue. In my proper code I have a publisher class which takes in a multiprocessing queue and reads from it to publish messages (I wanted some way to push messages to the class when it's run in a separate process).
The main body of that code is based on the async publisher example found here, with PUBLISH_INTERVAL set to 0. However, this massively reduces the rate at which messages can be published (the simple example code can do about 20k/s, whereas the full async publisher only hits about 3k/s).

I did find a way to get a bit more performance: in each publish loop, send off 10 messages before calling the `schedule_next_message` function. But then, if publishing occurs slowly, messages only get published in blocks of 10, sometimes 20 or 30, which is what tipped me off to this in the first place.

Anyway, sorry for all the text, hopefully this makes sense. I'd love some feedback, as I'm not sure whether it is actually a bug that the sending doesn't happen, or whether I need to do something else to make it work (and hopefully keep the same performance!)

Thanks!

Gavin M. Roy

Aug 23, 2022, 8:44:21 PM
to pika-...@googlegroups.com
Are you using an async client? Where are you waiting on the IOLoop?


Matt Sanderson

Aug 23, 2022, 8:47:26 PM
to Pika
The code I posted is taken exactly from the first link I sent. For completeness, here is my entire code snippet, which can be run as-is:

```py
import pika


# Step #3
def on_open(connection):
    print('opened')
    connection.channel(on_open_callback=on_channel_open)


# Step #4
def on_channel_open(channel):
    i = 0
    while True:
        channel.basic_publish('test_exchange',
                              'test_routing_key',
                              'message body value',
                              pika.BasicProperties(content_type='text/plain',
                                                   delivery_mode=1))
        i += 1
        if i % 100_000 == 0:
            print(f'Sent {i} messages so far...')


# Step #1: Connect to RabbitMQ
parameters = pika.URLParameters('amqp://guest:guest@localhost:5672/%2F')

connection = pika.SelectConnection(parameters=parameters,
                                   on_open_callback=on_open)
try:
    # Step #2 - Block on the IOLoop
    connection.ioloop.start()
# Catch a Keyboard Interrupt to make sure that the connection is closed cleanly
except KeyboardInterrupt:
    # Gracefully close the connection
    connection.close()
    # Start the IOLoop again so Pika can communicate, it will stop on its own when the connection is closed
    connection.ioloop.start()
```

Gavin M. Roy

Aug 23, 2022, 9:32:37 PM
to pika-...@googlegroups.com
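So your `while True` is starving the IOLoop: `on_channel_open` never returns, so the loop never gets a chance to actually send anything. You need to yield or otherwise return control so you don't keep the interpreter in a tight loop.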
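
One way to avoid starving the IOLoop, sketched here under assumptions rather than taken from the Pika examples: publish a bounded batch, return, and ask the IOLoop to call the function again. `publish_batch` and `BATCH_SIZE` are hypothetical names, and the batch size is an arbitrary starting point. The only calls used are `channel.basic_publish` and `connection.ioloop.call_later`, both of which appear elsewhere in this thread.

```python
import functools

BATCH_SIZE = 1_000  # hypothetical value; tune it for your workload


def publish_batch(connection, channel, sent=0):
    """Publish one bounded batch, then hand control back to the IOLoop."""
    for _ in range(BATCH_SIZE):
        channel.basic_publish('test_exchange',
                              'test_routing_key',
                              'message body value')
    sent += BATCH_SIZE
    if sent % 100_000 == 0:
        print(f'Sent {sent} messages so far...')
    # Returning from this function is what lets the IOLoop do socket I/O.
    # call_later(0, ...) asks the loop to run the next batch afterwards.
    connection.ioloop.call_later(
        0, functools.partial(publish_batch, connection, channel, sent))
```

In the snippet above, `on_channel_open(channel)` would then contain just `publish_batch(connection, channel)` in place of the `while True` loop, using the module-level `connection`.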

Gavin M. Roy

Aug 23, 2022, 9:36:55 PM
to pika-...@googlegroups.com
If you're using Python 3 I'd suggest you use the AsyncIO connection adapter and read about the AsyncIO module and behaviors in the Python 3 documentation.
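
With an asyncio-based loop, the same idea can be written as a coroutine that yields between batches. This is only a sketch of the yielding pattern: `publish_in_batches`, `total`, and `batch_size` are hypothetical names, and the channel is assumed to expose the same `basic_publish` call used above.

```python
import asyncio


async def publish_in_batches(channel, total, batch_size=1_000):
    """Publish `total` messages, yielding to the event loop after each batch."""
    sent = 0
    while sent < total:
        for _ in range(min(batch_size, total - sent)):
            channel.basic_publish('test_exchange',
                                  'test_routing_key',
                                  'message body value')
            sent += 1
        # sleep(0) suspends this coroutine for one loop iteration so that
        # other callbacks (including socket I/O) get a chance to run.
        await asyncio.sleep(0)
    return sent
```

Assuming the connection is a `pika.adapters.asyncio_connection.AsyncioConnection` running on the asyncio event loop, the channel-open callback could start this with something like `asyncio.ensure_future(publish_in_batches(channel, 1_000_000))`; treat that wiring as an assumption to check against the adapter's documentation.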

Matt Sanderson

Aug 23, 2022, 9:47:38 PM
to Pika
Yep, my code is using 3.9 so no worries there.
I did just end up finding a solution, which basically means I only publish for 0.1s and then call `self._connection.ioloop.call_later(0, self.publish_messages)`. This gives me a rate of ~10k/s, which seems good enough, but I will look into replacing the SelectConnection with the AsyncioConnection and see how I go, thanks!
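
For anyone finding this later, a minimal sketch of that time-sliced approach might look like the following. It is a reconstruction under assumptions, not my actual class: `TimeSlicedPublisher`, `PUBLISH_SLICE_SECONDS`, and the `messages` iterable are hypothetical, and the real code reads from a multiprocessing queue instead.

```python
import time

PUBLISH_SLICE_SECONDS = 0.1  # the 0.1s budget mentioned above


class TimeSlicedPublisher:
    """Publish for a short time slice, then yield to the IOLoop."""

    def __init__(self, connection, channel, messages):
        self._connection = connection
        self._channel = channel
        self._messages = iter(messages)  # message bodies; must not be None
        self.sent = 0

    def publish_messages(self):
        deadline = time.monotonic() + PUBLISH_SLICE_SECONDS
        while time.monotonic() < deadline:
            body = next(self._messages, None)
            if body is None:
                return  # source exhausted, so do not reschedule
            self._channel.basic_publish('test_exchange',
                                        'test_routing_key',
                                        body)
            self.sent += 1
        # Time slice used up: return to the IOLoop and continue afterwards.
        self._connection.ioloop.call_later(0, self.publish_messages)
```

A longer slice means fewer trips through the IOLoop but longer gaps before buffered messages are actually written, so 0.1s is a trade-off rather than a recommended value.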