Batch processing using pika


Jeton Koka
Jun 15, 2022, 12:55:36 PM
to Pika
Hi all,

I am trying to implement batch processing using the pika client. I am consuming from a queue and aggregating messages in a list on the worker until I reach a size of N (50 messages in my case). Once the limit is reached, I process the messages and issue a basic.ack with the multiple flag set to true to acknowledge everything in the list. Then I empty the list and repeat until the queue is empty.
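In code, the pattern looks roughly like this (a minimal sketch; the queue name, batch size, and process_batch are placeholders for my actual setup):

import pika

QUEUE = "tasks"     # placeholder queue name
BATCH_SIZE = 50

batch = []  # (delivery_tag, body) pairs accumulated on the worker

def process_batch(bodies):
    ...  # the actual ML work

def on_message(channel, method, properties, body):
    batch.append((method.delivery_tag, body))
    if len(batch) >= BATCH_SIZE:
        process_batch([b for _, b in batch])
        # one ack with multiple=True covers every outstanding tag up to this one
        channel.basic_ack(delivery_tag=batch[-1][0], multiple=True)
        batch.clear()

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.basic_qos(prefetch_count=BATCH_SIZE)  # let the broker deliver up to a full batch
channel.basic_consume(queue=QUEUE, on_message_callback=on_message)
channel.start_consuming()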


This works... however, there is a huge problem. If the queue holds fewer than 50 messages (or whatever I set the list limit to), the consumer hangs while waiting for more messages to fill the list. That makes sense, of course, since we get one callback per message.

Any ideas on how I can approach this? I tried starting two consumers on the same queue: one with x-priority set and a QoS of 1, and one with no priority and a QoS of 50. My idea with this setup was to have the prioritized consumer pick up messages whenever the queue held fewer than 50. Expanding the idea further, my thinking was that if the non-prioritized consumer with a QoS of 50 picked up the messages (say 30, for example's sake), it would hold onto them for a while, release them back onto the queue once consumer_timeout expired, and the prioritized consumer would then process them until the queue was empty. This did not work out, as consumer_timeout is a RabbitMQ server configuration setting and changing it on AWS requires contacting support. The setup is also fragile and breaks if a consumer crashes.

I am trying to implement this another way, using add_callback_threadsafe and keeping a timer in the background, but I wanted to get advice before going down this rabbit hole any further. Is batch processing with pika just not possible? Should I rethink my architecture here? Batch processing is necessary for my application due to the volume of messages coming in, and because these are ML tasks the cost of processing a batch is essentially the same as processing a single message, so we want to batch as many as we can.
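Roughly, the timer variant I am experimenting with extends the sketch above like this (the 5-second interval and flush_partial_batch are made-up placeholders):

import threading

def flush_partial_batch(channel):
    # process and ack whatever has accumulated, even if under BATCH_SIZE
    if batch:
        process_batch([b for _, b in batch])
        channel.basic_ack(delivery_tag=batch[-1][0], multiple=True)
        batch.clear()

def start_flush_timer(connection, channel, interval=5.0):
    def tick():
        # pika channels are not thread-safe, so hop back onto the
        # connection's own thread via add_callback_threadsafe
        connection.add_callback_threadsafe(lambda: flush_partial_batch(channel))
        timer = threading.Timer(interval, tick)
        timer.daemon = True
        timer.start()
    tick()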


Happy to clarify my points further if any of this is unclear. Thank you.

Jeton Koka
Jun 15, 2022, 12:59:06 PM
to Pika
One thing I forgot to mention... I did see some solutions where people simply combined several messages into one giant message and processed that, instead of fiddling around with aggregating messages client side. I am not opposed to this solution, but I wanted to get thoughts on it as well. Given the size limit on messages (2 GB, I believe), I think this would work for my system.

Jeton Koka
Jun 21, 2022, 11:10:01 PM
to Pika
FYI, I ended up figuring this out by using the blocking, generator-based consumption form instead of the callback form: setting an inactivity_timeout and processing the partial batch after n seconds if nothing comes in. See here: https://pika.readthedocs.io/en/stable/modules/adapters/blocking.html#pika.adapters.blocking_connection.BlockingChannel.consume
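For anyone who lands here later, it looks roughly like this (queue name, batch size, timeout, and process_batch are placeholders):

import pika

QUEUE = "tasks"    # placeholder queue name
BATCH_SIZE = 50
TIMEOUT = 5.0      # seconds of inactivity before flushing a partial batch

def process_batch(bodies):
    ...  # the actual ML work

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.basic_qos(prefetch_count=BATCH_SIZE)

batch = []
# consume() yields (None, None, None) whenever inactivity_timeout elapses
for method, properties, body in channel.consume(QUEUE, inactivity_timeout=TIMEOUT):
    if method is not None:
        batch.append((method.delivery_tag, body))
    if batch and (method is None or len(batch) >= BATCH_SIZE):
        process_batch([b for _, b in batch])
        channel.basic_ack(delivery_tag=batch[-1][0], multiple=True)
        batch.clear()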

Cheers for such great docs! 

Azim Ahmed Bijapur
8:22 AM
to Pika
Isn't basic.get an option for batch processing? It gives more control. In my case I need to perform batch inference (GPU-bound) using a vision-language model. I aggregate messages into a list and then process them all at once. Is this approach fine? I am aware it is a poll-based approach (each fetch involves a round trip), but it serves the purpose well and gives me total control. To be more specific, there are two channels and a single consumer, and I serve them in round-robin fashion.
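Roughly what I mean (the queue names, batch size, and run_inference are placeholders for my setup):

import pika

QUEUES = ["images_a", "images_b"]   # placeholder: one queue per channel
BATCH_SIZE = 16

def run_inference(bodies):
    ...  # the batched vision-language model call

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channels = [connection.channel() for _ in QUEUES]

while True:
    for queue, channel in zip(QUEUES, channels):  # round-robin over the channels
        batch = []
        for _ in range(BATCH_SIZE):
            # basic_get polls one message; returns (None, None, None) when the queue is empty
            method, properties, body = channel.basic_get(queue)
            if method is None:
                break
            batch.append((method.delivery_tag, body))
        if batch:
            run_inference([b for _, b in batch])
            channel.basic_ack(delivery_tag=batch[-1][0], multiple=True)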