How to requeue a batch of messages when an error is encountered during consumption

1,105 views
Skip to first unread message

Martin C

unread,
Feb 23, 2021, 5:33:52 AM2/23/21
to masstransit-discuss
Hi

I'm using MassTransit as part of a system that replicates SQL statements across multiple environments. In our setup, each message contains a SQL statement to be executed, and each message must be processed in the order that they were published to ensure data integrity (i.e. message 1 may contain an insert statement, message 2 may contain an update statement relating to the record inserted in message 1 - we cannot process message 2 before message 1). If the SQL in a message fails to execute, we throw it back to the front of the queue so we can attempt to re-process it again straight away (I am aware this may lead to "stuck" queues if a message repeatedly fails, but that's being handled internally).

At the moment we are consuming each message individually, which works well except it can lead to a large backlog of messages as we process them one at a time. To resolve this, I have changed my consumer to work as a batch consumer. If a messages SQL fails to execute, we rollback the transaction and throw an exception with the intention being that the entire batch of messages is thrown back on the queue to be re-processed again (in the same order). Unfortunately it seems that on the second round of consuming a batch after a failure happens, the messages contained within the batch are slightly different. The extent to which the messages are different seems to vary, sometimes it's the same set of messages but in a different order, and sometimes the batch contains an almost entirely different set of messages.

Below I have included examples of my configuration:

private IBusControl ConfigureReceivingBus(IServiceConfiguration configuration, IUnityContainer container)
{
    return Bus.Factory.CreateUsingRabbitMq(mqConfigurator =>
    {
        ConfigureBus(configuration.QueueConfig.OperationalData, mqConfigurator);

        mqConfigurator.ReceiveEndpoint($"{DATA_EXCHANGE}, endpointConfigurator =>
        {
            // Messages should persist even if RabbitMQ goes down.
            endpointConfigurator.Durable = true;
            endpointConfigurator.AutoDelete = false;

            // If we error whilst reading a message, we should put it back on the queue.
            endpointConfigurator.RethrowFaultedMessages();

            endpointConfigurator.ConfigureConsumeTopology = false;

            endpointConfigurator.PrefetchCount = 10;

            endpointConfigurator.Bind(DATA_EXCHANGE, exchangeBindingConfigurator =>
            {
                exchangeBindingConfigurator.Durable = true;
                exchangeBindingConfigurator.AutoDelete = false;
                exchangeBindingConfigurator.RoutingKey = configuration.computer.Id.ToString();
                exchangeBindingConfigurator.ExchangeType = ExchangeType.Direct;
            });

            endpointConfigurator.Batch<ChangeData>(batchConfigurator =>
            {
                // Up to 10 messages per batch.
                batchConfigurator.MessageLimit = 10;

                // Wait up to 5 seconds before consuming a partial batch.
                batchConfigurator.TimeLimit = TimeSpan.FromSeconds(5);

                // Only process 1 batch at a time.
                batchConfigurator.ConcurrencyLimit = 1;

                endpointConfigurator.Consumer<IConsumer<Batch<ChangeData>>>(() => container.Resolve<DataReceiver.DataReceiver<ChangeData>>());
            });
        });
    });
}


public async Task Consume(ConsumeContext<Batch<TMessage>> context)
{
    using DbFactory dbFactory = _dbFactoryProvider();

    dbFactory.BeginTransaction();

    Batch<TMessage> batch = context.Message;

    try
    {

        foreach (ConsumeContext<ChangeData> tableChangeContext in batch)
        {
            (string sql, IDictionary<string, object> parameters) = SqlConverter.Convert(tableChangeContext.Message);

            await dbFactory.ExecuteSql(sql, parameters).ConfigureAwait(false);
        }

        // debug -> testing SQL execution failure
        throw new InvalidOperationException("testing");

        dbFactory.CommitTransaction();
    }
    catch (Exception ex)
    {
        dbFactory.RollBackTransaction();
        throw;
    }
}

Is there a way to ensure that when consuming a batch of messages fails, we put the messages back on the queue in order and re-process the same set of messages in the next batch? Or have I misunderstood how batching works?

Chris Patterson

unread,
Feb 23, 2021, 11:38:38 AM2/23/21
to masstrans...@googlegroups.com
Could you repost this as a GitHub discussion? https://github.com/MassTransit/MassTransit/discussions


--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-dis...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/masstransit-discuss/21ed4cf9-e23d-4ec4-8a42-15020a187f81n%40googlegroups.com.

Martin C

unread,
Feb 23, 2021, 12:11:31 PM2/23/21
to masstransit-discuss
Reply all
Reply to author
Forward
0 new messages