Implementing RabbitMQ transactions for message publishing without message loss

938 views
Skip to first unread message

Dark Ninja

unread,
Jun 6, 2019, 1:50:48 PM6/6/19
to rabbitmq-users
Hello,
I am wondering what is the way to implement transactions while publishing via Spring AMQP RabbitTemplate.
Using standard AMQP 0-9-1, the only way to guarantee that a message isn't lost is by using transactions -- make the channel transactional then for each message or set of messages publish, commit. 

From the above sentence, it seemed to me that the only configuration needed is -
//code
rabbitTemplate.setChannelTransacted(true);


I used below sample code to publish messages to a 3 node Rabbit cluster and then I take down the Rabbit node down while messages are being published.
//code
for (int i = 0; i < 10000; ++i) {
    rabbitTemplate.send("dummy-exchange", "dummy-key", new Message((i + "").getBytes(), new MessageProperties()));
    Thread.sleep(100);
}

When the node is down, the management UI does show that the queue is down, however the code does not throw any exception indicating failure to deliver message. I know that one can also implement publisher confirms for detecting failures, but wanted to know how to implement transactions properly to achieve the same. What else am I missing? 

Thanks.

Gary Russell

unread,
Jun 6, 2019, 1:57:39 PM6/6/19
to rabbitm...@googlegroups.com
Please ask questions about spring-amqp on Stack Overflow, tagged with [spring-amqp].

That is all you need to do to use transactions.

Publishers don't publish to queues, they publish to exchanges.

You need to mirror the queue to make that work as you expect [1].

I am guessing that when the node is taken down you are simply publishing to another instance of the exchange (which has no queues).



>By default, contents of a queue within a RabbitMQ cluster are located on a single node (the node on which the queue was declared). This is in contrast to exchanges and bindings, which can always be considered to be on all nodes. Queues can optionally be made mirrored across multiple nodes.

--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-user...@googlegroups.com.
To post to this group, send email to rabbitm...@googlegroups.com.
To view this discussion on the web, visit https://groups.google.com/d/msgid/rabbitmq-users/8ea75287-140f-49f6-b557-69121f993d9d%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Dark Ninja

unread,
Jun 6, 2019, 2:10:15 PM6/6/19
to rabbitmq-users
Hi,
Thanks for the quick response. I can certainly ask the spring-amqp part in the stack overflow.
But before that, do transactions guarantee message delivery to the queue? I understand that messages are delivered to the queue, if I enable transactions from the below statement -

Using standard AMQP 0-9-1, the only way to guarantee that a message isn't lost is by using transactions -- make the channel transactional then for each message or set of messages publish, commit. 

also, what is the client/server side configuration needed to enable transactions(independent of the language or framework used)?

Thanks.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitm...@googlegroups.com.

Gary Russell

unread,
Jun 6, 2019, 2:26:18 PM6/6/19
to rabbitm...@googlegroups.com

I just ran a test and it works as I described.

When the node is down, the exchange exists on the other nodes, but has no bindings.

You can get the message to be returned:

@SpringBootApplication

public class RabitUsers1Application {


public static void main(String[] args) {

SpringApplication.run(RabitUsers1Application.class, args);

}


@Bean

public ApplicationRunner runner(RabbitTemplate template) {

template.setChannelTransacted(true);

template.setMandatory(true);

template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {

System.out.println("Message returned:" + replyText + ":" + message);

});

return args -> {

while (true) {

template.convertAndSend("txtest", "txtest", "foo");

System.in.read();

}

};

}


}


Message returned:NO_ROUTE:(Body:'foo' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])


There are no server requirements for transactions; for Java you call channel.txSelect() and then txCommit() or txRollback() (spring takes care of all this when you set channelTransacted. I would guess something similar with other language bindings.


To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-user...@googlegroups.com.

To post to this group, send email to rabbitm...@googlegroups.com.
Reply all
Reply to author
Forward
0 new messages