The best way to suppress MongoDbConcurrencyException in Saga

Alexei K

Aug 18, 2017, 12:05:11 PM
to masstransit-discuss
Hello,

I have the following StateMachine definition step:

During(Active,
    When(RecordPersisted)
        .Then(context => {
            Log.Debug("RecordPersisted");
        })
        .Catch<MongoDbConcurrencyException>(ex => ex
            .Then(c => {
                Log.Warn("MongoDbConcurrencyException occurred");  // this is never shown
            }))
        .TransitionTo(Persisted) // exception happens on persisting this state
)


When the saga state is persisted, a MongoDbConcurrencyException is thrown. That is expected, but I would like to handle or suppress it; otherwise an error message is delivered to the _error queue.

I tried to use Catch, but that seems to cover only the Then statements, while exceptions thrown on TransitionTo are not covered.

As an alternative, I tried to set up a partitioner, but failed: I'm using the Autofac helpers and haven't found a way to call UsePartitioner in code like this:

x.ReceiveEndpoint(host, queue, ep => {
    ep.UseInMemoryOutbox();
    ep.LoadFrom(context);
    ep.LoadStateMachineSagas(context);

    // how to UsePartitioner here?
});

The only option I see is to rewrite the helper's functionality with custom extensions, but there is probably a better way?

Could you please advise on what is the best approach here?

Thanks a lot!
Alex

Alexey Zimarev

Aug 18, 2017, 1:40:43 PM
to masstransit-discuss
The best approach to overcome concurrency exceptions for message-intensive sagas is to use a retry policy, with Retry.Immediate.

Alexei K

Aug 19, 2017, 5:04:37 AM
to masstransit-discuss
Thank you for the suggestion, Alexey,

I've tried to add that to configuration, but, unfortunately, that doesn't work as I expected.

ep.UseRetry(p => {
    p.Immediate(0);
    p.Handle<MongoDbConcurrencyException>();
});

I tried putting that both before and after ep.LoadStateMachineSagas(context); over several runs it always results in an error.
Still, I sometimes see R-RETRY in the logs, so I conclude that some of those exceptions are handled by the retry policy (and not retried), but not all of them.

One more note: initially I tried p.None() instead of p.Immediate(0), but that didn't work at all.

So, if we cannot handle those exceptions, it's probably better to prevent them altogether with a partitioner? The only question is how to configure it properly.

Thanks!

Alexey Zimarev

Aug 19, 2017, 6:39:00 AM
to masstrans...@googlegroups.com
Yeah, but Immediate(0) means you want zero retries. I guess this is not what you want. Maybe Immediate(10) would work better?

--
You received this message because you are subscribed to a topic in the Google Groups "masstransit-discuss" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/masstransit-discuss/x9Wt3KUoRZE/unsubscribe.
To unsubscribe from this group and all its topics, send an email to masstransit-dis...@googlegroups.com.
To post to this group, send email to masstrans...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/masstransit-discuss/e9705406-04af-4316-8868-bc621e261e7c%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Alexei K

Aug 19, 2017, 6:42:28 AM
to masstransit-discuss
Alexey,

The problem is that I do not need to retry at all. I just want to silently skip those exceptions to avoid generating Fault<T> events and flooding the _error queue with unnecessary messages.
Of course I could handle the Fault<T> events too, but that would put additional load on MT and Rabbit, which is something I'd like to avoid.

Thanks!

Alexey Zimarev

Aug 19, 2017, 8:58:06 AM
to masstrans...@googlegroups.com
I see, but I am not sure what you are trying to do. Concurrency exceptions are usually easily overcome by retrying, since the retry loads the record again. But if this is how your logic works…

You can create custom middleware that swallows these exceptions. The custom middleware sample is a good start.
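
For readers following along, here is a rough sketch of what such a swallowing filter might look like. It is not taken from the sample mentioned above: the class name SwallowConcurrencyFilter and the filter scope name are hypothetical, the namespace for MongoDbConcurrencyException is an assumption, and it presumes the GreenPipes-based filter interface of MassTransit 3.5.x. Note that a swallowed exception means the message is treated as consumed and the failed saga update is lost.

```csharp
using System.Threading.Tasks;
using GreenPipes;
using MassTransit;
using MassTransit.MongoDbIntegration.Saga; // assumed namespace of MongoDbConcurrencyException

// Hypothetical filter: lets the rest of the pipeline run and ignores
// MongoDbConcurrencyException so that no fault is generated for it.
public class SwallowConcurrencyFilter : IFilter<ConsumeContext>
{
    public async Task Send(ConsumeContext context, IPipe<ConsumeContext> next)
    {
        try
        {
            // Run everything configured after this filter (sagas, consumers).
            await next.Send(context);
        }
        catch (MongoDbConcurrencyException)
        {
            // Deliberately ignored: the message counts as consumed,
            // and the saga update that failed is not applied.
        }
    }

    public void Probe(ProbeContext context)
    {
        // Scope name is arbitrary; it only shows up in diagnostics.
        context.CreateFilterScope("swallowConcurrency");
    }
}

// Possible registration on the receive endpoint (order relative to other
// middleware matters):
// ep.UseFilter(new SwallowConcurrencyFilter());
```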


Chris Patterson

Aug 19, 2017, 10:11:30 AM
to masstrans...@googlegroups.com
You can't catch saga persistence exceptions in the state machine, you have to do it with a retry policy on the receive endpoint.

Also, the retry policy should be placed prior to the in-memory outbox. Order matters with middleware.
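
A minimal sketch of that ordering, reusing only the calls and the names (x, host, queue, context) that already appear earlier in this thread. The retry count of 5 is an arbitrary illustration, and once the retries are exhausted the exception would presumably still fault the message.

```csharp
x.ReceiveEndpoint(host, queue, ep =>
{
    // Registered first, so the retry wraps the outbox and the sagas
    // that are configured after it.
    ep.UseRetry(p =>
    {
        p.Handle<MongoDbConcurrencyException>();
        p.Immediate(5); // arbitrary count; 0 would mean no retries at all
    });

    ep.UseInMemoryOutbox();
    ep.LoadFrom(context);
    ep.LoadStateMachineSagas(context);
});
```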


Alexei K

Aug 21, 2017, 3:57:08 AM
to masstransit-discuss
Hello Chris,

Thanks a lot for your reply!

Placing UseRetry before UseInMemoryOutbox helped; the code below seems to properly catch all exceptions:

ep.UseRetry(p => {
    p.Immediate(0);
    p.Handle<MongoDbConcurrencyException>();
});

ep.UseInMemoryOutbox();
ep.LoadFrom(context);
ep.LoadStateMachineSagas(context);


On another question re partitioner. I found the following gist in other threads: https://gist.github.com/phatboyg/9775503fe4441630e6b0e488baa1ea11

If I need to specify a partitioner for my saga and still let it load its configuration from Autofac, I guess that instead of the LoadStateMachineSagas helper I need to use something like:

var partitioner = ep.CreatePartitioner(1);
var scope = context.Resolve<ILifetimeScope>();
var stateMachine = context.Resolve<TicketsStateMachine>();
ep.StateMachineSaga(stateMachine, scope, sc => {
    sc.Message<TicketUpdated>(m => m.UsePartitioner(partitioner, kp => kp.Message.TicketId));
});

Thank you!

Chris Patterson

Aug 21, 2017, 9:23:55 AM
to masstrans...@googlegroups.com
Sounds like you found the answers you seek, both for the retry and the partitioner. Glad it helped!



Alexei K

Aug 21, 2017, 9:40:16 AM
to masstransit-discuss
Yes, I did,

Thank you again for the support and great product!

Alexei K

Aug 23, 2017, 6:35:23 AM
to masstransit-discuss
Hello Chris,

Unfortunately, after more detailed testing I'm still experiencing issues with UseRetry and the partitioner.
I prepared a gist with the exact small test that I'm running: https://goo.gl/Sur5Br
I might be missing something very important, because it doesn't work as expected.

1) Issue with the partitioner
If I run the code, it creates 3 saga instances (I see 3 log messages from line 51):
--- SAGA: created: 283b5f4d-4178-4156-aeaa-2effda6cc0ef
--- SAGA: created: 283b5f4d-4178-4156-aeaa-2effda6cc0ef
--- SAGA: created: 283b5f4d-4178-4156-aeaa-2effda6cc0ef

while I expect to see
--- SAGA: created: 283b5f4d-4178-4156-aeaa-2effda6cc0ef
--- SAGA: already active (Active) for record: 283b5f4d-4178-4156-aeaa-2effda6cc0ef
--- SAGA: already active (Active) for record: 283b5f4d-4178-4156-aeaa-2effda6cc0ef
because I use partitioner = CreatePartitioner(1). Or does that work some other way?

I just need to ensure that only 1 saga instance is created for each record, but with the current setup it creates multiple instances (with both in-memory and Mongo storage).
Initially I thought the issue was with correlation, but I tried both CorrelateBy and CorrelateById, and that doesn't change the behaviour.

2) Issue with UseRetry not catching all exceptions
If I switch to MongoDbSagaRepository in line 106, I get a lot of MongoDbConcurrencyException exceptions.
I also see:
R-RETRY rabbitmq://localhost/test_queue N/A Unable to update saga. It may not have been found or may have been updated by another process
so some messages are caught by the UseRetry policy. Still, if I increase the number of messages sent, I also see:
R-FAULT rabbitmq://localhost/test_queue N/A TestMT.RecordCreated TestMT.Program+RecordState(00:00:00.0771963) Unable to update saga. It may not have been found or may have been updated by another proces
and so the error message goes to the _error queue, which is something I would like to avoid.
What I expect is to see only R-RETRY logs and no further actions.

Could you take a look at the code and advise on what might be wrong in my configuration, or whether it is some other issue?

Thank you!

 

Chris Patterson

Aug 23, 2017, 4:11:33 PM
to masstrans...@googlegroups.com
You need to put all three message types here:


Not just the single message type.



Alexei K

Aug 23, 2017, 4:46:59 PM
to masstransit-discuss
Hi Chris,

Thank you for your reply.
In fact, that was the first option I tried before sending my question, but since the issue happens with only 1 message type and on initial creation, I removed those lines to have less code.
In any case, I've just double-checked: even with all message types specified there, the behaviour is the same. I see 3 separate saga instances created for an unknown reason.
Please advise.

Thank you!

Nick Archibald

Aug 23, 2017, 8:26:43 PM
to masstransit-discuss
Hi Alexei

I'm not sure, but it sounds like the problem I had with the order of the partitioner in the execution pipeline with a saga. Chris helped me with a workaround, which you can see in this sample project: https://github.com/nicka-cme/simple-MassTSaga. The sample project does not use MongoDB, but it seems like the same problem. I believe Chris has made a change to fix the issue, but I'm not sure whether it has been released yet.

Nick

Alexei K

Aug 24, 2017, 7:04:07 AM
to masstransit-discuss
Hello Nick,

Thank you for providing that additional information.
Yes, it seems my issue is very similar to the one you had. I had seen your reply before, but thought that the patch was already in master, while it's still in the develop branch.
I also compared your code with mine to find any inconsistencies that could help in troubleshooting.

Below are my findings:
1) In my initial report, even with InMemoryStorage I always got 3 new saga instances created. I traced that to the CorrelateBy statement.
I used the following code (based on a sample in the docs):
Event(() => RecordCreated, x => x.CorrelateBy(s => s.RecordId, context => context.Message.RecordId).SelectId(s => Guid.Parse(s.Message.RecordId)));

and that didn't work.
After I changed that to:
Event(() => RecordCreated, x => x.CorrelateById(context => Guid.Parse(context.Message.RecordId)).SelectId(s => Guid.Parse(s.Message.RecordId)));

the in-memory storage started to work properly, i.e. a new record was created only the first time, and after that I got a message that such a saga had already been created.
So, I conclude we should always correlate by CorrelationId, even for creation?

Based on the sample from the documentation, I thought that CorrelateBy compares a value on the saga instance with the one provided in the message context. But it seems to have some different logic?
Also, that code worked with strings, but using int or Guid fields in those expressions will not even compile (it expects a reference type).

2) Regarding the partitioner
After I switched to the latest version from the develop branch, I was able to set up the partitioner with this code:
var partitioner = ep.CreatePartitioner(1);
ep.StateMachineSaga(stateMachine, sagaRepository, sc => {
    sc.Message<RecordCreated>(m => m.UsePartitioner(partitioner, k => k.Message.RecordId));
});

When using master, I think the code from your sample worked OK too, e.g.:
ep.UsePartitioner(1, cc => {
    if (cc.TryGetMessage(out ConsumeContext<RecordCreated> createSagaContext))
        return Guid.Parse(createSagaContext.Message.RecordId);
    return cc.CorrelationId ?? Guid.Empty;
});

3) While I was able to set up the partitioner, I'm not sure it works correctly.
My understanding of how the partitioner works is the following:
 - given 10 messages, some with ID="A" and some with ID="B"
 - given partitionCount set to 2
 - I expect at most 2 consumers working on message A and 2 consumers working on message B at the same time?
Is that a correct assumption?

From my observation, changing partitionCount has no effect. It seems messages are always processed one by one.
E.g. UsePartitioner(10) works the same as UsePartitioner(1).
I tested that by setting PrefetchCount to 1000 and UsePartitioner to 500. With such a setup, if up to 500 consumers were allowed to work on same-type messages, we should get concurrency errors, but they never appear.
Whereas if I do not use the partitioner, pumping even 10 messages always results in a concurrency issue.

4) Finally, my original issue with exceptions is still present in develop too.
I.e. I still see failed messages with exceptions moved to the error queue in the develop branch.
A strange thing I noticed is that there is always exactly 1 error message, regardless of the number of messages I send in the test. I guess that happens because InMemoryOutbox aggregates all those exceptions into one?

Please advise.

Thank you!

Nick Archibald

Aug 25, 2017, 6:53:18 PM
to masstransit-discuss
So the partitioner count number makes sure that only n unique correlation IDs are being consumed at once. So in your example if the partition count was 5 and you only had A's and B's you will only have 1A and 1B being consumed at one time.
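
To make that behaviour concrete, here is a small self-contained sketch of the general idea (hash a key to one of N slots, and run each slot serially). It is an illustration only, not MassTransit's actual implementation; KeyedPartitioner, SlotFor, and Demo are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Illustrative stand-in for a keyed partitioner; not MassTransit's code.
public sealed class KeyedPartitioner
{
    private readonly SemaphoreSlim[] _slots;

    public KeyedPartitioner(int partitionCount)
    {
        // One single-entry semaphore per partition.
        _slots = Enumerable.Range(0, partitionCount)
            .Select(_ => new SemaphoreSlim(1, 1))
            .ToArray();
    }

    // The same key always maps to the same slot within one process.
    public int SlotFor(string key) => (key.GetHashCode() & int.MaxValue) % _slots.Length;

    // Work in one slot runs one item at a time; different slots may run in parallel.
    public async Task RunAsync(string key, Func<Task> work)
    {
        var slot = _slots[SlotFor(key)];
        await slot.WaitAsync();
        try { await work(); }
        finally { slot.Release(); }
    }
}

public static class Demo
{
    // Returns the highest number of same-key work items seen running at once.
    public static async Task<int> MaxSameKeyConcurrencyAsync(int partitionCount)
    {
        var partitioner = new KeyedPartitioner(partitionCount);
        var gate = new object();
        var running = new Dictionary<string, int> { ["A"] = 0, ["B"] = 0 };
        var maxSeen = 0;
        var keys = new[] { "A", "B", "A", "B", "A", "A", "B", "A", "B", "B" };

        var tasks = keys.Select(key => partitioner.RunAsync(key, async () =>
        {
            lock (gate) { running[key]++; maxSeen = Math.Max(maxSeen, running[key]); }
            await Task.Delay(10); // simulate handling a message
            lock (gate) { running[key]--; }
        })).ToArray();

        await Task.WhenAll(tasks);
        return maxSeen;
    }

    public static async Task Main()
    {
        // With any partition count, same-key work never overlaps.
        Console.WriteLine(await MaxSameKeyConcurrencyAsync(5)); // prints 1
    }
}
```

In this sketch, raising the partition count only increases how many different keys can be handled in parallel; messages that share a key (or whose keys happen to hash to the same slot) are still handled one at a time.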

I have noticed in your gist that your guid is a string and you parse it to a guid. It kind of looks like you missed a parse but I'm not sure. Maybe you could simplify it and just pass a guid so you don't have any parsing.

Alexei K

Aug 27, 2017, 9:52:49 AM
to masstransit-discuss
Hello Nick,

Thank you for clarifying how the partitioner works.

Regarding the issue with CorrelateBy and CorrelateById, I guess the inconsistencies I observed were related to implementation specifics of InMemorySagaRepository: since CorrelateById is faster than CorrelateBy, it produced different behaviour with concurrent incoming messages.

Regarding the original issue with exceptions: as I see it, the exception cannot be fully skipped with UseRetry( Immediate(0) ), as it was designed to work with values >0. At the same time, now that I know how to add a partitioner in version 3.5.7, that is a much better solution, as it prevents those concurrency issues completely.

So, at this point, it seems everything has been covered.
Thanks again!