Hi all,
I am trying to implement batch processing with the pika client. I consume from a queue and accumulate the messages in a list on the worker until it reaches a size of N (50 messages in my case). Once the limit is reached, I process the batch and call basic_ack with the multiple flag set to true to acknowledge every message in the list. Then I clear the list and repeat until the queue is empty.
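Roughly, the consumer looks like this (a minimal sketch; the queue name "tasks", the localhost broker, and process() are placeholders for the real pieces):

```python
# Minimal sketch of the batch-and-ack flow described above.

BATCH_SIZE = 50

class Batcher:
    """Collects (delivery_tag, body) pairs; returns the full batch once
    the size limit is hit, otherwise None."""
    def __init__(self, size=BATCH_SIZE):
        self.size = size
        self.batch = []

    def add(self, delivery_tag, body):
        self.batch.append((delivery_tag, body))
        if len(self.batch) >= self.size:
            full, self.batch = self.batch, []
            return full
        return None

def process(batch):
    """Placeholder for the batched ML inference step."""

def main():
    import pika  # imported here so the Batcher helper is broker-free

    conn = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    ch = conn.channel()
    ch.basic_qos(prefetch_count=BATCH_SIZE)  # don't prefetch past one batch
    batcher = Batcher()

    def on_message(channel, method, properties, body):
        full = batcher.add(method.delivery_tag, body)
        if full is not None:
            process(full)
            # One ack for the whole batch: multiple=True acknowledges
            # everything up to and including the last delivery tag.
            channel.basic_ack(delivery_tag=full[-1][0], multiple=True)

    ch.basic_consume(queue="tasks", on_message_callback=on_message)
    ch.start_consuming()

if __name__ == "__main__":
    main()
```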
This works... however, there is a big problem: if the queue holds fewer than 50 messages (or whatever I set the limit to), the consumer hangs, waiting for more messages to fill the list. That makes sense, of course, since we get one callback per message.
Any ideas on how I can approach this? I tried starting two consumers on the same queue: one with x-priority set and a QoS (prefetch) of 1, and one with no priority and a QoS of 50. The idea was that the prioritized consumer would pick up messages whenever the queue held fewer than 50. Expanding on that: if the non-prioritized consumer with a QoS of 50 picked up some messages (say 30, for example's sake), it would hold onto them for a while and eventually release them back onto the queue (via consumer_timeout), letting the prioritized consumer drain the queue. This did not work out, because consumer_timeout is a rabbitmq.conf setting and changing it on AWS requires contacting support. The setup is also fragile and breaks if a consumer crashes.
I am now trying another approach with add_callback_threadsafe and a background timer, but I wanted to get advice before going any further down this rabbit hole. Is batch processing with pika just not possible? Should I rethink my architecture here? Batching is necessary for my application due to the volume of messages coming in, and because these are ML tasks the cost of processing a single message is essentially the same as processing a batch, so we want to batch as many as we can.
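One alternative to a hand-rolled timer, if I understand the API correctly: pika's BlockingChannel.consume() generator takes an inactivity_timeout and yields (None, None, None) when nothing arrives in time, which is enough to flush a partial batch. A sketch of that idea (queue name "tasks", the local broker, and process() are placeholders):

```python
# Flush-on-idle sketch: consume() with inactivity_timeout yields
# (None, None, None) when the queue goes quiet, so a batch smaller than
# BATCH_SIZE gets processed instead of hanging forever.

BATCH_SIZE = 50
IDLE_SECONDS = 2.0  # flush whatever has accumulated after this much silence

def process(bodies):
    """Placeholder for the batched ML inference step."""

def flush(channel, batch):
    """Process and ack the current batch; returns a fresh empty batch."""
    if batch:
        process([body for _, body in batch])
        # multiple=True acks everything up to the last delivery tag.
        channel.basic_ack(delivery_tag=batch[-1][0], multiple=True)
    return []

def run():
    import pika  # imported here so flush() can be exercised without a broker

    conn = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    ch = conn.channel()
    ch.basic_qos(prefetch_count=BATCH_SIZE)
    batch = []
    for method, _props, body in ch.consume("tasks",
                                           inactivity_timeout=IDLE_SECONDS):
        if method is None:  # idle: no message arrived within IDLE_SECONDS
            batch = flush(ch, batch)
            continue
        batch.append((method.delivery_tag, body))
        if len(batch) >= BATCH_SIZE:
            batch = flush(ch, batch)

if __name__ == "__main__":
    run()
```

This keeps everything on one consumer and one thread, so there is no cross-thread ack problem and no need for the two-consumer priority trick.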
Happy to clarify my points and share code examples if this is unclear. Thank you.