MassTransit - Can Multiple Consumers All Receive Same Message?


Ian Frawley

Jul 25, 2019, 4:48:15 PM
to masstransit-discuss

I have one .NET 4.5.2 Service Publishing messages to RabbitMq via MassTransit.


And multiple instances of a .NET Core 2.1 Service Consuming those messages.


At the moment competing instances of the .NET core consumer service steal messages from the others.


That is, the first instance to consume a message takes it off the queue, and the other service instances never see it.


I want ALL instances to consume the same message.


How can I achieve this?


Publisher Service is configured as follows:


 builder.Register(context =>
            {
                MessageCorrelation.UseCorrelationId<MyWrapper>(x => x.CorrelationId);

                return Bus.Factory.CreateUsingRabbitMq(configurator =>
                {
                    configurator.Host(new Uri("rabbitmq://localhost:5671"), host =>
                    {
                        host.Username(***);
                        host.Password(***);
                    });
                    configurator.Message<MyWrapper>(x => { x.SetEntityName("my.exchange"); });
                    configurator.Publish<MyWrapper>(x =>
                    {
                        x.AutoDelete = true;
                        x.Durable = true;
                        x.ExchangeType = "topic"; // a string, matching the consumer side
                    });

                });
            })
            .As<IBusControl>()
            .As<IBus>()
            .SingleInstance();

And the .NET Core Consumer Services are configured as follows:

        serviceCollection.AddScoped<MyWrapperConsumer>();

        serviceCollection.AddMassTransit(serviceConfigurator =>
        {
            serviceConfigurator.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                var host = cfg.Host(new Uri("rabbitmq://localhost:5671"), hostConfigurator =>
                {
                    hostConfigurator.Username(***);
                    hostConfigurator.Password(***);

                });
                cfg.ReceiveEndpoint(host, "my.exchange", exchangeConfigurator =>
                {
                    exchangeConfigurator.AutoDelete = true;
                    exchangeConfigurator.Durable = true;
                    exchangeConfigurator.ExchangeType = "topic";
                    exchangeConfigurator.Consumer<MyWrapperConsumer>(provider);
                });
            }));
        });
        serviceCollection.AddSingleton<IHostedService, BusService>();

And then MyWrapperConsumer looks like this:

public class MyWrapperConsumer :
    IConsumer<MyWrapper>
{
    .
    .

    public MyWrapperConsumer(...) => (..) = (..);

    public async Task Consume(ConsumeContext<MyWrapper> context)
    {
        //Do Stuff 
    }
}


Daniel Favano

Aug 2, 2019, 1:43:22 PM
to masstransit-discuss
I would ask why you want to do this. If all your workers get the same message and try to do the same thing with it, you lose any scalability. If you want things to be processed in parallel, make a specific worker per process. For example, say that when you get an order you need to generate a PDF receipt, send an email, and then process the order. I would make 3 workers, each with its own queue, so they all get the same message. To scale out, you would want all the instances of each worker to compete for the message, like they do now.
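
The underlying RabbitMQ behaviour is that consumers reading from the same queue compete for messages, while separate queues bound to the same exchange each receive their own copy. In the question, every instance uses the queue name "my.exchange", which is why they compete. A minimal sketch of the "own queue per worker" idea follows, assuming the same MassTransit 5.x-style API used in the question and that `Bind` is available on the RabbitMQ receive endpoint configurator in your version. `WorkerBusSetup`, `AddWorkerBus`, the `queueName`, `username` and `password` parameters, and the "#" routing key are hypothetical names and choices introduced for illustration; `MyWrapper` and `MyWrapperConsumer` are the types from the question. The exchange settings are assumed to match what the publisher declares.

```csharp
using System;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

public static class WorkerBusSetup
{
    // queueName decides who competes with whom: instances that pass the same
    // name share one queue; instances that pass different names each get a copy.
    public static void AddWorkerBus(
        this IServiceCollection services,
        string queueName,  // hypothetical, e.g. "pdf-receipts" or "order-emails"
        string username,   // placeholder for the credentials elided in the question
        string password)   // placeholder for the credentials elided in the question
    {
        services.AddScoped<MyWrapperConsumer>();

        services.AddMassTransit(serviceConfigurator =>
        {
            serviceConfigurator.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                var host = cfg.Host(new Uri("rabbitmq://localhost:5671"), h =>
                {
                    h.Username(username);
                    h.Password(password);
                });

                // The first string argument is the queue name, not the exchange name.
                cfg.ReceiveEndpoint(host, queueName, e =>
                {
                    e.AutoDelete = true;
                    e.Durable = true;

                    // Bind this worker's queue to the publisher's exchange.
                    // Assumption: the exchange is declared as a durable,
                    // auto-delete topic exchange on the publisher side too.
                    e.Bind("my.exchange", x =>
                    {
                        x.ExchangeType = "topic";
                        x.Durable = true;
                        x.AutoDelete = true;
                        x.RoutingKey = "#"; // assumption: match every routing key
                    });

                    e.Consumer<MyWrapperConsumer>(provider);
                });
            }));
        });
    }
}
```

With this shape, a PDF worker could call `services.AddWorkerBus("pdf-receipts", ...)` and an email worker `services.AddWorkerBus("order-emails", ...)`: both receive every message, while multiple instances of the same worker still compete. If every single instance really must see every message, the same sketch works with a per-instance name such as `$"my-wrapper-{Guid.NewGuid():N}"`, at the cost of the scalability described above.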