MongoDbSagaRepository MongoDbConcurrencyException on simultaneous incoming events

28 views
Skip to first unread message

jonathan.va...@mediamonks.com

unread,
Nov 8, 2018, 6:34:21 AM11/8/18
to masstransit-discuss
Hello!

I'm using MassTransit Saga state machines with the MongoDbSagaRepository. I have multiple incoming events (PurchaseAddressesFinished and ProductSnapshotsFinished) that arrive simultaneously, see the code below. Note that this is just a fragment of the complete state machine for brevity.  Both events result in a property of the Instance being updated (see second During). This simultaneous update results in a race condition where a MongoDbConcurrencyException is thrown, because the instance with the previous Version is not found. I've looked at the MongoDbSagaRepository and it seems no exclusive lock (single read, single write) is applied in the Send and SendQuery methods. I would expect such a lock to be in place to prevent this race condition. Is there a reason this scenario wasn't covered, or am I missing something?

I can implement my own ISagaRepository with an exclusive lock on the instance level, but I'd prefer to have a solution in MassTransit.Saga itself.

I've enabled UseInMemoryOutbox as recommended, but of course this is only for outgoing messages, not for incoming messages.

public sealed class PaymentStateMachine : MassTransitStateMachine<PaymentInstance>
{
    public PaymentStateMachine(ILogger logger)
    {
        // ... Initialization omitted for brevity
        
        During(ImportingFirstStage,
            When(FirstStageFinished)
                .TransitionTo(ImportingSecondStage)
                // Each IImportResourcesInitiator a Command to MassTransit to do some work in another process.
                // When that process finishes, an IResourcesImported event is fired.
                .Activity(s => s.OfType<IImportResourcesInitiator<PaymentInstance, IImportAddress, PaymentContext>>())
                .Activity(s => s.OfType<IImportResourcesInitiator<PaymentInstance, IImportProductSnapshot, PaymentContext>>())
        );

        During(ImportingSecondStage,
            When(PurchaseAddressesFinished)
                // Store some data in the Instance, this results in the MongoDbConcurrencyException.
                .Then(c => c.Instance.PurchaseAddressesImportSuccessful = c.Data.Succeeded)
                .Activity(s => s.OfType<IPostProcessResourcesInitiator<PaymentInstance, IImportAddress>>()),
            When(ProductSnapshotsFinished)
                // Store some data in the Instance, this results in the MongoDbConcurrencyException.
                .Then(c => c.Instance.ProductSnapshotsImportSuccessful = c.Data.Succeeded)
                .Activity(s => s.OfType<IPostProcessResourcesInitiator<PaymentInstance, IImportProductSnapshot>>())
        );
    }
    
    public State ImportingFirstStage { get; private set; }

    public State ImportingSecondStage { get; private set; }

    public Event<IResourcesImported<IImportAddress>> PurchaseAddressesFinished { get; private set; }

    public Event<IResourcesImported<IImportProductSnapshot>> ProductSnapshotsFinished { get; private set; }
}

Thanks for the help!

Cheers,

Jonathan
Reply all
Reply to author
Forward
0 new messages