Setting prefetch count per consumer

1,129 views
Skip to first unread message

Chris Robison

unread,
Aug 11, 2016, 1:00:47 PM8/11/16
to masstransit-discuss
You have documentation here: http://docs.masstransit-project.com/en/latest/overview/underthehood.html?highlight=prefetch, that shows a possible extension method on consumer configuration to set a prefetch count but I'm not seeing it. I'm defining by receive endpoints as follows: 

cfg.ReceiveEndpoint(MessagingConstants.XactNetUploadQueue,
                        endPntCfg =>
                        {
                            endPntCfg.UseConcurrencyLimit(2);
                            endPntCfg.Consumer<IConsumer<IUploadAssignment>>(context);
                        });

I'm using Autofac, btw. Other than the user concurrency limit, I'm not seeing an available path to set the prefetch. The host prefetch count doesn't seem to do much other than set the prefetch on a single consumer as shown below:

2016-08-11 10:41:15.2897 | DEBUG | ConsumerOk: rabbitmq://rmqcluster:5672/dev/complete_callbacks?prefetch=8 - amq.ctag-QHETW_dGAuV5zC14YAXRMA
2016-08-11 10:41:15.2897 | DEBUG | ConsumerOk: rabbitmq://rmqcluster:5672/dev/reports?prefetch=8 - amq.ctag-MPi722ESOBp0Io6qHIctDQ
2016-08-11 10:41:15.2897 | DEBUG | Queue: bus-CROBISON-WINX1-xactprmservices-dnaoyybjkoyy346rbdjhrb1xdd (auto-delete)
2016-08-11 10:41:15.2897 | DEBUG | ConsumerOk: rabbitmq://rmqcluster:5672/dev/delete_instance?prefetch=8 - amq.ctag-Bp0F6bXp_vgQsqeKz8bGSQ
2016-08-11 10:41:15.2897 | DEBUG | ConsumerOk: rabbitmq://rmqcluster:5672/dev/aspen_grove?prefetch=8 - amq.ctag-hO6R0BACu1VPqFLmCLIDIQ
2016-08-11 10:41:15.2897 | DEBUG | ConsumerOk: rabbitmq://rmqcluster:5672/dev/xactnet_latch?prefetch=8 - amq.ctag-RxhO086lSEI4yrle1zrg0A
2016-08-11 10:41:15.2897 | DEBUG | ConsumerOk: rabbitmq://rmqcluster:5672/dev/shopping_cart?prefetch=8 - amq.ctag-yCBSOvV1JvtVNf3k6rpeyQ
2016-08-11 10:41:15.2897 | DEBUG | ConsumerOk: rabbitmq://rmqcluster:5672/dev/xactnet_inbox?prefetch=8 - amq.ctag-qgfcznTbm_3VL4KmHPrjYw
2016-08-11 10:41:15.2897 | DEBUG | ConsumerOk: rabbitmq://rmqcluster:5672/dev/xactnet_registration?prefetch=8 - amq.ctag-l86lJPD43KKAFwVbtCLdBg
2016-08-11 10:41:15.2897 | DEBUG | ConsumerOk: rabbitmq://rmqcluster:5672/dev/form_validation?prefetch=8 - amq.ctag-PozJ75cp6PYVzDQpVcNRBg
2016-08-11 10:41:15.2897 | DEBUG | ConsumerOk: rabbitmq://rmqcluster:5672/dev/xactnet_upload?prefetch=8 - amq.ctag-0jOImTzMhy-ecHi0zBbd4A
2016-08-11 10:41:15.3073 | DEBUG | ConsumerOk: rabbitmq://rmqcluster:5672/dev/bus-CROBISON-WINX1-xactprmservices-dnaoyybjkoyy346rbdjhrb1xdd?durable=false&autodelete=true&prefetch=32 - amq.ctag-NLOZtfzm9DxSuYHKul53jA
2016-08-11 10:41:15.3247 | DEBUG | ConsumerOk: rabbitmq://rmqcluster:5672/dev/push_notifications?prefetch=8 - amq.ctag-1u48UXmvbkENJ6ruWKGpQg

What's the best way to set a consumer prefetch?

Chris Robison

unread,
Aug 11, 2016, 2:46:46 PM8/11/16
to masstransit-discuss
I found a way.

cfg.ReceiveEndpoint(MessagingConstants.XactNetUploadQueue,
                        endPntCfg =>
                        {
                            ((IRabbitMqReceiveEndpointConfigurator) endPntCfg).PrefetchCount = 2;
                            endPntCfg.UseConcurrencyLimit(2);
                            endPntCfg.Consumer<IConsumer<IUploadAssignment>>(context);
                        });

Chris Robison

unread,
Aug 11, 2016, 2:50:05 PM8/11/16
to masstransit-discuss
Looks like there are also some RabbitMQ specific receive endpoint methods that give me the correct type already.

Chris Patterson

unread,
Aug 11, 2016, 5:09:15 PM8/11/16
to masstrans...@googlegroups.com
Sounds like you found what you needed. Using the (host, queue, x) method signature, you have your PrefetchCount as a property that can be set for the RabbitMQ consumer.

ConcurrencyLimit is sort of superfluous if you're using RMQ w/PreFetch.


--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-discuss+unsub...@googlegroups.com.
To post to this group, send email to masstransit-discuss@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/masstransit-discuss/80e6b7c7-87a3-4eb6-8f5b-5b15338289b4%40googlegroups.com.

For more options, visit https://groups.google.com/d/optout.

Денис Евдокимов

unread,
Dec 18, 2017, 2:33:39 PM12/18/17
to masstransit-discuss
Hello Chris,

Could you advice why setting ConcurrencyLimit is superfluous?

Consider my usecase:
I have queue shared between several worker hosts.
Work items are messages.
I want them proceed messages e.g. only 4 in parallel, but I don't want to any exceed messages wait as prefetched.

For example if I have 2 workers and send 12 messages. PrefetchCount is 10 and ConcurrencyLimit is 4, then that happened:
Worker 1 prefetch 10 messages, and begin consume in parallel 4 of them while 6 just waiting
Worker 2 prefetch 2 messages and begin consuming 2 messages.

So if don't want messages wait and not being proceeded - should I set PrefetchCount  always equals to ConcurrencyLimit ?


пятница, 12 августа 2016 г., 0:09:15 UTC+3 пользователь Chris Patterson написал:
To post to this group, send email to masstrans...@googlegroups.com.

Chris Patterson

unread,
Dec 18, 2017, 3:15:45 PM12/18/17
to masstrans...@googlegroups.com
If you're setting a PrefetchCount of 10 or 4 or 6, just leave out the concurrency limit. No need to double the efforts.


To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-discuss+unsubscribe...@googlegroups.com.
To post to this group, send email to masstrans...@googlegroups.com.

--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-discuss+unsub...@googlegroups.com.

Денис Евдокимов

unread,
Dec 18, 2017, 5:25:49 PM12/18/17
to masstrans...@googlegroups.com

Thank you for quick answer.

Can I manage level of concurrency (prefetch count) without recreating and reconfiguring IBusControl ?

I was searching for this, no luck, only find some commits about this without any usage examples.

18 дек. 2017 г., в 23:15, Chris Patterson <ch...@phatboyg.com> написал(а):

You received this message because you are subscribed to a topic in the Google Groups "masstransit-discuss" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/masstransit-discuss/VLlegDis6Ac/unsubscribe.
To unsubscribe from this group and all its topics, send an email to masstransit-dis...@googlegroups.com.

To post to this group, send email to masstrans...@googlegroups.com.
Message has been deleted

Chris Patterson

unread,
Dec 20, 2017, 2:12:50 PM12/20/17
to masstrans...@googlegroups.com
yep, that's how you do it.

On Wed, Dec 20, 2017 at 10:24 AM, Денис Евдокимов <denis.a....@gmail.com> wrote:
Ok, I just left it here for another man like me who want to do that by some reasons.

To change the PrefetchCount on the fly you need (look in those source code: https://github.com/MassTransit/MassTransit/blob/develop/src/MassTransit.RabbitMqTransport.Tests/SetPrefetchCount_Specs.cs) :
At the side where endoint is creating:
 - create management endpoint
 - connect it to endpoint you want to control

From any other client send SetPrefetchCount request and wait for PrefetchCountUpdated responce

That is it. Looks easy and convenient.

To post to this group, send email to masstransit-discuss@googlegroups.com.

--
You received this message because you are subscribed to a topic in the Google Groups "masstransit-discuss" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/masstransit-discuss/VLlegDis6Ac/unsubscribe.
To unsubscribe from this group and all its topics, send an email to masstransit-discuss+unsubscribe...@googlegroups.com.

To post to this group, send email to masstransit-discuss@googlegroups.com.

--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-discuss+unsub...@googlegroups.com.
To post to this group, send email to masstransit-discuss@googlegroups.com.

Денис Евдокимов

unread,
Jan 24, 2018, 8:17:27 AM1/24/18
to masstransit-discuss
Sorry for getting back to this again.
I have other question about setting up PrefetchCount in runtime.

I have a competing mode, where several services attached to single queue.
If I send SetPrefetchCount request to update prefetch count - that updates it for every consumer.

For example:
I have 2 consumer attached to "InputQueue", they are having PrefetchCount = 1 both, so they can proceed 2 messages in parallel from that queue.
If I send
SetPrefetchCount = 2 for the "InputQueue", they are both update configurations and begin to consume 4 messages in parallel.

As I see in details of Channels in RabbitMQ UI there is 'Global prefetch count' = 1 and 'Prefetch count' = 0 for each consumer for initial state.
When I am updating PrefetchCount  then 'Global prefetch count' updates to 2, but 'Prefetch count' still equals 0.

Are there a way, how to update PrefetchCount for only one consumer service?
Thank you.

I think that I cannot understand something important of how it is working under-hood, please forgive me for maybe stupid questions.

Денис Евдокимов

unread,
Jan 24, 2018, 8:19:35 AM1/24/18
to masstransit-discuss
Reply all
Reply to author
Forward
0 new messages