Reference implementation using queue.pop?

19 views
Skip to first unread message

Andrew Bobulsky

unread,
Dec 19, 2019, 5:10:29 PM12/19/19
to Ruby RabbitMQ libraries
Hello,

I've built a small set of scripts that work with bunny and decided to use a queue because some of my data processing can fall very far behind the activity that's generating the work, so a queue seemed a natural fit here.

I discovered that when I use queue.subscribe, the consumer will take messages off of the queue regardless of whether or not it's ready to execute the block. This isn't too problematic, but if my consumer crashes or if I kill it, I'll lose every message that was waiting to be worked on. I also can't use the management GUI to tell how much work is left for the consumer because my queues are always empty.

So I've switched to using queue.pop, but now I don't have automagic rabbit-connection error recovery and presumably need to implement error catching where I wrap my queue.pop block inside of a begin block... but I'm not really sure which exception I should rescue. Should I just rescue Bunny::Exception every time if what I really want is for my workers to try again as soon as the queue is available?  Will the rest of the connections (channel, exchange, queue) all come back up automatically?

Also, could anyone perhaps point me to a reference implementation that behaves the way I'm desiring my application to function? The Bunny docs cover using queue.pop, but doesn't have any error rescuing behavior documented along with it.

Thanks,
Andrew

Michael Klishin

unread,
Dec 19, 2019, 5:47:08 PM12/19/19
to Ruby RabbitMQ libraries
You are looking for prefetch [1] and manual acknowledgement mode [2][3], not basic.get. Avoid basic.get unless you absolutely have no other options.

Andrew Bobulsky

unread,
Dec 20, 2019, 2:44:47 PM12/20/19
to Ruby RabbitMQ libraries
Thanks Michael.

I reviewed the qos portion of the rabbit docs, and then went hunting in the bunny docs for the same thing. Does this mean that I can set a channel.prefetch() value to limit the number of unprocessed messages that a subscriber will take off of the queue and hold in memory? I want the queue depth to increase over time as unprocessed messages build up.


-Andrew
Reply all
Reply to author
Forward
0 new messages