Saga persistence: TransitionTo, Then and ISagaRepository<TSaga>.Send interaction


Roman Prykhodko

Nov 30, 2016, 5:10:02 AM
to masstransit-discuss
Hi, guys.
As I understand it, it is perfectly normal for a saga to roll back the state-saving operation (ISagaRepository<TSaga>.Send), and this happens many times while it runs, for example when an event arrives in a state that does not handle it. So, generally speaking, we can never be sure that a given TransitionTo will succeed. Nevertheless, the classic examples call TransitionTo AFTER the logic wrapped in Then, ThenAsync, Publish and so on (see the following code as an example).

During(Active,
    When(Submitted)
        .Then(context =>
        {
            if (context.Data.Timestamp > context.Instance.Updated)
                context.Instance.Updated = context.Data.Timestamp;
            context.Instance.OrderId = context.Data.OrderId;
        })
        .ThenAsync(context => Console.Out.WriteLineAsync($"Cart Submitted: {context.Data.UserName} to {context.Instance.CorrelationId}"))
        .Unschedule(CartExpired)
        .TransitionTo(Ordered),
Why do such examples recommend saving the saga after the logic if the save transaction might be rolled back?
Next, even moving TransitionTo BEFORE the logic does not help, because TransitionTo and Then are executed concurrently, and TransitionTo is slower in most cases. Is there any way to be notified that TransitionTo succeeded and only then execute some logic?

Chris Patterson

Nov 30, 2016, 7:37:07 AM
to masstrans...@googlegroups.com
Make sure your isolation level is serializable and this problem goes away. Two concurrent instances of the same saga won't execute.

Use the InMemoryOutbox if you want to defer publishing of events until you're certain the state updates are persisted.
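
As a rough sketch of that suggestion, assuming the MassTransit 3.x RabbitMQ configuration style of the time: the outbox is enabled on the receive endpoint that hosts the state machine, so messages published while a message is being consumed are held in memory and released only after the consumer completes. The class name, queue name, broker address, credentials, and the `machine` and `repository` parameters are placeholders that do not come from this thread, and the exact signatures differ between MassTransit versions.

```csharp
using System;
using Automatonymous;
using MassTransit;
using MassTransit.Saga;

public static class OutboxBusFactory
{
    // Hypothetical helper: TInstance, machine and repository stand in for the
    // application's own saga instance type, state machine and saga repository.
    public static IBusControl Create<TInstance>(
        SagaStateMachine<TInstance> machine,
        ISagaRepository<TInstance> repository)
        where TInstance : class, SagaStateMachineInstance
    {
        return Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            // Placeholder broker address and credentials.
            var host = cfg.Host(new Uri("rabbitmq://localhost/"), h =>
            {
                h.Username("guest");
                h.Password("guest");
            });

            // Placeholder queue name.
            cfg.ReceiveEndpoint(host, "report_saga", e =>
            {
                // Buffer messages published or sent while consuming and
                // deliver them only after the consume pipeline completes.
                e.UseInMemoryOutbox();

                e.StateMachineSaga(machine, repository);
            });
        });
    }
}
```

Note that the outbox only defers outgoing messages; code inside a Then block still runs as part of handling the event.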

__
Chris Patterson




--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-dis...@googlegroups.com.
To post to this group, send email to masstrans...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/masstransit-discuss/6d9418d1-1b9a-4690-89a7-aae3f3a442d0%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Roman Prykhodko

Nov 30, 2016, 7:50:04 AM
to masstransit-discuss
Chris, I already use the serializable isolation level.
As for InMemoryOutbox: I am sorry, but what I need to defer is not the publishing of events but the execution of the "Then" block until the saga is persisted. Will InMemoryOutbox help me in this case?



Roman Prykhodko

Nov 30, 2016, 10:20:20 AM
to masstransit-discuss
I tried enabling InMemoryOutbox on all receive endpoints, including the one for the state machine itself. It doesn't help; the problem still reproduces.

Chris Patterson

Nov 30, 2016, 11:47:37 AM
to masstrans...@googlegroups.com
You may need to split the handling into two separate handlers, and produce an event from the first handler that triggers the second handler. That way your state is persisted in between the events.

Generally you should not have two TransitionTo() methods in a single event handler.



Roman Prykhodko

Nov 30, 2016, 1:33:28 PM
to masstransit-discuss
I probably didn't make myself clear: I do not have two TransitionTo() methods in a single event handler.
I have implemented splitting the handling into separate events, but it doesn't help, and here is why.

Say, originally I had the following logic:

            During(ReportForSendReceived,
                When(ReportSuccessfullySentEvent)
                    .TransitionTo(ReportSuccessfullySent)
                    .Then(ctx => ...some method is called from here...)
                    .Publish(ctx => MapReportStateChangedEvent(ctx.Instance, ctx.Data))
                    .Publish(ctx => MapEventLoggingService(UserAction.ReportSuccessfullySent, ctx.Data)),

The problem was that the .Then and .Publish blocks were executed before the ReportSuccessfullySent state had been saved to the DB.

After splitting I have:

            During(ReportForSendReceived,
                When(ReportSuccessfullySentEvent)
                    .TransitionTo(ReportSuccessfullySent)
                    .Publish(ctx => ReportSuccessfullySent_RunHandlerEvent));

            During(ReportSuccessfullySent,
                When(ReportSuccessfullySent_RunHandlerEvent)
                    .Then(ctx => ..some method is called from here.... )
                    .Publish(ctx => MapReportStateChangedEvent(ctx.Instance, ctx.Data))
                    .Publish(ctx => MapEventLoggingService(UserAction.ReportSuccessfullySent, ctx.Data)),

But again, in the new version ReportSuccessfullySent_RunHandlerEvent is fired before the ReportSuccessfullySent state is saved to the DB, and if the ReportSuccessfullySent state is then rolled back, a redundant ReportSuccessfullySent_RunHandlerEvent is left in the queue. Then the save of ReportSuccessfullySent is attempted again, and another ReportSuccessfullySent_RunHandlerEvent is added to the queue.

Another huge problem is that the second block, which has no TransitionTo, also triggers ISagaRepository.Send, which checks whether the current state is ReportSuccessfullySent and, if it is not, rolls back the transaction, while the .Then and .Publish methods have already been executed concurrently.

Chris Patterson

Nov 30, 2016, 2:42:52 PM
to masstrans...@googlegroups.com
To understand what's happening here: you publish a message, and another consumer immediately picks it up. At that point, the other consumer attempts to load the saga (via the call to ISagaRepository.Send). So you now have two consumers attempting to use the same saga.

If the database isolation level is correct, this will never get to the state machine until after the previous consumer completes and persists the state to the database. If your isolation level is wrong (and is not in fact serializable), you'll see the behavior that you are seeing.

Are you using NHibernate or Entity Framework? It shouldn't matter, but the locking code for EF is different, so perhaps somebody else can chime in, but this behavior should be working as expected or EVERYONE would have the issue you're seeing.



Roman Prykhodko

Nov 30, 2016, 3:10:25 PM
to masstransit-discuss
My isolation level is Serializable; here is the repository constructor:

        public EdoSagaRepository(IsolationLevel isolationLevel = IsolationLevel.Serializable)
        {
            _isolationLevel = isolationLevel;
            _sagaDbContextFactory = () => GlobalContainerInitializer.Container.Instance.Resolve<DbContext>();
        }

I use Entity Framework.
Why do you think I have two consumers? No, the only consumer in the example above is the state machine. It receives a message and then begins to concurrently call ISagaRepository.Send and execute the handlers (.Then, .Publish). After that, ISagaRepository.Send is rolled back, while the handlers have already executed successfully.

Chris Patterson

Nov 30, 2016, 8:58:59 PM
to masstrans...@googlegroups.com
A single type of consumer. The state machine.

Two instances of it running concurrently. 

Add .UseConcurrencyLimit(1) and see if your issue disappears. 
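
A minimal sketch of where that call would go, assuming the MassTransit 3.x configuration style. The helper class, method name, and parameters are hypothetical; the method is meant to be called from inside the ReceiveEndpoint callback that hosts the state machine. Limiting the endpoint to one message at a time is a way to check whether concurrent processing is involved.

```csharp
using Automatonymous;
using MassTransit;
using MassTransit.Saga;

public static class SagaEndpointDiagnostics
{
    // Hypothetical helper, invoked from within a ReceiveEndpoint(...) callback.
    public static void ConfigureSerialSaga<TInstance>(
        IReceiveEndpointConfigurator endpoint,
        SagaStateMachine<TInstance> machine,
        ISagaRepository<TInstance> repository)
        where TInstance : class, SagaStateMachineInstance
    {
        // Process at most one message at a time on this endpoint.
        endpoint.UseConcurrencyLimit(1);

        endpoint.StateMachineSaga(machine, repository);
    }
}
```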


Roman Prykhodko

Nov 30, 2016, 11:55:20 PM
to masstransit-discuss
No, single consumer (the state machine), ONE instance.

During(ReportForSendReceived,
    When(ReportSuccessfullySentEvent)
        .TransitionTo(ReportSuccessfullySent)
        .Then(ctx => ...some method is called from here...)
        .Publish(ctx => MapReportStateChangedEvent(ctx.Instance, ctx.Data))
        .Publish(ctx => MapEventLoggingService(UserAction.ReportSuccessfullySent, ctx.Data)),

1. The current state is NOT ReportForSendReceived.
2. ReportSuccessfullySentEvent arrives at the state machine.
3. The (single) state machine instance concurrently starts several processes:
- calling ISagaRepository.Send
- processing the "Then" block
- processing the "Publish" blocks
4. ISagaRepository.Send throws an exception because the current state is not ReportForSendReceived, and the message returns to the queue (due to the retry mechanism).
5. We seem to be back at point 1, BUT the "Then" and "Publish" blocks have already been fired (in point 3) and were not cancelled.

Is this normal state machine behavior? If not, which step goes wrong?

Thank you for your patience ;)