using DefaultConsumer with BasicGet with timeout

709 views
Skip to first unread message

amqp...@gmail.com

unread,
Aug 27, 2015, 3:39:33 PM8/27/15
to rabbitmq-users

I am writing a consumer that needs to retrieve messages from queue for processing. I have a uncommon business requirement that if there are no messages in ~30secs interval (or next message can't be retrieved in ~30 seconds), I need to take different action. QueueingConsumer is perfect for my scenario as below:

=============
...set up connectionFactory
connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.basicQos(4);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queue, true, consumer);
while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery(30*1000);
    try {
        if (delivery == null) {
... 
}else{
String msgJson = new String(delivery.getBody(), StandardCharsets.UTF_8);
}
}catch(...){..}
}
=============


But then I noticed that QueueingConsumer is deprecated as per doc here.

I guess my only other option is to use DefaultConsumer but I could not figure out how to write my above code using DefaultConsumer. How can I do timeout with basicGet()? I think I am missing some thing basic. Any suggestions?

============
...set up connectionFactory
connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.basicQos(4);
while (true) {
    GetResponse response = channel.basicGet(queue, true);
    try {
        if (response == null) {
... 
}else{
String msgJson = new String(response.getBody(), StandardCharsets.UTF_8);
}
}catch(...){..}
}
============

Michael Klishin

unread,
Aug 27, 2015, 3:59:13 PM8/27/15
to rabbitm...@googlegroups.com
Making your consumer keep track of the last delivery timestamp should make the problem pretty straightforward. You can also cancel
the consumer from a handleDelivery.

MK

amqp...@gmail.com

unread,
Aug 27, 2015, 4:25:38 PM8/27/15
to rabbitmq-users
Thanks for your reply. Timestamp is good option. Another question I have from your response is: I have hard time understanding the use case of 'cancel consumer'.... in my case I want my consumer to always up (this is part of a spring boot app)  and so I am using while loop. In my connection setup I also have setAutomaticRecoveryEnabled , so that connection can be recovered in case of shutdown/network issue. Do you think I should not be doing that.

Thanks
Venu

Michael Klishin

unread,
Aug 27, 2015, 4:58:32 PM8/27/15
to rabbitm...@googlegroups.com, amqp...@gmail.com
On 27 Aug 2015 at 23:25:41, amqp...@gmail.com (amqp...@gmail.com) wrote:
> Another question I have from your response is: I have hard time
> understanding the use case of 'cancel consumer'.... in my case
> I want my consumer to always up (this is part of a spring boot app)
> and so I am using while loop. In my connection setup I also have
> setAutomaticRecoveryEnabled , so that connection can be recovered
> in case of shutdown/network issue. Do you think I should not be
> doing that.

QueueingConsumer won’t be automatically recovered — that’s one of the major
reasons to not use it. This is in the docs. 

Consumers are not meant to be used in a while loop. RabbitMQ pushes deliveries
(and other consumer lifecycle methods) to them. In some cases you want a consumer
to run forever, in other cases consumers are transient — that’s why cancelling
them is possible.

DefaultConsumer subclasses definitely don’t need a while loop.
--
MK

Staff Software Engineer, Pivotal/RabbitMQ


amqp...@gmail.com

unread,
Aug 28, 2015, 1:05:49 AM8/28/15
to rabbitmq-users, amqp...@gmail.com
Thanks. I really appreciate if you can tell me if there is a way to achieve my project requirement:
   - Consumer should aggregate 25 messages and then call a remote service. 
   - If the queue has less than 25 messages, it should wait for 30secs for new messages. If no new messages arrives, just aggregate what ever it got and call remote method.
   - should recover shudown/network issues

I came up with below code using the snippet under "Retrieving individual messages" heading in api guide here
Is this recommended usage? Any other ways you suggest?

==============
       ConnectionFactory connectionFactory = new ConnectionFactory();
       connectionFactory.set...
     
       List<String> msgList = new ArrayList<String>();
        while (true) {
            Connection connection = null;
            try {
                connection = connectionFactory.newConnection();
                Channel channel = connection.createChannel();
                channel.basicQos(25 * 2);
                boolean autoAck = false;
                while (true) {
                    GetResponse response = channel.basicGet("queuename", autoAck);
                    try {
                        if (response == null) {
                            try {
                                Thread.sleep(30 * 1000);
                            }catch(InterruptedException iex) {
                            }
                            response = channel.basicGet(mqprops.getMqEntityQ(), autoAck);
                        }

                        if (response == null) {
                            if (msgList.size() > 0 ) {
                                //call remote method
channel.basicAck(response.getEnvelope().getDeliveryTag(), true);
                                msgList.clear();
                            }
                        } else {
                            msgList.add(new String(response.getBody()));
                            if (msgList.size() > 0 ) {
                                //call remote method
       channel.basicAck(response.getEnvelope().getDeliveryTag(), true);
                                msgList.clear();
                            }
                        }
                    } catch (Exception ex) {
                        //reject all messages, and continue with get message
                        channel.basicReject(response.getEnvelope().getDeliveryTag(), true);
                        msgList.clear();
                    }
                }
            } catch (ShutdownSignalException ssex) {
                //log error
                ...

            } catch (Exception e) {
                //log error
                ...
            }

            try {
                releaseConn(connection);
                //wait and continue with reconnection tries in loop
                Thread.sleep(10 * 1000);
            } catch (InterruptedException ie) {

            }

        }
====================

Michael Klishin

unread,
Aug 28, 2015, 4:30:25 PM8/28/15
to amqp...@gmail.com, rabbitm...@googlegroups.com
 On 28 August 2015 at 20:32:23, amqp...@gmail.com (amqp...@gmail.com) wrote:
> Thanks. I really appreciate if you can tell me if there is a way
> to achieve my project requirement:
> - Consumer should aggregate 25 messages and then call a remote
> service.
> - If the queue has less than 25 messages, it should wait for 30secs
> for new messages. If no new messages arrives, just aggregate
> what ever it got and call remote method.
> - should recover shudown/network issues
>
> I came up with below code using the snippet under "Retrieving
> individual messages" heading in api guide here(http://www.rabbitmq.com/api-guide.html)
> Is this recommended usage? Any other ways you suggest?

Your code uses polling (basic.get) and it will work but may be pretty inefficient.
The fact that you sleep after the first empty response helps but a while loop
will be a pain to recover.

The real problem here is processing a stream of messages in chunks. I'd explore
using a LinkedBlockingQueue or similar from java.util.concurrent. Unfortunately,
this will likely suffer from the same issue with recovery as QueueingConsumer
but at least you have more options compared to a while loop.

But offloading deliveries to a "buffer" in your JVM consumer app and processing them in
batches with timeouts is the right thing to do compared to a while loop with basic.get.

However, if your current implementation works for it, besides recovery I see no serious issues
with it.

Guangyun Xu

unread,
Nov 23, 2017, 7:41:03 AM11/23/17
to rabbitmq-users
Hello Michael, 


But offloading deliveries to a "buffer" in your JVM consumer app and processing them in 
batches with timeouts is the right thing to do

That is what I am trying to do. But I have the difficulty to acknowledge the processed messages. I have got the requirement that a message can only be acked after it has been processed.


final List<String> myMessagges = new ArrayList<String>();
        channel.basicConsume("my_queue", false, new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                myMessagges.add(new String(body));
                System.out.println("Received...");

                if (myMessagges.size() >= 10) {
                    System.out.println("insert into DB...");
                    channel.basicAck(envelope.getDeliveryTag(), true);
                    myMessagges.clear();
                }
            }
        });
But I can not wait forever if there are only e.g. 5 messages in the list and no message is coming at all.

In addition, because of the thread-safety, I am not considering using a scheduled executor service to process (and ack) the collected messages in a new thread.

So could you please recommend a way to solve the problem?

Thanks,
Guangyun

Arnaud Cogoluègnes

unread,
Nov 23, 2017, 9:58:09 AM11/23/17
to rabbitm...@googlegroups.com
You don't have much choice than using a timeout when a batch of messages takes too long to arrive. Using a lock to avoid using the channel from several threads should be enough.

--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-users+unsubscribe@googlegroups.com.
To post to this group, send email to rabbitmq-users@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Reply all
Reply to author
Forward
0 new messages