MassTransit RabbitMQ connection lifecycle?

1,915 views
Skip to first unread message

Andrei Volkov

unread,
Nov 26, 2012, 3:34:53 PM11/26/12
to masstrans...@googlegroups.com
Can someone please explain how MT manages its RabbitMQ connections. Or if more specific questions are easier to answer, please answer the 16 questions below. I pinky-promise this is universally applicable and free of my business specifics.

I have a Windows Service [CONSUMER] that consumes messages, and responds, but never publishes. I also have a Web App [PUBLISHER] that publishes messages, but never consumes. They sit on different boxes, use unique queue names and unique RabbitMQ credentials.

The CONSUMER is configured like this (using Autofac):

container.Register(c => 
ServiceBusFactory.New(bus => {
bus.UseLog4Net();
bus.UseRabbitMq();
bus.ReceiveFrom(QueueUrl);
bus.Subscribe(x => x.LoadFrom(c.Resolve<ILifetimeScope>()));
})
).As<IServiceBus>().SingleInstance();

Q1. Is there anything obviously wrong or missing about this configuration?
Q2. Assuming my network connection is unreliable, how do I configure transport timeouts?
Q3. Assuming my consumers do IConsumeContext<>.Respond(), how do I configure the response delivery retries?
Q4. Are there any other settings related to transport disruption resilience that I can tweak?

my consumers implement Consumes<XYZ>.Context and are registered like this:

container.RegisterAssemblyTypes(eventHandlerAssembly)
.Where(t => t.Implements<IConsumer>())
.AsSelf()
.InstancePerLifetimeScope();

On the service startup I resolve the single instance of IServiceBus from the container and off it goes. 

Q5. When I look at RabbitMQ web console, how many connections should I expect to see? Right now I see about 20 of which only 3 are active and the rest are idle for several days now... I thought I should see exactly one per the bus instance.
Q6. Does MT "fork" a new connection every time it instantiates a Consumer to process an incoming message? When is it closed then?
Q7. Does MT open another new connection every time it does IConsumeContext<>.Respond()? When does it close them then?
Q8. If I do a IConsumeContext<>.Respond and then throw an exception, what would happen? Can I count on the connection to be reliably closed?
Q9. I see that many of the connections from the CONSUMER app use the credentials of the PUBLISHER app! Is this because the IConsumeContext<>.Respond() relies on the Reply To URI supplied in the request? I did not expect one app to use credentials supplied by another app.
Q10. Out of the 3 active connections, 2 use the PUBLISHER's credentials and seem to be sending bytes. Does this mean I have 2 runaway consumers? Does MT provide a way to detect and kill these?

Going over to the PUBLISHER app now. It is an ASP.NET app that is configured (using Autofac) like so:

            builder.Register(container =>
                ServiceBusFactory.New(bus =>
                {
                    bus.UseLog4Net();
                    bus.UseRabbitMq();
                    bus.ReceiveFrom(url);
                })
            ).As<IServiceBus>().SingleInstance()
            .OnRelease(bus => {
                bus.Dispose();
                DeleteQueueAndExchange(url);
            });

Since the PUBLISHER app runs on a web farm, the queue name in the url is generated on the fly from AppName + MachineName + PID + AppDomainId. The queue and exchange are deleted on application shutdown using RabbitMQ API directly.

My typical usage pattern is to inject the IServiceBus into a controller, and do a Publish, or sometimes PublishRequest (I know I know) to send the messages to the CONSUMER app.

Q11: Is there anything obviously wrong or missing about this configuration?
Q12: I see some people on the user group use EndpointCacheFactory. Why would I want to use one?
Q13: Assuming my network connection is unreliable, how do I configure Publish timeouts and retry attempts?
Q14: When does MT open a new connection to RabbitMQ?
Q15: How many active connections to RabbitMQ server should I expect to see? One for every pending PublishRequest plus a shared one to do the Publish? I see 10 open connections of which 8 are idle and 2 are active. Does this mean I have two pages waiting on their PublishRequest?
Q16: If I do a PublishRequest and the CONSUMER app throws an exception, will the connection be closed reliably?

Thanks!

Henrik Feldt

unread,
Nov 26, 2012, 4:50:08 PM11/26/12
to masstrans...@googlegroups.com
On Mon, Nov 26, 2012 at 9:34 PM, Andrei Volkov <zvo...@gmail.com> wrote:
Can someone please explain how MT manages its RabbitMQ connections. Or if more specific questions are easier to answer, please answer the 16 questions below. I pinky-promise this is universally applicable and free of my business specifics.

I have a Windows Service [CONSUMER] that consumes messages, and responds, but never publishes. I also have a Web App [PUBLISHER] that publishes messages, but never consumes. They sit on different boxes, use unique queue names and unique RabbitMQ credentials.

The CONSUMER is configured like this (using Autofac):

container.Register(c => 
ServiceBusFactory.New(bus => {
bus.UseLog4Net();
bus.UseRabbitMq();
bus.ReceiveFrom(QueueUrl);
bus.Subscribe(x => x.LoadFrom(c.Resolve<ILifetimeScope>()));
})
).As<IServiceBus>().SingleInstance();

Q1. Is there anything obviously wrong or missing about this configuration?

Not that I can see.
 
Q2. Assuming my network connection is unreliable, how do I configure transport timeouts?
Q3. Assuming my consumers do IConsumeContext<>.Respond(), how do I configure the response delivery retries?
Q4. Are there any other settings related to transport disruption resilience that I can tweak?

my consumers implement Consumes<XYZ>.Context and are registered like this:

container.RegisterAssemblyTypes(eventHandlerAssembly)
.Where(t => t.Implements<IConsumer>())
.AsSelf()
.InstancePerLifetimeScope();

On the service startup I resolve the single instance of IServiceBus from the container and off it goes. 

Q5. When I look at RabbitMQ web console, how many connections should I expect to see? Right now I see about 20 of which only 3 are active and the rest are idle for several days now... I thought I should see exactly one per the bus instance.

If the bus is shut down properly it should close its connection. If you crash it, it doesn't, but if you enable RMQ heartbeats the broker will purge the list after a while.
 
Q6. Does MT "fork" a new connection every time it instantiates a Consumer to process an incoming message? When is it closed then?
No.
Q7. Does MT open another new connection every time it does IConsumeContext<>.Respond()? When does it close them then?
No.
Q8. If I do a IConsumeContext<>.Respond and then throw an exception, what would happen? Can I count on the connection to be reliably closed?

The connection is per-bus, and throwing exceptions won't close it. I belive Respond would happen before your exception stops the consume handler.
 
Q9. I see that many of the connections from the CONSUMER app use the credentials of the PUBLISHER app! Is this because the IConsumeContext<>.Respond() relies on the Reply To URI supplied in the request? I did not expect one app to use credentials supplied by another app.

Interesting point, Chris? It's because the Respond() looks up the endpoint and that endpoint is denoted by the credentials which is at the moment put in the URI of the consuming queue.
 
Q10. Out of the 3 active connections, 2 use the PUBLISHER's credentials and seem to be sending bytes. Does this mean I have 2 runaway consumers? Does MT provide a way to detect and kill these?

You'll have to look at how many active queues you got with messages being throughput.
 

Going over to the PUBLISHER app now. It is an ASP.NET app that is configured (using Autofac) like so:

            builder.Register(container =>
                ServiceBusFactory.New(bus =>
                {
                    bus.UseLog4Net();
                    bus.UseRabbitMq();
                    bus.ReceiveFrom(url);
                })
            ).As<IServiceBus>().SingleInstance()
            .OnRelease(bus => {
                bus.Dispose();
                DeleteQueueAndExchange(url);
            });

Since the PUBLISHER app runs on a web farm, the queue name in the url is generated on the fly from AppName + MachineName + PID + AppDomainId. The queue and exchange are deleted on application shutdown using RabbitMQ API directly.

My typical usage pattern is to inject the IServiceBus into a controller, and do a Publish, or sometimes PublishRequest (I know I know) to send the messages to the CONSUMER app.

Q11: Is there anything obviously wrong or missing about this configuration?

No.
 
Q12: I see some people on the user group use EndpointCacheFactory. Why would I want to use one?

For static routing.
 
Q13: Assuming my network connection is unreliable, how do I configure Publish timeouts and retry attempts?

You need to plug into the RetryTracker, but there's no way to do that out of the box afaik. Timeouts are handled on a connection level so that's nothing you manage when publishing.
 
Q14: When does MT open a new connection to RabbitMQ?

When the bus is created/started.
 
Q15: How many active connections to RabbitMQ server should I expect to see? One for every pending PublishRequest plus a shared one to do the Publish? I see 10 open connections of which 8 are idle and 2 are active. Does this mean I have two pages waiting on their PublishRequest?
Q16: If I do a PublishRequest and the CONSUMER app throws an exception, will the connection be closed reliably?

No, the connection is not closed per design.
 

Thanks!


Cheers.

--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To post to this group, send email to masstrans...@googlegroups.com.
To unsubscribe from this group, send email to masstransit-dis...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msg/masstransit-discuss/-/AMYC_zYbXFcJ.
For more options, visit https://groups.google.com/groups/opt_out.
 
 

Andrei Volkov

unread,
Nov 26, 2012, 8:24:49 PM11/26/12
to masstrans...@googlegroups.com
Thanks Henrik, to summarize, MT opens one connection at the bus startup for the queue, and then one connection for each new unique exchange name it encounters (e.g. a new message to be published or a new Response endpoint). This is super-helpful for me to know. Now I see that generating random queue names is very bad in conjunction with PublishRequest/Respond pattern, since that will keep those random exchanges connected to by the Consumer side, essentially leaking connections.

How do I shutdown the bus properly? Just call Dispose on it?

Also, if I can't configure anything like transport timeouts or publish retries, what are those numerous Add Endpoint Configurator, Set Transport Factory and Specify Connection Builder extension methods are for?
Reply all
Reply to author
Forward
0 new messages