Re: [rabbitmq-users] Re: Terminating basicConsume after all messages in the specific queue are processed

29 views
Skip to first unread message

Michal Kuratczyk

unread,
Sep 1, 2022, 3:23:42 AM9/1/22
to rabbitm...@googlegroups.com
Hi,

I'm not sure I fully understand but I believe your scenario is:
1. you submit N tasks to the inputs queue
2. you expect N results in the results queue
3. you would want to stop consuming after N messages, so that you can process all the results

If I understand correctly, you expect RabbitMQ to know that all results are now in and you can stop consuming? RabbitMQ would have to know that no more than N messages are expected to be ever delivered to the results queue. It feels like that knowledge belongs to your app though - if it knows how many results are expected, it should stop consuming once it receives N messages. Why do you want to check if the queue is empty?
The queue is not empty - it has between 0 and N unacked messages and RabbitMQ is waiting for the messages to be acked and potentially for new messages to arrive. Maybe I'm completely missing the point
but I think you want to delegate to RabbitMQ, something that you need to implement in your app.

Best,

On Fri, Aug 26, 2022 at 11:40 AM Roshak Zarhoun <ros...@gmail.com> wrote:
Here is snipper of the part of the code for better insight of what should happen! 
https://gist.github.com/roxchgt/61f5137c388c54ee4cd11b82b08b2352
On Friday, August 26, 2022 at 10:51:41 AM UTC+2 Roshak Zarhoun wrote:
Hi all, 

I've been searching since a week to find a right solution for using basicConsume in a more complex scenario, a sort of async RPC, where the main application (publisher) sends a bunch of data sets to worker nodes (over one "inputs" queue) for sandboxed processing and later it should collect the results from the worker(s) through a "results" queue, AND then post-process them.

I've implemted everything and it works up to the point where the main app should collect the results in a separate function, which defines a DeliverCallback, which lets's says deserializes and puts the received messages in a ResultsList and it should stop consuming after all messages are taken from that results queue and return that List.
I couldn't find a proper condition to terminate the basicConsume and move on. the messageCount doesn't change and I'll be got stuck in an infinite loop if I use this as termination condition, I've read in the docs that the messages are only marked as N/Ack and that's probably why the message count doesn't go down after receiving and acknowledging them!

I realized now that I simply don't understand how the basicConsume works in the background?! I've read most parts of the documentation, googled my issue and couldn't find any useful result, only some open short discussion about similar situation...

I've tried different tutorials (my use case is more like the Tutorial 2, but in a duplex variation, where the workers also send something back!) In the basic tutorials, the basicConsume method is at the bottom of main method (Java) and is always listening.
However I need it to be at the middle of my function and only do the receiving part and die! I don't know how to do it correctly.

--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-user...@googlegroups.com.
To view this discussion on the web, visit https://groups.google.com/d/msgid/rabbitmq-users/290c5282-f41b-47bc-af6e-bce56c2a3f52n%40googlegroups.com.

--
Michał
RabbitMQ team
RabbitMQ team
Reply all
Reply to author
Forward
0 new messages