How to properly handle unexpected asynchronous events in different saga states?

123 views
Skip to first unread message

Alex Wheat

unread,
Feb 13, 2018, 12:18:48 PM2/13/18
to masstransit-discuss
Hi guys,

we need some "best practices" suggestions how to deal with saga in our specific case. He is our situation: we have some "Service" that we don't have control on and that issues some events like "ProcessingStarted", "ProgressUpdated", "ProcessingFinished". Now, in the saga, we want to do next:

1. Catch "ProcessingStarted" event and initiate saga
2. Issue command to register some aggregate and wait while it was created
3. Catch "ProgressUpdated" events and change progress in aggregate
4. Wait while "all" issued "ProgressUpdated" events successfully consumed by the saga
5. Catch "ProcessingFinished" and finish the saga

In the saga we are planning to have default "Init" state what should catch only "ProcessingStarted" event and initiate the saga. After that we are moving to "Creating" state where we issue the command to create aggregate and wait while we get event about successful creation. As soon we got it we are moving to "Processing" state and start consume "ProgressUpdated" and "ProcessingFinished" events. As soon as we done with all events we finalize the saga.

Now the questions that we have:

1. How to deal with the situations when any "ProgressUpdated" or "ProcessingFinished" delivered to the saga before "ProcessingStarted"? One of the options is try to redeliver that events later and wait while "ProcessingStarted" consumed and saga initiated. Is it good or bad?
2. Do we need to catch not expected events in all saga states or there is some generic way to send unexpected events for later redelivery?
3. How to make sure that we successfully consumed all "ProgressUpdated" events before to finalize the saga?

Thanks in advance for any suggestions and ideas!

Regards,
Alex



Alex Wheat

unread,
Feb 15, 2018, 2:16:32 PM2/15/18
to masstransit-discuss
Now we mostly have the problems with events that arrive before ProcessingStarted event that triggers the saga and in this case those events (that appeared before ProcessingStarted) just lost and not delivered later. So, in order to prevent this behavior we do something like this:

            Event(() => ProcessingStarted, x => x.CorrelateById(context => context.Message.Id));
            Event(() => ProgressUpdated, x => x.CorrelateById(context => context.Message.Id));
            Event(() => ProcessingFinished, x => x.CorrelateById(context => context.Message.Id));

            Initially(
                When(ProcessingStarted)
                    .TransitionTo(Creating)
                    .ThenAsync(async context =>
                    {
                        // initialize 
                    }),
                When(ProgressUpdated)
                    .Then(context =>
                    {
                        throw new UnhandledEventException();
                    }),
                When(ProcessingFinished)
                    .Then(context =>
                    {
                        throw new UnhandledEventException();
                    })
            );

and configure retry policy like this

                cfg.UseRetry(r =>
                {
                    r.Incremental(100, TimeSpan.FromMilliseconds(10), TimeSpan.FromMilliseconds(50));
                    r.Handle<Automatonymous.UnhandledEventException>();
                });

It kinda works and makes that events redelivered when Saga created but I still have a feeling that something wrong with this solution... 
Reply all
Reply to author
Forward
0 new messages