I am writing a consumer that needs to retrieve messages from queue for processing. I have a uncommon business requirement that if there are no messages in ~30secs interval (or next message can't be retrieved in ~30 seconds), I need to take different action. QueueingConsumer is perfect for my scenario as below:
=============
...set up connectionFactory
connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.basicQos(4);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queue, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery(30*1000);
try {
if (delivery == null) {
...
}else{
String msgJson = new String(delivery.getBody(), StandardCharsets.UTF_8);
}
}catch(...){..}
}
=============
But then I noticed that QueueingConsumer is deprecated as per
doc here.
I guess my only other option is to use DefaultConsumer but I could not figure out how to write my above code using DefaultConsumer. How can I do timeout with basicGet()? I think I am missing some thing basic. Any suggestions?
============
...set up connectionFactory
connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.basicQos(4);
while (true) {
GetResponse response = channel.basicGet(queue, true);
try {
if (response == null) {
...
}else{
String msgJson = new String(response.getBody(), StandardCharsets.UTF_8);
}
}catch(...){..}
}
============