SimpleMessageListenerContainer stopping after trying to send retried message to DLQ


Shuchi Gupta

Jan 8, 2015, 6:52:00 AM
to rabbitm...@googlegroups.com
Hi,

I have some business logic that decides whether a message should be requeued. I want to requeue a message at most three times and then route it to a Dead Letter Queue. To do this, I set AcknowledgeMode to MANUAL on the SimpleMessageListenerContainer and added an advice chain with a retry template that retries 3 times, after which RejectAndDontRequeueRecoverer should send the message to the Dead Letter Queue. However, once the retries are exhausted, my container throws an exception:

org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer - Retries exhausted for message

org.springframework.amqp.rabbit.listener.ListenerExecutionFailedException: Listener threw exception

at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:758)


After this I have to restart the container to receive messages again. The code I am using is given below:

@Bean
SimpleMessageListenerContainer container(
        ConnectionFactory connectionFactory, Object listenerAdapter) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(queueName);
    container.setMessageListener(listenerAdapter);
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    container.setAdviceChain(workMessagesAdviceChain()); // for retry logic
    return container;
}

private Advice[] workMessagesAdviceChain() {
    return new Advice[] { workMessagesRetryInterceptor() };
}

@Bean
public RetryOperationsInterceptor workMessagesRetryInterceptor() {
    return RetryInterceptorBuilder.stateless()
            .retryOperations(retryTemplate(3))
            .recoverer(new RejectAndDontRequeueRecoverer())
            .build();
}

@Bean
public Queue queue() {
    return new Queue(queueName);
}

@Bean
public Queue deadLetterQueue() {
    Map<String, Object> arguments = new HashMap<String, Object>();
    arguments.put("x-dead-letter-exchange", exchangeName);
    arguments.put("x-message-ttl", 300000);
    return new Queue("deadLetterQueue", true, false, false, arguments);
}

@Bean
DirectExchange exchange() {
    return new DirectExchange(exchangeName, true, false);
}

@Bean
Binding binding() {
    return BindingBuilder.bind(queue()).to(exchange()).with(queueName);
}

@Bean
Object listenerAdapter() {
    return new MessageReceiver();
}

public class MessageReceiver implements ChannelAwareMessageListener {

    @Override
    public void onMessage(org.springframework.amqp.core.Message message, Channel channel) throws Exception {
        // Some business logic
        if (msgHandled)
            throw new RuntimeException("Message cannot be handled right now " + message);
        else
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
    }
}

What am I doing wrong in the configuration? Why is the container stopping? When I run rabbitmqctl list_queues I can see the deadLetterQueue, but messages are not getting routed to it. Please help!!!


Thanks,

Shuchi


Shuchi Gupta

Jan 10, 2015, 12:50:39 PM
to rabbitm...@googlegroups.com
Can somebody please help me? 

Gary Russell

Jan 10, 2015, 2:03:03 PM
to Shuchi Gupta, rabbitm...@googlegroups.com
Please ask questions about Spring AMQP on Stack Overflow with the [spring-amqp] tag. This group is for the RabbitMQ broker and the code the RabbitMQ team maintains. I monitor this list but will sometimes miss Spring-specific questions.

That said, if you want the container to take care of acks (including rejecting after the retries are exhausted), you should use AUTO, not MANUAL acknowledgements.

If you are using MANUAL acks, you have to do the ack (or reject) yourself. It is rare to need to use MANUAL acks with Spring; the container will take care of everything for you (basicAck when no exception and basicReject after an exception, with the requeue argument depending on configuration and/or retries).

The container will never ack or reject a message when using MANUAL acks.
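
A minimal sketch of that change, assuming the setup from the original post and a Spring AMQP version that still uses spring-retry (as in this thread). The class name `AutoAckContainerConfig`, the queue name `workQueue`, and the injected `MessageListener` bean are assumptions made for illustration; only the acknowledge mode differs in substance from the original configuration.

```java
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.interceptor.RetryOperationsInterceptor;

@Configuration
public class AutoAckContainerConfig {

    // Assumption: stands in for the queueName field of the original post.
    private final String queueName = "workQueue";

    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
            MessageListener listener) { // assumption: a MessageListener bean is defined elsewhere
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(queueName);
        container.setMessageListener(listener);
        // AUTO: the container acks on normal return and rejects after an exception.
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setAdviceChain(workMessagesRetryInterceptor());
        return container;
    }

    @Bean
    RetryOperationsInterceptor workMessagesRetryInterceptor() {
        // Three delivery attempts in total, then reject without requeue.
        return RetryInterceptorBuilder.stateless()
                .maxAttempts(3)
                .recoverer(new RejectAndDontRequeueRecoverer())
                .build();
    }
}
```

AUTO is the container's default acknowledge mode, so simply removing the `setAcknowledgeMode(AcknowledgeMode.MANUAL)` call should have the same effect.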

Gary Russell
Spring AMQP Lead
Pivotal Software Inc.


Shuchi Gupta

Jan 10, 2015, 2:15:35 PM
to rabbitm...@googlegroups.com
Understood. But how do I tell the container that a message should be retried on the basis of business logic and, once the retries are used up, sent to the DLQ? I have configured a ChannelAwareMessageListener for handling messages; I suppose I don't need that now? Also, if I have understood correctly, which exception should I throw from the business logic so that the container takes notice and requeues the message or routes it to the DLQ?

Thanks,
Shuchi


Gary Russell

Jan 10, 2015, 3:50:56 PM
to Shuchi Gupta, rabbitm...@googlegroups.com
You don't need to do anything; you already have a RejectAndDontRequeueRecoverer configured, so your business logic should just throw an exception. Since you are using stateless retry, the retry template will catch the exception and retry the delivery according to the retry policy. When the retries are exhausted, the retry template will call the recoverer, which will wrap your exception in an AmqpRejectAndDontRequeueException. That is a signal to the container to reject the message (with requeue=false); it will then go to the DLX/DLQ.
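
The flow described above can be sketched in plain Java, without Spring. Everything here is a stand-in for illustration: `RetryFlowSketch`, `RejectAndDontRequeue`, `retryThenRecover` and `deliver` are hypothetical names, not Spring AMQP classes, and the real retry template and container do considerably more (back-off, channel handling, and so on).

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class RetryFlowSketch {

    /** Stand-in for AmqpRejectAndDontRequeueException (not the Spring class). */
    public static class RejectAndDontRequeue extends RuntimeException {
        public RejectAndDontRequeue(Throwable cause) {
            super("Retries exhausted", cause);
        }
    }

    /** Stateless retry: call the listener up to maxAttempts times on the same thread, then recover. */
    public static void retryThenRecover(Consumer<String> listener, String message, int maxAttempts) {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                listener.accept(message);
                return; // success: nothing to recover
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        // The "recoverer": wrap the last failure in the reject signal.
        throw new RejectAndDontRequeue(last);
    }

    /** What a container in AUTO mode would do with the outcome of the advised listener call. */
    public static String deliver(Consumer<String> listener, String message, int maxAttempts) {
        try {
            retryThenRecover(listener, message, maxAttempts);
            return "ack";
        } catch (RejectAndDontRequeue e) {
            return "reject(requeue=false)"; // the broker then dead-letters the message
        }
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        String outcome = deliver(m -> {
            calls.incrementAndGet();
            throw new IllegalStateException("cannot handle " + m);
        }, "order-1", 3);
        // prints "reject(requeue=false) after 3 attempts"
        System.out.println(outcome + " after " + calls.get() + " attempts");
    }
}
```

The point of the sketch is that the listener never touches the channel: it either returns normally or throws, and the ack or reject decision is made outside it.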

If you want to handle it yourself (without a retry interceptor), you can simply throw an AmqpRejectAndDontRequeueException from your business logic. 
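
As a rough sketch of that option, assuming AUTO acknowledgements and no retry interceptor: the listener below is a plain `MessageListener` that throws `AmqpRejectAndDontRequeueException` when the business rule fails. The `canHandle` method is a hypothetical placeholder for the real business logic.

```java
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class MessageReceiver implements MessageListener {

    @Override
    public void onMessage(Message message) {
        if (!canHandle(message)) {
            // Signals the container to reject with requeue=false,
            // so the broker can dead-letter the message.
            throw new AmqpRejectAndDontRequeueException("Message cannot be handled: " + message);
        }
        // Normal return: in AUTO mode the container acks the message.
    }

    // Hypothetical placeholder for the business rule.
    private boolean canHandle(Message message) {
        return message.getBody().length > 0;
    }
}
```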

You can also configure the retry policy to only retry for certain exception types. Such a configuration would allow you to bypass the retry logic if you know retries are worthless (e.g. due to a badly formed message that can never succeed).
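
One way this might look with spring-retry's `SimpleRetryPolicy`, offered as a sketch rather than the definitive setup. `SelectiveRetry` and `TransientBusinessException` are hypothetical names. The `traverseCauses` flag is set to `true` on the assumption that the container wraps listener exceptions in a `ListenerExecutionFailedException` (as the stack trace in the original post suggests), so the policy has to look at the cause chain to see the business exception.

```java
import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.retry.interceptor.RetryOperationsInterceptor;
import org.springframework.retry.policy.SimpleRetryPolicy;

public class SelectiveRetry {

    /** Hypothetical exception the business logic throws when a retry might help. */
    public static class TransientBusinessException extends RuntimeException {
        public TransientBusinessException(String message) {
            super(message);
        }
    }

    public static RetryOperationsInterceptor interceptor() {
        // Only exceptions listed here (or found in the cause chain) are retried.
        Map<Class<? extends Throwable>, Boolean> retryable = new HashMap<>();
        retryable.put(TransientBusinessException.class, true);

        // 3 attempts in total; traverseCauses = true so wrapped exceptions are classified too.
        SimpleRetryPolicy policy = new SimpleRetryPolicy(3, retryable, true);

        return RetryInterceptorBuilder.stateless()
                .retryPolicy(policy)
                .recoverer(new RejectAndDontRequeueRecoverer())
                .build();
    }
}
```

With this policy, any other exception should go to the recoverer after the first attempt and be rejected without requeue.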

You can also configure the listener container to always reject (and not requeue) messages by setting defaultRequeueRejected to false (it's true by default and always requeues after an exception, unless there is an AmqpRejectAndDontRequeueException in the cause tree).
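
A short sketch of that setting; `NoRequeueContainerFactory` and its `create` method are hypothetical helpers, and the message listener still has to be set before the container is started.

```java
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;

public class NoRequeueContainerFactory {

    public static SimpleMessageListenerContainer create(ConnectionFactory connectionFactory,
            String queueName) {
        SimpleMessageListenerContainer container =
                new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames(queueName);
        // Any listener exception now leads to a reject with requeue=false.
        container.setDefaultRequeueRejected(false);
        return container;
    }
}
```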

And, no, you don't need a ChannelAwareMessageListener since you don't need to interact with the Channel; you can just use a MessageListener.

Gary

Shuchi Gupta

Jan 11, 2015, 11:50:55 PM
to rabbitm...@googlegroups.com
Thanks Gary! I created a DLX FanoutExchange and a DLQ for it. Don't know why DirectExchange was not working for DLX. But it finally worked :)
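
For readers hitting the same problem, here is a sketch of a fanout DLX setup like the one described. The names `DeadLetterConfig`, `workQueue` and `work.dlx` are assumptions. Note that `x-dead-letter-exchange` is set on the work queue, since that is the queue messages are rejected from. A possible explanation for the DirectExchange behaviour (not confirmed in this thread) is that dead-lettered messages keep their original routing key unless `x-dead-letter-routing-key` is set, so a direct DLX only delivers to a queue bound with that key, whereas a fanout exchange ignores the key.

```java
import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DeadLetterConfig {

    // Hypothetical names, chosen for illustration.
    static final String WORK_QUEUE = "workQueue";
    static final String DLX = "work.dlx";
    static final String DLQ = "deadLetterQueue";

    @Bean
    FanoutExchange deadLetterExchange() {
        return new FanoutExchange(DLX);
    }

    @Bean
    Queue deadLetterQueue() {
        return new Queue(DLQ, true); // durable
    }

    @Bean
    Binding deadLetterBinding() {
        // Fanout: every message published to the DLX lands in the DLQ.
        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange());
    }

    @Bean
    Queue workQueue() {
        Map<String, Object> arguments = new HashMap<>();
        // Rejected (requeue=false) messages from this queue are republished to the DLX.
        arguments.put("x-dead-letter-exchange", DLX);
        return new Queue(WORK_QUEUE, true, false, false, arguments);
    }
}
```

If the work queue already exists on the broker without this argument, it has to be deleted first; RabbitMQ refuses to redeclare a queue with different arguments.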

