on-publish, on-confirm ack mode does not seem to work in federation

Naina

Feb 12, 2015, 6:59:40 PM
to rabbitm...@googlegroups.com
I have 3 brokers running locally on ports 5672, 5673 and 5674.
5673 is an exchange-federated downstream broker while 5672 and 5674 are upstream brokers.
Messages are published only to one of the upstream brokers at a time (whichever is up) while listeners are tied only to the downstream broker.

ack-mode is set to on-publish when defining the upstreams. However, when a message is sent to the upstream and then consumed successfully by the downstream, it still remains in the READY state in the upstream queue. I expected it to disappear from the upstream queue once it was published to the downstream.
Why does this happen? Am I missing any setting?

I also tried setting the ack mode to on-confirm but it had the same effect - message remained in READY state in upstream queue even though it was consumed by downstream.

I am not sending manual acks from my listeners, as Spring container takes care of acks automatically.


Federation setup:

localhost:rabbitmq_server-3.3.1 $ sbin/rabbitmqctl set_parameter federation-upstream trial1-upstream '{"uri":"amqp://localhost:5672","expires":3600000,"ack-mode":"on-publish"}'
Setting runtime parameter "trial1-upstream" for component "federation-upstream" to "{\"uri\":\"amqp://localhost:5672\",\"expires\":3600000,\"ack-mode\":\"on-publish\"}" ...
...done.
localhost:rabbitmq_server-3.3.1 $ sbin/rabbitmqctl set_parameter federation-upstream trial2-upstream '{"uri":"amqp://localhost:5674","expires":3600000,"ack-mode":"on-publish"}'
Setting runtime parameter "trial2-upstream" for component "federation-upstream" to "{\"uri\":\"amqp://localhost:5674\",\"expires\":3600000,\"ack-mode\":\"on-publish\"}" ...
...done.
localhost:rabbitmq_server-3.3.1 $ sbin/rabbitmqctl set_parameter federation-upstream-set trial-set '[{"upstream":"trial1-upstream"},{"upstream":"trial2-upstream"}]'
Setting runtime parameter "trial-set" for component "federation-upstream-set" to "[{\"upstream\":\"trial1-upstream\"},{\"upstream\":\"trial2-upstream\"}]" ...
...done.
localhost:rabbitmq_server-3.3.1 $ sbin/rabbitmqctl set_policy --apply-to exchanges federate-me2 "^subscription_exchange" '{"federation-upstream-set":"trial-set"}'
Setting policy "federate-me2" for pattern "^subscription_exchange" to "{\"federation-upstream-set\":\"trial-set\"}" with priority "0" ...
...done.


Spring AMQP XML Configuration

<rabbit:connection-factory id="connectionFactory"
                           addresses="localhost:5672,localhost:5674"
                           username="${queue.username}"
                           password="${queue.password}"
                           channel-cache-size="${queue.channel.cache.size}"/>

<rabbit:connection-factory id="consumerConnFactory"
                           addresses="localhost:5673"
                           username="${queue.username}"
                           password="${queue.password}"
                           channel-cache-size="${queue.channel.cache.size}"/>

<!-- template used for sending message on to queue -->
<rabbit:template id="genericTemplateWithRetry" connection-factory="connectionFactory" exchange="subscription_exchange" retry-template="retryTemplate"/>

<rabbit:direct-exchange name="subscription_exchange">
    <rabbit:bindings>
        <rabbit:binding queue="subscription_renewal_queries" key="v1-subscription-renewal-query"/>
    </rabbit:bindings>
</rabbit:direct-exchange>

<rabbit:listener-container connection-factory="consumerConnFactory" message-converter="jsonMessageConverter" requeue-rejected="false">
    <rabbit:listener ref="subscriptionRenewalQueryListener" method="listen" queue-names="subscription_renewal_queries"/>
</rabbit:listener-container>

Michael Klishin

Feb 13, 2015, 1:16:21 AM
to Naina, rabbitm...@googlegroups.com
You say you use exchange federation. It does not affect messages in upstream queues; it only replicates the flow of messages going through a particular exchange. How messages are routed in the upstreams is none of exchange federation's business.

MK
--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-user...@googlegroups.com.
To post to this group, send email to rabbitm...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Naina

Feb 13, 2015, 3:00:29 PM
to rabbitm...@googlegroups.com, shalmali...@gmail.com
It says here https://www.rabbitmq.com/federation-reference.html
under "Applying to federated exchanges and federated queues"
ack-mode: If set to on-publish, messages are acknowledged to the upstream broker after they have been published downstream. 

You mentioned that it does not affect messages in upstream queues, though if messages are acknowledged to the upstream broker like the documentation says, it will affect the message state in the upstream queue right?

Michael Klishin

Feb 13, 2015, 3:11:42 PM
to Naina, rabbitm...@googlegroups.com
Exchange federation uses a separate queue to consume messages and publish them downstream. This in no way affects queues that your apps declare and use.

MK
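
To illustrate the reply above, here is a minimal toy model in Python, not the plugin's actual implementation. It assumes an upstream exchange with two bound queues: one declared by the application and one internal queue owned by the federation link. All names (`Exchange`, `run_federation_link`, the queue variables) are hypothetical, and removing a message from the internal queue stands in for the acknowledgement that ack-mode controls.

```python
from collections import deque


class Exchange:
    """Toy direct exchange: copies each message to every bound queue."""

    def __init__(self):
        self.bound_queues = []

    def bind(self, queue):
        self.bound_queues.append(queue)

    def publish(self, message):
        for queue in self.bound_queues:
            queue.append(message)


def run_federation_link(internal_queue, downstream_exchange):
    """Drain the link's internal queue and republish downstream.

    Popping a message stands in for the link acknowledging it to the
    upstream broker (a simplification of what ack-mode governs).
    """
    while internal_queue:
        downstream_exchange.publish(internal_queue.popleft())


# Upstream broker: an application queue plus the link's internal queue.
upstream_exchange = Exchange()
upstream_app_queue = deque()
federation_internal_queue = deque()
upstream_exchange.bind(upstream_app_queue)
upstream_exchange.bind(federation_internal_queue)

# Downstream broker: the federated exchange and the consumer's queue.
downstream_exchange = Exchange()
downstream_app_queue = deque()
downstream_exchange.bind(downstream_app_queue)

upstream_exchange.publish("renewal-1")
run_federation_link(federation_internal_queue, downstream_exchange)
consumed = downstream_app_queue.popleft()  # the downstream listener consumes it

print("consumed downstream:", consumed)
print("upstream app queue:", list(upstream_app_queue))
print("federation internal queue:", list(federation_internal_queue))
```

In this sketch the copy in the upstream application queue stays where it is after the downstream consumer has processed its own copy. Only the link's internal queue is emptied, which matches the READY message observed in the original question.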

Gunjan Thapliyal

Apr 17, 2018, 1:03:04 AM
to rabbitmq-users

I wish to not lose queue data in the upstream exchange/node till it is consumed by a consumer from the downstream queue. 

Since the federation plugin removes the data from the implicit queue it uses, I lose data if the downstream server goes down before the data is consumed and processed. Based on my understanding and observations, none of the ack modes helps. Is there a way to solve this case with the federation plugin itself? This is to protect myself from data loss.
One thought is to have an ack-mode 'on-consumed'.

Thanks
Gunjan

Michael Klishin

Apr 17, 2018, 10:53:21 AM
to rabbitm...@googlegroups.com
That is not correct.

Federation links can and do use consumer acknowledgements and publisher confirms [1]
by default, see "ack-mode" in the reference [2].

Which means that when a downstream node goes down, all outstanding unconfirmed deliveries it had consumed
will be automatically requeued as explained in [1].


--
MK

Staff Software Engineer, Pivotal/RabbitMQ

Gunjan Thapliyal

Apr 17, 2018, 12:00:03 PM
to rabbitmq-users
Thanks Michael for answering this.

I will test this again. I will bring it to your notice once again that I was using federated exchanges (and not federated queues ) when I saw data lost with the downstream server going down.

Thanks
Gunjan

Michael Klishin

Apr 17, 2018, 12:23:49 PM
to rabbitm...@googlegroups.com
Both exchange and queue federation can use the same acknowledgement modes. Since both also
use actual queues under the hood (even if they are carefully hidden from other apps in some cases),
it is possible that a node restart can delete them, while upstream will delete all *acknowledged* messages.

Exchange federation replicates a stream of messages, and shutting down
one "replica" (for the lack of a better word) cannot affect the original stream.
My only guess is that if your upstream node doesn't have any queues bound to the exchange,
those messages won't be routed anywhere. That would work the same way without federation in place.

Gunjan Thapliyal

Apr 17, 2018, 1:32:48 PM
to rabbitmq-users
Yes, your guess is correct.

To elaborate my topology, I have two physical nodes
Node 1 (upstream server) - publisher publishing on this node
Node 2 (downstream server) 

Node 1
Exchange - E1
Queue - No (explicit) queue bound to the exchange. But I saw a queue created by the federation plugin

Node 2
Exchange - E1
Queue - Q1 bound to E1
Consumer - C1 on Q1

Now, messages published on Node 1 - E1 are routed to Node 2 - Q1 and consumed.
When I stress-published thousands of messages with a slow consumer, messages were moved to Q1 (and removed from the N1 'federation' queue). At this point, Node 2 went down. There wasn't any data in N1 that I could use for recovery.

I am avoiding an explicit queue on N1 bound to E1, because this queue grows continuously. The consumer on N2 deletes data from Q1 and not from the upstream server queue.


Is this the possibility you are referring to? Any suggestions to work with it?

Thanks
Gunjan

Michael Klishin

Apr 17, 2018, 1:37:22 PM
to rabbitm...@googlegroups.com
You are sending mixed messages.

First you mentioned that you used *exchange* federation. Now you say it was *queue* federation.

Queue federation does not replicate anything. It simply moves messages to remote nodes
when there are no local consumers. The ack-mode setting still applies to messages "in flight", but for those that were
already moved, a node shutdown is disruptive (or catastrophic, depending on queue and message durability properties) with a single node.

It is possible to federate queues that are mirrored within an upstream (and/or downstream) since federation can link clusters.
However at this point there seems to be a lot of confusion as to what federation actually does with exchanges and queues.
I'd highly recommend dealing with that first before mirroring is introduced.

Gunjan Thapliyal

Apr 17, 2018, 1:52:41 PM
to rabbitmq-users
Sorry for the confusion.

I am using federated exchanges. By federation queue, I was referring to the queue that was created by the federation plugin (not created by me).

I can see it in the management console Queue tab with the name 
federation: E1->rabbit@Node2

Thanks
Gunjan

Michael Klishin

Apr 17, 2018, 2:16:20 PM
to rabbitm...@googlegroups.com
Messages published on Node 1 are *not* routed to Node 2. They are *replicated*
to Node 2. If there is no Node 2 to replicate to, they are routed as usual on Node 1.

Therefore if there's nowhere to route them (e.g. no queues are bound to the exchange(s) in question), they are
voided or returned to the publisher as usual (topic with many previous discussions on this list).

Therefore the question is: how should routing work for those exchanges "on Node 1"? If you want messages published to a node
to be accessible to consumers on other nodes, you want either clustering (in a LAN) or queue federation (the closest thing there is
over a WAN, but it's not replication).

Gunjan Thapliyal

Apr 18, 2018, 1:03:07 AM
to rabbitmq-users
Those are clear to me and I have used them as a base for my topology.
My concern is losing data from Q1 on Node 2 (not yet dequeued, processed or acknowledged) if the node goes down.
I looked for the data on Node 1. It was missing from the queue created by the federation plugin on Node 1 (federation:exch->rabbit@Node2). Once again, I am using federated exchanges (not federated queues).

Based on your helpful explanation here and the documentation, it appears that federated exchanges do not provide recovery in case of downstream server failure.
Maybe I will also have to create a queue on Node 1 and route the data to it for failure recovery. That brings a need to clean up this queue (a TTL can probably help as a simple, best-effort solution).

Thanks for your help and showing patience.

Thanks
Gunjan
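
The recovery idea in the post above can be sketched as a toy Python model with a simulated clock. This is only an illustration under stated assumptions, not how the broker is implemented: `TtlQueue`, the 60-second TTL, the message names and the timestamps are all hypothetical, and Q1's contents are assumed to be lost when Node 2 goes down.

```python
from collections import deque


class TtlQueue:
    """Toy queue that drops messages older than ttl_seconds (None = keep)."""

    def __init__(self, ttl_seconds=None):
        self.ttl_seconds = ttl_seconds
        self.entries = deque()  # (enqueue_time, message) pairs, oldest first

    def enqueue(self, message, now):
        self.entries.append((now, message))

    def messages(self, now):
        # Expire from the head: entries are ordered by enqueue time.
        if self.ttl_seconds is not None:
            while self.entries and now - self.entries[0][0] >= self.ttl_seconds:
                self.entries.popleft()
        return [message for _, message in self.entries]

    def drain(self, now):
        drained = self.messages(now)
        self.entries.clear()
        return drained


# Node 1 (upstream): E1 routes a copy to every bound queue.
federation_queue = TtlQueue()              # created by the federation plugin
recovery_queue = TtlQueue(ttl_seconds=60)  # hypothetical extra queue bound to E1

for message in ["m1", "m2", "m3"]:
    federation_queue.enqueue(message, now=0)
    recovery_queue.enqueue(message, now=0)

# t=1: the link forwards everything to Q1 on Node 2 and acks upstream.
q1_on_node2 = federation_queue.drain(now=1)

# t=5: Node 2 goes down before the slow consumer has processed Q1
# (assumption: Q1's contents are lost with the node).
q1_on_node2 = []

replayable = recovery_queue.messages(now=5)
after_ttl = recovery_queue.messages(now=61)

print("federation queue after forwarding:", federation_queue.messages(now=5))
print("replayable from recovery queue:", replayable)
print("recovery queue after TTL:", after_ttl)
```

The trade-off in this sketch is the one described in the post: the extra queue keeps copies available for replay only within the TTL window, so it bounds queue growth at the cost of best-effort recovery.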

Michael Klishin

Apr 18, 2018, 12:23:19 PM
to rabbitm...@googlegroups.com
Federation-created queues are transient by design. They are nothing more than
a means to a message-passing end. With exchange federation those queues get *copies* of messages that flow through an exchange.

With queue federation there is no replication and that should be taken into account when considering queue federation. The window of time
in which a message was moved by federation from upstream but not yet enqueued locally is pretty narrow even then
and not really different from other scenarios where a piece of data is only held in RAM and things go down right before it is persisted in any way.

