Why does RabbitMQ time out when publishing (exchange federation in use)?


Thanh Tung Nguyen

Jan 8, 2019, 7:28:44 PM
to rabbitmq-users

Hey guys,

For background, we are a retail company with more than 200 stores. We use RabbitMQ federation with 200+ RabbitMQ servers (one server in each store). Each RabbitMQ server has 4 exchanges, so there are around 800 exchanges in total. The network connection in some stores is unstable and can be down for hours, so we use RabbitMQ to retry delivery when the link comes back online.


Messages are published to one central RabbitMQ server, which acts as the upstream for the other 200 servers. When a product changes, a message is sent to the central server, forwarded to the other RabbitMQ servers, and processed locally in each store. Our application runs on .NET Framework on Windows servers (using MassTransit to handle RabbitMQ).


Sometimes we get timeout errors when publishing events to RabbitMQ. This is an example:

Properties  
message_id: f68a0000-15f3-0670-085b-08d66b92a0b1
delivery_mode:  2
headers:    
x-correlation-id:   18799970-fea8-431e-8608-d02496194a0a
Content-Type:   application/vnd.masstransit+json
publishId:  1
MT-Reason:  fault
MT-Fault-Message:   Publishing message of type MyCompany.Event.Message timed out after 5000 milliseconds.
MT-Fault-Timestamp: 2018-12-27T00:32:33.6076503Z
MT-Fault-StackTrace:    at MyCompany.ProductCommand.Listener.UpsertProductCommandListener.<PublishMessage>d__10`1.MoveNext() 
at MyCompany.ProductCommand.Listener.UpsertProductCommandListener.<PublishUpdatedEvents>d__8.MoveNext() 
at MyCompany.ProductCommand.Listener.UpsertProductCommandListener.<Consume>d__7.MoveNext() 
at MassTransit.AutofacIntegration.AutofacConsumerFactory`1.<Send>d__3`1.MoveNext() 
at MassTransit.Pipeline.Filters.ConsumerMessageFilter`2.<GreenPipes-IFilter<MassTransit-ConsumeContext<TMessage>>-Send>d__4.MoveNext() 
at MassTransit.Pipeline.Filters.ConsumerMessageFilter`2.<GreenPipes-IFilter<MassTransit-ConsumeContext<TMessage>>-Send>d__4.MoveNext() 
at GreenPipes.Filters.TeeFilter`1.<Send>d__5.MoveNext() 
at GreenPipes.Filters.OutputPipeFilter`2.<GreenPipes-IFilter<TInput>-Send>d__6.MoveNext() 
at GreenPipes.Filters.OutputPipeFilter`2.<GreenPipes-IFilter<TInput>-Send>d__6.MoveNext() 
at MassTransit.Pipeline.Filters.DeserializeFilter.<Send>d__4.MoveNext() 
at GreenPipes.Filters.RescueFilter`2.<GreenPipes-IFilter<TContext>-Send>d__5.MoveNext()
MT-Host-MachineName:    WIN-123456
MT-Host-ProcessName:    MyCompany.ProductCommandListener.WindowsService
MT-Host-ProcessId:  848
MT-Host-Assembly:   MyCompany.ProductCommandListener.WindowsService
MT-Host-AssemblyVersion:    1.0.0.0
MT-Host-MassTransitVersion: 3.5.4.992
MT-Host-FrameworkVersion:   4.0.30319.42000
MT-Host-OperatingSystemVersion: Microsoft Windows NT 6.2.9200.0
content_type:   application/vnd.masstransit+json


Each message is around 5000 bytes. We would like to understand why the publish timed out. We found this on the RabbitMQ website regarding ack behaviour:

For routable messages, the basic.ack is sent when a message has been accepted by all the queues. For persistent messages routed to durable queues, this means persisting to disk. For mirrored queues, this means that all mirrors have accepted the message. (https://www.rabbitmq.com/confirms.html#when-publishes-are-confirmed)

However, we couldn't find any information about publisher acknowledgement behavior with federation. So our questions are:

  1. With exchange federation, does the broker send the publisher acknowledgement once all downstream servers have received the message, or once the upstream server has persisted the message to disk?
  2. How can we resolve the timeout error?

Any help would be appreciated.

Luke Bakken

Jan 8, 2019, 7:42:59 PM
to rabbitmq-users
Hello,

Please provide a bit more information -

* What versions of RabbitMQ and Erlang are you running, and on what operating system?
* Do you have any custom RabbitMQ configuration?
* Are there any errors or other interesting messages in the RabbitMQ log file? You can attach a compressed archive of the file if you'd like.

As for your first question with regard to federation, RabbitMQ will send a publisher confirm when the message is routed and persisted to disk (if requested) on all nodes (if HA is in use). Federation is not taken into account with regard to publisher confirms to the original publishing application.
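
The rule above can be sketched as a toy model. This is not RabbitMQ code: the class, function name, and all numbers are made up for illustration, and it only encodes the statement that the confirm depends on the queues (and their mirrors) on the broker that accepted the publish, not on federation links.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class RoutedQueue:
    """A queue on the publishing broker that the message was routed to (hypothetical model)."""
    name: str
    # Time in ms for each mirror (master included) to accept and persist the message.
    mirror_persist_ms: List[float]


def confirm_latency_ms(queues: List[RoutedQueue],
                       federation_link_ms: Dict[str, float]) -> float:
    """Toy estimate of when the publisher confirm is sent.

    The confirm waits for the slowest mirror of the slowest routed queue.
    federation_link_ms is accepted but deliberately ignored, because
    downstream delivery is not part of the confirm.
    """
    if not queues:
        return 0.0
    return max(max(q.mirror_persist_ms) for q in queues)


# Made-up numbers: one store link has been down for an hour.
queues = [
    RoutedQueue("queue-a", [2.0, 3.5]),
    RoutedQueue("queue-b", [1.0, 40.0]),
]
links = {"store-134": 3_600_000.0}
latency = confirm_latency_ms(queues, links)
```

In this model a slow or offline store link never delays the confirm, while a single slow mirror does.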

Thanks,
Luke

Thanh Tung Nguyen

Jan 8, 2019, 8:15:31 PM
to rabbitmq-users
Hi Luke,
Thanks for the fast response.

To answer your questions:
- RabbitMQ 3.6.2, Erlang 18.3, Windows Server 2008 R2 (I know it is old. However, there are some legacy systems, so it will take time to migrate to a newer Windows version.)
- Regarding custom configuration, I will need to check with my colleagues and get back to you.
- Regarding errors in the log, there are a few interesting ones:
1. Heartbeat errors
=ERROR REPORT==== 5-Nov-2018::21:32:53 ===
closing AMQP connection <0.28115.5584> (10.200.2.71:9788 -> 10.200.2.94:5672):
missed heartbeats from client, timeout: 10s

=ERROR REPORT==== 5-Nov-2018::21:32:55 ===
closing AMQP connection <0.29458.5584> (10.200.2.71:9908 -> 10.200.2.94:5672):
missed heartbeats from client, timeout: 10s

2. Deaths of mirrors (possibly network outage)
=INFO REPORT==== 11-Dec-2018::03:48:14 ===
Mirrored queue 'federation: MyCompany.Event.Message:IUpdatedEventMessage--System:String::MyCompany.Product.Model:Product-- -> rab...@169-SVR01.MyCompany.local' in vhost '/': Slave <rab...@AWS-RMQ01.3.16292.435> saw deaths of mirrors <rab...@AWS-RMQ02.3.16392.1467>

=INFO REPORT==== 11-Dec-2018::03:48:14 ===
Mirrored queue 'federation: MyCompany.Event.Message:ICreatedEventMessage--System:String::MyCompany.Product.Model:Product-- -> rab...@169-SVR01.MyCompany.local' in vhost '/': Slave <rab...@AWS-RMQ01.3.16176.435> saw deaths of mirrors <rab...@AWS-RMQ02.3.15955.1467>

3. Message timeout. Here are the first few lines of the error. The full error message is about 600 lines long. Please let me know if you want all of it.
=ERROR REPORT==== 17-Oct-2018::05:21:59 ===
** Generic server <0.6419.4816> terminating
** Last message in was {maybe_expire,4}
** When Server state == {q, {amqqueue, {resource,<<"/">>,queue, <<"federation: MyCompany.Event.Message:IUpdatedEventMessage--System:String::MyCompany.Product.Model:Product-- -> rab...@134-SVR01.MyCompany.local">>}, true,false,none, [{<<"x-expires">>,long,18000000}, {<<"x-internal-purpose">>,longstr, <<"federation">>}], <0.6419.4816>, [<8756.15551.1467>], [], ['rabbit@AWS-RMQ02'], [{vhost,<<"/">>}, {name,<<"mirror_fed">>}, {pattern, <<"federation: (?:[^MassTransit\\:Fault])">>}, {'apply-to',<<"queues">>}, {definition,[{<<"ha-mode">>,<<"all">>}]}, {priority,0}], [{<8756.15536.1467>,<8756.15551.1467>}, {<0.6669.4816>,<0.6419.4816>}], [],live}, none,true,rabbit_mirror_queue_master, {state, {resource,<<"/">>,queue, <<"federation: MyCompany.Event.Message:IUpdatedEventMessage--System:String::MyCompany.Product.Model:Product-- -> rab...@134-SVR01.MyCompany.local">>}, <0.6669.4816>,<0.10323.4817>,rabbit_priority_queue, {passthrough,rabbit_variable_queue,
... (omitted for brevity) ...
** Reason for termination ==
** {timeout_value,
       [{rabbit_mirror_queue_master,'-stop_all_slaves/2-lc$^1/1-1-',3,
            [{file,"src/rabbit_mirror_queue_master.erl"},{line,217}]},
        {rabbit_mirror_queue_master,stop_all_slaves,2,
            [{file,"src/rabbit_mirror_queue_master.erl"},{line,217}]},
        {rabbit_mirror_queue_master,delete_and_terminate,2,
            [{file,"src/rabbit_mirror_queue_master.erl"},{line,205}]},
        {rabbit_amqqueue_process,'-terminate_delete/3-fun-1-',6,
            [{file,"src/rabbit_amqqueue_process.erl"},{line,252}]},
        {rabbit_amqqueue_process,terminate_shutdown,2,
            [{file,"src/rabbit_amqqueue_process.erl"},{line,277}]},
        {gen_server2,terminate,3,[{file,"src/gen_server2.erl"},{line,1146}]},
        {proc_lib,wake_up,3,[{file,"proc_lib.erl"},{line,250}]}]}
** In 'terminate' callback with reason ==
** normal

Cheers,
Tung

Thanh Tung Nguyen

Jan 9, 2019, 12:11:58 AM
to rabbitmq-users
Hi Luke,
For your second question, yes, we have policies to federate exchanges. Also, we have policies for HA:

Name         Pattern                                Apply to  Definition    Priority
mirror_fed   federation: (?:[^MassTransit\:Fault])  queues    ha-mode: all  0
mirror_test  ^Jbh.(?:.*).(?:.*)                     queues    ha-mode: all  0
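
One thing that may be worth double-checking in the mirror_fed pattern: `[^MassTransit\:Fault]` is a negated character class, so it matches any single character that is not one of those letters or a colon. It does not mean "anything other than the string MassTransit:Fault". The sketch below shows the difference using Python's `re` module, on the assumption that it treats these constructs the same way as the regex engine RabbitMQ uses for policies. The queue names and the lookahead alternative are hypothetical.

```python
import re

# Pattern copied from the mirror_fed policy in this thread.
CHAR_CLASS = re.compile(r"federation: (?:[^MassTransit\:Fault])")

# Hypothetical alternative, assuming the intent was
# "every federation queue except the MassTransit:Fault ones".
LOOKAHEAD = re.compile(r"federation: (?!MassTransit:Fault)")

# Hypothetical queue names, for illustration only.
NAMES = [
    "federation: Orders.Created -> store-1",
    "federation: MassTransit:Fault -> store-1",
    "federation: Metrics.Updated -> store-1",
]


def matches(pattern: "re.Pattern[str]", name: str) -> bool:
    """Policy-style match: the pattern may match anywhere in the name."""
    return pattern.search(name) is not None


char_class_hits = [n for n in NAMES if matches(CHAR_CLASS, n)]
lookahead_hits = [n for n in NAMES if matches(LOOKAHEAD, n)]
```

With the character class, any name whose first character after "federation: " is one of M, a, s, T, r, n, i, t, F, u, l or ":" is excluded, so the hypothetical "Metrics.Updated" queue is skipped along with the fault queue. The lookahead version excludes only the fault queue.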


Regards,
Tung

Luke Bakken

Jan 9, 2019, 3:56:54 PM
to rabbitmq-users
Hello,

I'm not so concerned about your Windows version. Both your Erlang and RabbitMQ versions are very out-of-date. Erlang 18 in particular has TCP issues that could explain what you are seeing. I suggest upgrading both ASAP.

There still isn't enough information to get to the bottom of the publish timeout. It would be great to know whether the timeout happens while writing to the socket or while waiting for a publisher confirm, but I can't tell from the stack trace.
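
As a rough illustration of telling those two phases apart, the sketch below times each phase separately. It is written in Python rather than C#, and `write_to_socket` and `wait_for_confirm` are hypothetical stand-ins for whatever the client library actually does. MassTransit may not expose the two steps this way.

```python
import time
from contextlib import contextmanager


class PhaseTimer:
    """Accumulates wall-clock time per named phase."""

    def __init__(self):
        self.durations = {}

    @contextmanager
    def phase(self, name):
        start = time.monotonic()
        try:
            yield
        finally:
            # Record the elapsed time even if the phase raised an exception.
            elapsed = time.monotonic() - start
            self.durations[name] = self.durations.get(name, 0.0) + elapsed

    def slowest(self):
        """Name of the phase that took the longest overall."""
        return max(self.durations, key=self.durations.get)


def publish_with_timing(write_to_socket, wait_for_confirm):
    """Run the two (hypothetical) publish steps and time each one."""
    timer = PhaseTimer()
    with timer.phase("socket_write"):
        write_to_socket()
    with timer.phase("confirm_wait"):
        wait_for_confirm()
    return timer


# Simulated publish where the confirm is the slow part.
timer = publish_with_timing(lambda: None, lambda: time.sleep(0.05))
```

Logging `timer.durations` alongside each timeout would show which phase consumed the 5000 ms budget.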

Luke

Thanh Tung Nguyen

Jan 9, 2019, 6:24:51 PM
to rabbitmq-users
Thanks Luke. Yeah, we tried to upgrade to newer versions of RabbitMQ and Erlang, but it broke some things in our system, and we will need to rewrite that part. By the way, how can we figure out where the timeout occurs? Please let us know if you need any additional information.
Also, the message is written to a dead letter queue after it fails to be published, so maybe it times out while waiting for a publisher confirm.

Cheers,
Tung

Luke Bakken

Jan 10, 2019, 1:15:30 PM
to rabbitmq-users
Hello -

Information about the timeout is MassTransit-dependent. There may be debug logging you can enable, or you could do a packet trace in hopes of catching one. I would look at the MassTransit code to see exactly what operations it is doing using the RabbitMQ C# client.

There might be a message in your RabbitMQ logs at the same time - that would be very helpful to know.
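
A minimal sketch of that correlation, assuming the log uses the `=ERROR REPORT==== 5-Nov-2018::21:32:53 ===` header format quoted earlier in this thread. The function name and window size are made up. Note that the MT-Fault-Timestamp in the fault message ends in `Z` (UTC), while the broker log timestamps are probably in the server's local time, so one of the two likely needs converting before comparing.

```python
import re
from datetime import datetime, timedelta

# Entry header as it appears in the log excerpts in this thread.
HEADER = re.compile(
    r"^=(\w+) REPORT==== (\d{1,2}-\w{3}-\d{4}::\d{2}:\d{2}:\d{2}) ==="
)


def entries_near(log_text, when, window_seconds=30):
    """Return (level, timestamp, body) for entries within the window around `when`."""
    window = timedelta(seconds=window_seconds)
    entries = []
    current = None
    for line in log_text.splitlines():
        m = HEADER.match(line)
        if m:
            ts = datetime.strptime(m.group(2), "%d-%b-%Y::%H:%M:%S")
            current = (m.group(1), ts, [])
            entries.append(current)
            rest = line[m.end():].strip()
            if rest:  # keep any text that follows the header on the same line
                current[2].append(rest)
        elif current is not None and line.strip():
            current[2].append(line)
    return [
        (level, ts, "\n".join(body))
        for level, ts, body in entries
        if abs(ts - when) <= window
    ]


SAMPLE = """\
=ERROR REPORT==== 5-Nov-2018::21:32:53 ===
closing AMQP connection <0.28115.5584> (10.200.2.71:9788 -> 10.200.2.94:5672):
missed heartbeats from client, timeout: 10s

=ERROR REPORT==== 5-Nov-2018::21:32:55 ===
closing AMQP connection <0.29458.5584> (10.200.2.71:9908 -> 10.200.2.94:5672):
missed heartbeats from client, timeout: 10s

=INFO REPORT==== 11-Dec-2018::03:48:14 ===
(body omitted)
"""

hits = entries_near(SAMPLE, datetime(2018, 11, 5, 21, 32, 54), window_seconds=5)
```

Running this against the full broker log with each fault timestamp (converted to the broker's time zone) would list any log entries that coincide with a publish timeout.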

Thanks,
Luke

Thanh Tung Nguyen

Jan 13, 2019, 7:32:53 PM
to rabbitmq-users
Thanks Luke. We tried to replicate this in a UAT environment, but the error didn't occur. We will try turning on MassTransit logging and look at the MassTransit code.

Cheers,
Tung