I'm building an application that needs to consume messages from a high-throughput RabbitMQ queue: 7,500+ messages per second at peak, all from this single queue.
Each worker can only handle around 500 messages per second due to backend limitations, so I'm scaling horizontally to about 20 workers at peak. That all works fine.
To reach the required throughput I have set and tuned the prefetch count to 500, equal to about one second's worth of work per worker. Again, that all works perfectly fine.
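For context, the consumer side is set up roughly like this (a simplified sketch using pika's BlockingConnection; the queue name, connection parameters, and callback body are placeholders):

```python
import pika

def do_backend_work(body):
    pass  # placeholder for the real, rate-limited backend call

def handle_message(ch, method, properties, body):
    do_backend_work(body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Prefetch about one second's worth of work per worker.
channel.basic_qos(prefetch_count=500)

channel.basic_consume(queue='work', on_message_callback=handle_message)
channel.start_consuming()
```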
My worker nodes auto-scale, though, and at some point the auto-scaling logic decides that a worker is no longer needed and kills it. A signal handler in the worker catches this signal and tries to shut down gracefully.
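The handler (shown further down) is registered at startup in the usual way; SIGTERM here is an assumption, since that is what most autoscalers send before a hard kill:

```python
import signal

# Route the autoscaler's termination signal to the graceful-shutdown handler.
signal.signal(signal.SIGTERM, signal_handler)
```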
And that's where I hit a problem. The worker has prefetched around 500 messages that I'd rather not lose. Obviously, if I just throw them away they don't get Ack'd (or maybe they get Nack'd by the cancel()?), and RabbitMQ will eventually redeliver them to another worker. But that "eventually" is too long for my SLA.
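(To illustrate the Nack mechanics I mean: as I understand it, explicitly handing deliveries back for immediate redelivery would look roughly like the sketch below, with `ch` and `method` coming from the consumer callback. It is not what the handler currently does.)

```python
# Sketch only: return one unprocessed delivery to the queue immediately,
# instead of waiting for RabbitMQ to requeue it when the channel closes.
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

# Or requeue everything still unacknowledged on the channel in one go.
channel.basic_recover(requeue=True)
```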
My signal handler is currently written like this:
```python
import sys
import time

def signal_handler(sig, frame):
    global channel
    global connection
    print(f'Signal {sig} received. Starting graceful shutdown.')
    # Stop receiving new messages.
    print("Sending the cancel")
    channel.cancel()
    # Give the ~500 prefetched messages time to drain through the callback.
    print("Waiting 20 seconds for remaining messages")
    time.sleep(20)
    print("Stop consuming")
    channel.stop_consuming()
    print("Close channel and connection")
    channel.close()
    connection.close()
    sys.exit(0)
```
But this does not work: after the `channel.cancel()`, no more messages are handled by the consumer callback.
Is there any way to stop the prefetching of messages on a channel, keep handling the already-prefetched messages as normal, and close the channel/connection once all of them have been handled?