Federation plugin creates lots of direct connections when upstream does not respond


Olof Åkesson

Sep 12, 2018, 7:33:10 AM
to rabbitmq-users
Hi,

We have run into an issue where the federation plugin creates a high number of "direct" connections, which triggers a memory alarm and causes connections to be blocked.

We are in the process of upgrading from RabbitMQ 3.6.5 (Erlang 18.3) to RabbitMQ 3.7.7 (Erlang 20.3).

In our test environment we have two clusters (A and B), each with five nodes and fronted by a Keepalived/LVS load balancer. Cluster A is running 3.6.5 and cluster B is running 3.7.7.

We use federated queues and exchanges in both directions and have around 900 running federation links in each cluster.

Due to a misconfigured backup process, the load balancer for cluster A did not respond at all for about 20 minutes. This caused the number of direct (protocol: "Direct 0-9-1") connections on cluster B to increase to the point where the memory alarm was triggered and connections were blocked. After the load balancer for cluster A became available again, the federation links were restored, but the thousands of direct connections were never removed.
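For anyone who wants to check for the same symptom, here is a minimal sketch that counts connections by protocol via the management HTTP API; the host and credentials below are placeholders, not our real ones:

import collections
import requests

# Placeholders: point this at one of the affected nodes' management API.
BASE = 'http://cluster-b-node:15672'
AUTH = ('monitoring-user', 'password')

resp = requests.get(BASE + '/api/connections', auth=AUTH)
resp.raise_for_status()

# Leaked federation connections show up with protocol "Direct 0-9-1".
counts = collections.Counter(conn.get('protocol', 'unknown') for conn in resp.json())
for protocol, count in counts.most_common():
    print(protocol, count)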


The logs on cluster B (RabbitMQ 3.7.7) are filled with error messages like this:

2018-09-10 02:20:09.949 [error] <0.13662.124> ** Generic server <0.13662.124> terminating
** Last message in was {'$gen_cast',maybe_go}
** When Server state == {not_started,{amqqueue,{resource,<<"/">>,queue,<<"some.queue">>},true,false,none,[{<<"x-dead-letter-exchange">>,longstr,<<"some.dlx">>},{<<"x-dead-letter-routing-key">>,longstr,<<"some.key">>}],<0.11453.16>,[<5406.30539.140>],[],['rabbit@host-03'],[{vhost,<<"/">>},{name,<<"default">>},{pattern,<<".*">>},{'apply-to',<<"queues">>},{definition,[{<<"federation-upstream-set">>,<<"all">>},{<<"ha-mode">>,<<"exactly">>},{<<"ha-params">>,2},{<<"ha-sync-mode">>,<<"automatic">>}]},{priority,0}],undefined,[{<5406.30547.140>,<5406.30539.140>},{<0.11460.16>,<0.11453.16>}],[rabbit_federation_queue],live,0,[],<<"/">>,#{user => <<"admin">>}},false,{upstream,[<<"amqp://federation:password@host:5672/%2f">>],<<"some.queue">>,<<"some.queue">>,1000,1,5,none,none,false,'on-confirm',none,<<"host">>,false},{upstream_params,<<"amqp://federation:password@host:5672/%2f">>,{amqp_params_network,<<"federation">>,<<"password">>,<<"/">>,"host",5672,2047,0,10,60000,none,[#Fun<amqp_uri.12.90191702>,#Fun<amqp_uri.12.90191702>],[],[]},{amqqueue,{resource,<<"/">>,queue,<<"some.queue">>},true,false,none,[{<<"x-dead-letter-exchange">>,longstr,<<"some.dlx">>},{<<"x-dead-letter-routing-key">>,longstr,<<"some.key">>}],<0.11453.16>,[<5406.30539.140>],[],['rabbit@host-03'],[{vhost,<<"/">>},{name,<<"default">>},{pattern,<<".*">>},{'apply-to',<<"queues">>},{definition,[{<<"federation-upstream-set">>,<<"all">>},{<<"ha-mode">>,<<"exactly">>},{<<"ha-params">>,2},{<<"ha-sync-mode">>,<<"automatic">>}]},{priority,0}],undefined,[{<5406.30547.140>,<5406.30539.140>},{<0.11460.16>,<0.11453.16>}],[rabbit_federation_queue],live,0,[],<<"/">>,#{user => <<"admin">>}},...}}
** Reason for termination ==
** {timeout,{gen_server,call,[<0.13683.124>,connect,60000]}}
2018-09-10 02:20:09.949 [error] <0.13662.124> CRASH REPORT Process <0.13662.124> with 0 neighbours exited with reason: {timeout,{gen_server,call,[<0.13683.124>,connect,60000]}} in gen_server2:terminate/3 line 1166
2018-09-10 02:20:09.950 [error] <0.11478.16> Supervisor {<0.11478.16>,rabbit_federation_link_sup} had child {upstream,[<<"amqp://federation:password@host:5672/%2f">>],
          <<"some.queue">>,
          <<"some.queue">>,1000,1,5,none,none,
          false,'on-confirm',none,<<"host">>,false} started with rabbit_federation_queue_link:start_link({{upstream,[<<"amqp://federation:password@host:5672/%2f">>],<<"xxxxx....">>,...},...}) at <0.13662.124> exit with reason {timeout,{gen_server,call,[<0.13683.124>,connect,60000]}} in context child_terminated
2018-09-10 02:20:09.950 [warning] <0.13677.124> Channel (<0.13677.124>): Unregistering confirm handler <0.13662.124> because it died. Reason: {timeout,{gen_server,call,[<0.13683.124>,connect,60000]}}


We can reproduce the problem using this iptables rule to simply drop all traffic on port 5672:

iptables -A INPUT -i eth0 -p tcp --dport 5672 -j DROP

The problem seems somewhat similar to https://github.com/rabbitmq/rabbitmq-federation/pull/77, which is about direct connections that are never removed.


I would be grateful for any help with this problem; if more information is needed, I will try to provide it.

Thanks,
Olof

Michael Klishin

Sep 12, 2018, 3:20:44 PM
to rabbitm...@googlegroups.com
Besides the iptables rule, what are the steps to reproduce?

We won't get to it before 3.7.8 ships, and we strongly prefer not to guess.




--
MK

Staff Software Engineer, Pivotal/RabbitMQ

Selim Tuvi

Sep 12, 2018, 3:51:19 PM
to rabbitmq-users
We also have this issue on 3.7.7. During a network outage, we started getting error logs like the following, and the number of federation link connections jumped from around 500 to 30K.

** When Server state == {not_started,{amqqueue,{resource,<<"/">>,queue,<<"mercury.ilm-sing.signiant.delete_manifest">>},true,false,none,[],<0.6888.0>,[],[],[],[{vhost,<<"/">>},{name,<<"federated-mercury-queues">>},{pattern,<<"^mercury\\..*">>},{'apply-to',<<"queues">>},{definition,[{<<"federation-upstream-set">>,<<"all">>}]},{priority,0}],undefined,[],[rabbit_federation_queue],live,1,[],<<"/">>,#{user => <<"iris">>}},false,{upstream,[<<"amqp://iris:siri@pe-mercuryservices-sf-01v">>],<<"mercury.ilm-sing.signiant.delete_manifest">>,<<"mercury.ilm-sing.signiant.delete_manifest">>,1000,1,5,none,none,false,'on-confirm',none,<<"pe-mercuryservices-sf-01v">>,false},{upstream_params,<<"amqp://iris:siri@pe-mercuryservices-sf-01v">>,{amqp_params_network,<<"iris">>,<<"siri">>,<<"/">>,"pe-mercuryservices-sf-01v",undefined,2047,0,10,60000,none,[#Fun<amqp_uri.12.90191702>,#Fun<amqp_uri.12.90191702>],[],[]},{amqqueue,{resource,<<"/">>,queue,<<"mercury.ilm-sing.signiant.delete_manifest">>},true,false,none,[],<0.6888.0>,[],[],[],[{vhost,<<"/">>},{name,<<"federated-mercury-queues">>},{pattern,<<"^mercury\\..*">>},{'apply-to',<<"queues">>},{definition,[{<<"federation-upstream-set">>,<<"all">>}]},{priority,0}],undefined,[],[rabbit_federation_queue],live,1,[],<<"/">>,#{user => <<"iris">>}},<<"amqp://pe-mercuryservices-sf-01v">>,[{<<"uri">>,longstr,<<"amqp://pe-mercuryservices-sf-01v">>},{<<"queue">>,longstr,<<"mercury.ilm-sing.signiant.delete_manifest">>}]}}
** Reason for termination == 
** {timeout,{gen_server,call,[<0.25302.475>,connect,60000]}}
2018-09-12 08:09:31.705 [error] <0.25336.475> CRASH REPORT Process <0.25336.475> with 0 neighbours exited with reason: {timeout,{gen_server,call,[<0.25302.475>,connect,60000]}} in gen_server2:terminate/3 line 1166
2018-09-12 08:09:31.705 [warning] <0.25313.475> Channel (<0.25313.475>): Unregistering confirm handler <0.25255.475> because it died. Reason: {timeout,{gen_server,call,[<0.25242.475>,connect,60000]}}
2018-09-12 08:09:31.705 [warning] <0.25388.475> Channel (<0.25388.475>): Unregistering confirm handler <0.25304.475> because it died. Reason: {timeout,{gen_server,call,[<0.25358.475>,connect,60000]}}
2018-09-12 08:09:31.705 [warning] <0.25214.475> Channel (<0.25214.475>): Unregistering confirm handler <0.25336.475> because it died. Reason: {timeout,{gen_server,call,[<0.25302.475>,connect,60000]}}
2018-09-12 08:09:31.705 [error] <0.9043.0> Supervisor {<0.9043.0>,rabbit_federation_link_sup} had child {upstream,[<<"amqp://iris:siri@pe-mercuryservices-sf-01v">>],
          <<"mercury.ilm-sing.signiant.delete_manifest">>,
          <<"mercury.ilm-sing.signiant.delete_manifest">>,1000,1,5,none,none,
          false,'on-confirm',none,<<"pe-mercuryservices-sf-01v">>,false} started with rabbit_federation_queue_link:start_link({{upstream,[<<"amqp://iris:siri@pe-mercuryservices-sf-01v">>],<<"mercury.ilm-sing.signiant.delete...">>,...},...}) at <0.25336.475> exit with reason {timeout,{gen_server,call,[<0.25302.475>,connect,60000]}} in context child_terminated
2018-09-12 08:10:05.157 [error] <0.25444.475> ** Generic server <0.25444.475> terminating
** Last message in was {'$gen_cast',maybe_go}
** When Server state == {not_started,{{upstream,[<<"amqp://iris:siri@pe-mercuryservices-sing-01v">>],<<"celery.pidbox">>,<<"celery.pidbox">>,1000,1,5,none,none,false,'on-confirm',none,<<"pe-mercuryservices-sing-01v">>,false},{upstream_params,<<"amqp://iris:siri@pe-mercuryservices-sing-01v">>,{amqp_params_network,<<"iris">>,<<"siri">>,<<"/">>,"pe-mercuryservices-sing-01v",undefined,2047,0,10,60000,none,[#Fun<amqp_uri.12.90191702>,#Fun<amqp_uri.12.90191702>],[],[]},{exchange,{resource,<<"/">>,exchange,<<"celery.pidbox">>},fanout,false,false,false,[],[{federation,[{{<<"pe-mercuryservices-sf-01v">>,<<"celery.pidbox">>},<<"A">>},{{<<"pe-mercuryservices-sing-01v">>,<<"celery.pidbox">>},<<"A">>},{{<<"pe-mercuryservices-van-01v">>,<<"celery.pidbox">>},<<"A">>}]}],[{vhost,<<"/">>},{name,<<"federated-celery-exchanges">>},{pattern,<<"^(reply\\.)?celery.*">>},{'apply-to',<<"exchanges">>},{definition,[{<<"federation-upstream-set">>,<<"all">>}]},{priority,0}],undefined,{[],[rabbit_federation_exchange]},#{user => <<"iris">>}},<<"amqp://pe-mercuryservices-sing-01v">>,[{<<"uri">>,longstr,<<"amqp://pe-mercuryservices-sing-01v">>},{<<"exchange">>,longstr,<<"celery.pidbox">>}]},{resource,<<"/">>,exchange,<<"celery.pidbox">>}}}
** Reason for termination == 
** {timeout,{gen_server,call,[<0.25383.475>,connect,60000]}}
2018-09-12 08:10:05.157 [error] <0.25444.475> CRASH REPORT Process <0.25444.475> with 0 neighbours exited with reason: {timeout,{gen_server,call,[<0.25383.475>,connect,60000]}} in gen_server2:terminate/3 line 1166
2018-09-12 08:10:05.158 [error] <0.9366.0> Supervisor {<0.9366.0>,rabbit_federation_link_sup} had child {upstream,[<<"amqp://iris:siri@pe-mercuryservices-sing-01v">>],
          <<"celery.pidbox">>,<<"celery.pidbox">>,1000,1,5,none,none,false,
          'on-confirm',none,<<"pe-mercuryservices-sing-01v">>,false} started with rabbit_federation_exchange_link:start_link({{upstream,[<<"amqp://iris:siri@pe-mercuryservices-sing-01v">>],<<"celery.pidbox">>,<<"celery.pid...">>,...},...}) at <0.25444.475> exit with reason {timeout,{gen_server,call,[<0.25383.475>,connect,60000]}} in context child_terminated
2018-09-12 08:10:05.158 [warning] <0.25266.475> Channel (<0.25266.475>): Unregistering confirm handler <0.25444.475> because it died. Reason: {timeout,{gen_server,call,[<0.25383.475>,connect,60000]}}
2018-09-12 08:10:05.209 [error] <0.25325.475> ** Generic server <0.25325.475> terminating
** Last message in was {'$gen_cast',maybe_go}


[Attachment: rabbitmq_federated_conn.png]

Michael Klishin

Sep 12, 2018, 3:53:43 PM
to rabbitm...@googlegroups.com
Any more details on this? What node are these logs from, upstream or downstream?



Selim Tuvi

Sep 12, 2018, 3:59:31 PM
to rabbitmq-users
The logs are from the downstream node. We have three upstream nodes connected to it, which are also on 3.7.7. Here is how we establish the federation links:

import json
import requests

def apply_federation(nodes, pattern, user, password):
    auth = requests.auth.HTTPBasicAuth(user, password)
    for node in nodes:
        url_base = 'http://{0}:15672'.format(node)
        # wait_for_host is a helper defined elsewhere; it waits until the
        # management API becomes reachable.
        wait_for_host(url_base)

        # Declare a federation upstream for every other node.
        for other_node in nodes:
            if node == other_node:
                continue

            url = '{0}/api/parameters/federation-upstream/%2f/{1}'.format(
                url_base, other_node)
            data = {"value": {"uri": "amqp://{0}:{1}@{2}".format(
                user, password, other_node)}}

            resp = requests.put(url, data=json.dumps(data), auth=auth)
            resp.raise_for_status()

        # Policy that federates the matching queues across all upstreams.
        url = '{0}/api/policies/%2f/federated-mercury-queues'.format(url_base)
        data = {"pattern": pattern,
                "definition": {"federation-upstream-set": "all"},
                "apply-to": "queues"}

        resp = requests.put(url, data=json.dumps(data), auth=auth)
        resp.raise_for_status()

        # Policy that federates the celery exchanges across all upstreams.
        url = '{0}/api/policies/%2f/federated-celery-exchanges'.format(url_base)
        data = {"pattern": r'^(reply\.)?celery.*',
                "definition": {"federation-upstream-set": "all"},
                "apply-to": "exchanges"}

        resp = requests.put(url, data=json.dumps(data), auth=auth)
        resp.raise_for_status()
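For illustration, a call looks roughly like this (node names, pattern and credentials are placeholders, not our real values):

apply_federation(
    nodes=['rabbit-a.example.com', 'rabbit-b.example.com', 'rabbit-c.example.com'],
    pattern=r'^mercury\..*',
    user='federation-user',
    password='changeme')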


-Selim

Michael Klishin

Sep 12, 2018, 6:34:04 PM
to rabbitm...@googlegroups.com
I tried reproducing using two 3.7.7 nodes, A and B, running on the same host and connecting over a Toxiproxy. Here are the details:

1. Start a node with mostly defaults and federation plugin enabled
2. Start a node with mostly defaults and federation plugin enabled but a different set of ports:

RABBITMQ_ALLOW_INPUT=1 RABBITMQ_NODE_PORT=5673 RABBITMQ_NODENAME="hare@hostname" RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15673}]" rabbitmq-server

3. Start a Toxiproxy, add a listener on port 5683 and upstream

toxiproxy-cli c federation-5673 --listen 127.0.0.1:5683 --upstream 127.0.0.1:5673

4. Add a timeout "toxic" (rule) to Toxiproxy:

toxiproxy-cli t add federation-5673 -t timeout -n timeout -a timeout=6000

5. Add a federation upstream to node A that connects to port 5673 (which is not "toxified" — intentionally)
6. Add a policy on node A that matches exchanges named ^federation\.
7. Declare an exchange named federation.x1
8. Observe a federation link
9. Close the connection on node B
10. Observe connections on both nodes disappear and recover
11. Delete the policy and the upstream on node A
12. Add a federation upstream to node A that connects to port 5683 (which is not "toxified" — intentionally)
13. Add a policy on node A that matches exchanges named ^federation\.
14. Declare an exchange named federation.x1
15. Observe a federation link that starts, fails, disappears, starts again, and so on

TL;DR: I cannot reproduce with Toxiproxy-induced timeouts. Both nodes are 3.7.7.
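For reference, steps 5-7 can also be scripted against node A's management HTTP API. A minimal sketch, assuming default guest credentials, management on localhost:15672, and an upstream name of "hare" (all of these are illustrative, not prescribed):

import json
import requests

BASE = 'http://localhost:15672'
AUTH = ('guest', 'guest')
HEADERS = {'content-type': 'application/json'}

def put(path, body):
    resp = requests.put(BASE + path, data=json.dumps(body), auth=AUTH, headers=HEADERS)
    resp.raise_for_status()

# Step 5: federation upstream pointing at node B (port 5673, the non-proxied one).
put('/api/parameters/federation-upstream/%2f/hare',
    {'value': {'uri': 'amqp://localhost:5673'}})

# Step 6: policy matching exchanges named ^federation\.
put('/api/policies/%2f/federation-test',
    {'pattern': r'^federation\.',
     'definition': {'federation-upstream-set': 'all'},
     'apply-to': 'exchanges'})

# Step 7: declare the exchange to be federated.
put('/api/exchanges/%2f/federation.x1',
    {'type': 'fanout', 'durable': True})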



Michael Klishin

Sep 12, 2018, 7:08:37 PM
to rabbitmq-users
In Olof's log snippet it's a channel operation that fails with a timeout, not a connection attempt, although
they can follow in rapid succession.

So I have at least one other scenario to try to replicate: toxicity has to be added *after* a successful connection
and before a channel operation is triggered one way or another.

I should get to that in the next few days.
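A sketch of that scenario against Toxiproxy's HTTP API (assuming the proxy from the steps above, Toxiproxy's default API port 8474, and a federation upstream that points at the proxied port 5683):

import time
import requests

TOXIPROXY = 'http://127.0.0.1:8474'   # Toxiproxy HTTP API, default port
PROXY = 'federation-5673'             # proxy created in the steps above

# 1. With no toxics present, let the federation link connect through the proxy
#    and reach the "running" state first.
time.sleep(30)

# 2. Only then add a timeout toxic, so subsequent traffic (e.g. a channel
#    operation) hangs instead of failing fast; timeout=0 means the connection
#    is never closed and data is held until the toxic is removed.
resp = requests.post(
    TOXIPROXY + '/proxies/{0}/toxics'.format(PROXY),
    json={'name': 'hang', 'type': 'timeout', 'stream': 'downstream',
          'toxicity': 1.0, 'attributes': {'timeout': 0}})
resp.raise_for_status()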

Selim Tuvi

Sep 12, 2018, 7:26:49 PM
to rabbitmq-users
Thanks for trying to repro the issue. FWIW, our setup and network topology may be more complicated than your test environment. The RabbitMQ servers are running as Docker containers. The downstream one was in London, while the upstream ones were in Vancouver, Singapore and San Francisco. The outage was between the London VPN and the other locations, and its exact nature is unknown.

Most of the lingering connections seem to have been created around the time the outage happened and have not been closed since.

I'll try to reproduce the issue in our QA environment.

Thanks
-Selim

Michael Klishin

Sep 12, 2018, 7:44:37 PM
to rabbitm...@googlegroups.com
Step 12 in the post above has a mistake: the connection to port 5683, of course, *was* "toxified".

Toxiproxy allows several different "toxic" types to be injected, including programmatically after a link is up, so there are a few different scenarios that can be tried with it.
What was addressed in [1] was trivial to reproduce, however, so the root cause here is almost certainly different.
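For example, a whole proxy can be disabled and re-enabled over the same HTTP API to simulate an outage after the links are up (a sketch, reusing the proxy name from the earlier steps):

import requests

PROXY_API = 'http://127.0.0.1:8474/proxies/federation-5673'

# Simulate an outage: stop forwarding traffic on the proxy...
requests.post(PROXY_API, json={'enabled': False}).raise_for_status()

# ...and later bring it back to see whether the federation links and their
# connections recover cleanly.
requests.post(PROXY_API, json={'enabled': True}).raise_for_status()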



Olof Åkesson

Sep 13, 2018, 4:30:39 AM
to rabbitm...@googlegroups.com
Thanks for taking the time to investigate. I'll try to reproduce it in a simpler way that does not involve our test environment. 




Olof Åkesson

Sep 14, 2018, 10:35:30 AM
to rabbitmq-users
Hi again,

I believe I was able to reproduce the problem locally using two Docker containers running RabbitMQ 3.7.7.

The setup scripts are available here:

Needed tools:
docker-compose
pumba (network testing tool) https://github.com/alexei-led/pumba


1. "docker-compose up" to setup to RabbitMQ 3.7.7 instances (rabbit1 and rabbit2).

2. "./setup.sh" to configure federation between them and setup one federated exchange and one federated queue on both.

3. Add a delay on all network traffic for one of the instances using pumba:
pumba netem --tc-image gaiadocker/iproute2  --duration 5m delay --time 60000 <ONE OF THE RUNNING IMAGES>

4. There will be a lot of errors in the log of the other RabbitMQ instance, the federation links will go down, and it will attempt to reconnect. After a while, extra connections appear on the instance that does not have the delay:
<rab...@rabbit1.1.1422.0> Federation link (upstream: cluster2, policy: federation_test) none Direct 0-9-1

5. When pumba finishes, the federation links are restored, but the extra direct connections do not disappear. Running the same delay command again causes the number to increase further.
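A rough way to check for the leftovers after the delay ends, assuming the management and federation management plugins are enabled on rabbit1 (host and credentials below are placeholders):

import requests

BASE = 'http://localhost:15672'   # rabbit1's management API (placeholder)
AUTH = ('guest', 'guest')         # placeholder credentials

links = requests.get(BASE + '/api/federation-links', auth=AUTH).json()
conns = requests.get(BASE + '/api/connections', auth=AUTH).json()
direct = [c for c in conns if c.get('protocol') == 'Direct 0-9-1']

# Without a leak, the number of Direct 0-9-1 connections should be close to
# the number of running federation links; after the pumba delay it keeps growing.
print('federation links:', len(links))
print('Direct 0-9-1 connections:', len(direct))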

Hope this can be of some help,

Olof

Michael Klishin

Sep 14, 2018, 10:58:56 AM
to rabbitm...@googlegroups.com
Thanks, I will try to reproduce using this setup.
