Drain high-performance channel with prefetched messages gracefully on exit?


Wouter Liefting

Apr 26, 2024, 4:37:44 AM
to rabbitmq-users
I'm building an application that needs to consume messages from a high-performance queue. We're talking about 7500+ messages per second peak from this single queue.

My worker is only capable of handling around 500 messages per second due to backend limitations, so I'm scaling horizontally to about 20 workers peak. That all works fine.

In order to reach the required performance I have set and tuned the prefetch count to 500, equal to about one second's worth of work. Again, that all works perfectly fine.
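
For reference, this is roughly how the prefetch is set up. It's a minimal sketch using Pika's BlockingConnection; the queue name and the on_message callback are placeholders, not my real code:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Cap unacknowledged deliveries at roughly one second of work for this worker
channel.basic_qos(prefetch_count=500)

channel.basic_consume(queue='work-queue', on_message_callback=on_message)
channel.start_consuming()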

My worker nodes are auto-scaling, though, and at some point the auto-scaling logic decides that a worker is no longer needed and kills it. The signal handler in the worker catches this signal and tries to shut down gracefully.
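
The handler is registered with Python's standard signal module, roughly like this (that the autoscaler sends SIGTERM is an assumption on my part; signal_handler is the function shown further down):

import signal

# Graceful-shutdown handler for the autoscaler's termination signal
# (assumed to be SIGTERM) and for Ctrl+C during local testing.
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)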

And that's where I hit a problem. The worker has prefetched around 500 messages that I'd rather not lose. Obviously, if I just throw them away they don't get Ack'd (or maybe they get Nack'd by the cancel()?) and RabbitMQ will eventually redeliver them to another worker. But that "eventually" is too long for my SLA.

My signal handler is currently written like this:
def signal_handler(sig, frame):
    global channel
    global connection
    print(f'Signal {sig} received. Starting graceful shutdown.')

    # Stop receiving messages
    print("Sending the cancel")
    channel.cancel()

    print("Waiting 20 seconds for remaining messages")
    time.sleep(20)

    print("Stop consuming")
    channel.stop_consuming()

    print("Close channel and connection")
    channel.close()
    connection.close()

    sys.exit(0)

But this does not work. After the channel.cancel(), no more messages are handled by the consumer callback.

Is there any way to stop the prefetching of messages on a channel, handle the already-prefetched messages as normal, and close the channel/connection once all of them have been handled?

Luke Bakken

Apr 26, 2024, 1:24:01 PM
to rabbitmq-users
Hello,

It would be very helpful to know the following:
  • RabbitMQ and Erlang version
  • Client library and version
It looks like you're using Python, which means I can assume you are using Pika. My guess is you're experiencing this issue:


Pika has always sent a NACK for outstanding deliveries when a consumer is stopped. I'm not sure why; that behavior pre-dates my time as maintainer. You have the following options:

* Use a different Python library that does not do this.
* Use a different programming language / client library.
* Contribute to Pika to change this behavior, or make it optional.
* Use RabbitMQ streams and the rstream library - https://github.com/qweeze/rstream/releases

Thanks,
Luke

Wouter Liefting

Apr 27, 2024, 11:02:39 AM
to rabbitmq-users
Thanks. At least I know now that it's not possible in Pika.

I'm going to look at my options.
