Confirmation/transactions question


Jason McIntosh

Aug 1, 2014, 4:57:30 PM
to rabbitm...@googlegroups.com
So, after doing some digging, it appears that RabbitTemplate and confirmations are kinda "tricky".  Let's look at the low-level example code for confirmations:
        Connection conn = connectionFactory.newConnection();
        Channel ch = conn.createChannel();
        // Put the channel into publisher-confirm mode
        ch.confirmSelect();
        // Publish
        for (long i = 0; i < msgCount; ++i) {
            ch.basicPublish("invalid", "invalid",
                            MessageProperties.PERSISTENT_BASIC,
                            "nop".getBytes());
        }
        // Block until every outstanding message is confirmed; throws if any was nacked
        ch.waitForConfirmsOrDie();

The above is a great example of how to do batch submits.  If a message fails to process, the waitForConfirmsOrDie() call throws an exception.  It performs extremely well.  The problem for me is that I tried to do the same thing in Spring with the RabbitTemplate:

        ConnectionFactory fact = new ConnectionFactory();
        fact.setHost("localhost");
        fact.setUsername("guest");
        fact.setPassword("guest");
        CachingConnectionFactory cachingFact = new CachingConnectionFactory(fact);
        cachingFact.setPublisherConfirms(true);
        
        RabbitTemplate template = new RabbitTemplate(cachingFact);
        template.setConfirmCallback(new ConfirmCallback() {
            
            @Override
            public void confirm(CorrelationData correlationData, boolean ack) {
                System.out.println("Failed to process message " + correlationData.getId());
                throw new RuntimeException("ERROR on message ");
            }
        });
        template.send("invalid", "invalid", new Message("Test message".getBytes(), new MessageProperties()));


My expectation was that an exception would be thrown, but everything comes back green and happy.  I can't find a "waitForConfirmsOrDie"-type method anyplace, so I'm ASSUMING that the failure was only reported after the JVM exited.  After doing some research, my understanding of the Spring template stuff is that the following are true:
1)  If you need to validate that your messages are published at the time of publish, publisher confirms do NOT work - use a transacted channel instead.
2)  You can use confirms for "batch" publishing of messages, but there is some manual work required to verify that messages actually got sent.  This seems relatively simple in the "raw" client, but not very easy/obvious in the Spring client.
3)  There is no built-in correlation between a failed publish and the contents of the message that failed to send - there might be a library, but it's not documented that I can see (tied to issue #2).  Your "correlation data" has to carry some very detailed information in order for you to determine what to republish/handle/etc.
4)  RabbitTemplate doesn't have a "waitForConfirmsOrDie" anyplace that I can see, so there's no good way to verify that all messages have been confirmed and, if not, trigger the callback or some other process.  Thus in the above example, the callback may not be triggered anytime remotely soon, and so program execution might end before it fires.
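
To illustrate the "manual work" from points 2 and 3, here is a minimal sketch of the bookkeeping that maps a confirm back to the message that was sent. It uses only the JDK and no broker: the class name OutstandingConfirms and its methods are hypothetical, and the acks/nacks are fed in by hand. With the raw client, I'd assume you record ch.getNextPublishSeqNo() before each basicPublish and forward the ConfirmListener's handleAck/handleNack calls (including the "multiple" flag) to something like this.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical helper, JDK only: tracks which published messages are still unconfirmed.
final class OutstandingConfirms {
    // publish sequence number -> message body still waiting for a confirm
    private final ConcurrentNavigableMap<Long, String> pending = new ConcurrentSkipListMap<>();
    // bodies of messages the broker nacked, kept so they can be republished
    private final List<String> failed = new CopyOnWriteArrayList<>();

    // Record a message under its sequence number before publishing it.
    void published(long seqNo, String body) {
        pending.put(seqNo, body);
    }

    void handleAck(long seqNo, boolean multiple) {
        settle(seqNo, multiple, false);
    }

    void handleNack(long seqNo, boolean multiple) {
        settle(seqNo, multiple, true);
    }

    private void settle(long seqNo, boolean multiple, boolean nacked) {
        if (multiple) {
            // "multiple" means: everything up to and including seqNo is settled.
            ConcurrentNavigableMap<Long, String> upTo = pending.headMap(seqNo, true);
            if (nacked) {
                failed.addAll(upTo.values());
            }
            upTo.clear();
        } else {
            String body = pending.remove(seqNo);
            if (nacked && body != null) {
                failed.add(body);
            }
        }
    }

    int pendingCount() {
        return pending.size();
    }

    List<String> failedBodies() {
        return new ArrayList<>(failed);
    }

    public static void main(String[] args) {
        OutstandingConfirms tracker = new OutstandingConfirms();
        for (long seq = 1; seq <= 4; seq++) {
            tracker.published(seq, "message " + seq);
        }
        tracker.handleAck(2, true);   // simulated: broker acks 1 and 2 together
        tracker.handleNack(3, false); // simulated: broker nacks 3
        tracker.handleAck(4, false);  // simulated: broker acks 4
        System.out.println("pending=" + tracker.pendingCount()
                + " failed=" + tracker.failedBodies());
    }
}
```

Keeping the body (or enough to rebuild it) next to the sequence number is what lets you decide what to republish once a nack arrives.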

This may all be purely a matter of documentation.  The Spring docs read really rough on this IMHO.  Really what you need to say in the Spring AMQP template docs is roughly:
- Set transactions to true if you need to make sure at the time of publish that the message WAS published to RabbitMQ (synchronous commits to the server, not necessarily to a consumer).
- Confirmations will let you EVENTUALLY know, through callbacks, which messages failed to be published, but not necessarily at the time of publish (asynchronous, eventual consistency of commits).
- The default is fire and forget, which means messages may or may not actually be published.  If the routing keys/exchanges are invalid, your message may just disappear.

I'd welcome any commentary on this - I wish I'd caught this earlier, as I'd have advised most of our web apps to set transacted to true.  I'd then recommend batch processes use low level channels so they can wait for a set of messages to be published and validate that set.

--

Gary Russell

Aug 1, 2014, 5:29:03 PM
to rabbitm...@googlegroups.com
Reposting to group.

Please ask Spring AMQP questions on Stack Overflow (Spring-AMQP); this list is for RabbitMQ itself.

That said....

You don't need to use transactions (that's the whole point of async confirmations - to avoid the blocking overhead).

The equivalent of waitForConfirmsOrDie might be...

        // One count per message; each confirm (ack or nack) counts down once
        final CountDownLatch latch = new CountDownLatch(msgCount);
        template.setConfirmCallback(new ConfirmCallback() {

            @Override
            public void confirm(CorrelationData correlationData, boolean ack) {
                latch.countDown();
            }
        });
        for (int i = 0; i < msgCount; i++) {
            template.send("invalid", "invalid", new Message("Test message".getBytes(), new MessageProperties()));
        }
        // Fails if not all confirms arrive within 10 seconds
        assertTrue(latch.await(10, TimeUnit.SECONDS));

The send() methods allow you to provide some correlation data to determine which messages were successful. If you are correlating, you might use a BlockingQueue instead of the CountDownLatch above.

Correlation is no different; just that with Spring, you can correlate on some higher level entity such as invoice number instead of the message sequence.
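
As a rough sketch of the BlockingQueue variant: the class below is hypothetical and uses only the JDK, with a single-threaded executor standing in for the broker's asynchronous confirms. In a real application I'd assume the template's ConfirmCallback simply calls confirm(correlationData.getId(), ack), and that each send passes a CorrelationData carrying something like an invoice number.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, JDK only: collects confirms keyed by a business-level id.
final class CorrelatedConfirms {
    // One confirm result, e.g. keyed by invoice number.
    static final class Result {
        final String correlationId;
        final boolean ack;

        Result(String correlationId, boolean ack) {
            this.correlationId = correlationId;
            this.ack = ack;
        }
    }

    private final BlockingQueue<Result> results = new LinkedBlockingQueue<>();

    // Meant to be called from the confirm callback; it only enqueues.
    void confirm(String correlationId, boolean ack) {
        results.add(new Result(correlationId, ack));
    }

    // Waits for `expected` confirms and returns the ids that were nacked.
    // Throws IllegalStateException if they do not all arrive within the timeout.
    List<String> awaitAll(int expected, long timeout, TimeUnit unit) {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        List<String> nacked = new ArrayList<>();
        try {
            for (int i = 0; i < expected; i++) {
                Result r = results.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
                if (r == null) {
                    throw new IllegalStateException(
                            "timed out with " + (expected - i) + " confirms outstanding");
                }
                if (!r.ack) {
                    nacked.add(r.correlationId);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for confirms", e);
        }
        return nacked;
    }

    public static void main(String[] args) {
        CorrelatedConfirms confirms = new CorrelatedConfirms();
        ExecutorService fakeBroker = Executors.newSingleThreadExecutor();
        String[] invoices = {"INV-1", "INV-2", "INV-3"};
        for (String id : invoices) {
            // Simulated confirms: pretend INV-2 is nacked.
            fakeBroker.execute(() -> confirms.confirm(id, !id.equals("INV-2")));
        }
        List<String> nacked = confirms.awaitAll(invoices.length, 10, TimeUnit.SECONDS);
        fakeBroker.shutdown();
        System.out.println("nacked: " + nacked);
    }
}
```

Compared with the latch, the queue tells you which sends failed, not just that all confirms arrived.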

Your closing statements are mostly correct, but they apply to RabbitMQ in general, not just Spring AMQP. Messages sent with an invalid routing key are silently dropped. Sending to an invalid exchange causes the Channel to close; you can attach a listener to the channel to get that event. Since Spring AMQP 1.3.5, such errors are sent as a NACK to the confirm() method.

Hope that clears things up.

