Approach for creating saga on concurrent messages of the same type

369 views
Skip to first unread message

M___

unread,
May 4, 2016, 8:35:28 AM5/4/16
to masstransit-discuss

Hello,


I am currently implementing really simple saga being able to group messages of one type and actually trigger some events depending on their amount, for instance publish the message on first message of type correlating on some property. My current simplified implementation looks like this:


'''cs 

public class FileUploadSaga : MassTransitStateMachine<FileUploadSagaData>

    {

        public FileUploadSaga()

        {

        }


        public FileUploadSaga()

        {

            Configure();

        }


        private void Configure()

        {

            InstanceState(x => x.CurrentState);


            Event(() => FileSavedEvent,

                x => x.CorrelateBy(data => data.FilePath, ctx => ctx.Message.FilePath)

                      .SelectId(ctx => Guid.NewGuid()));


            Initially(

                When(FileSavedEvent)

                    .Then(ctx =>

                    {

                        ctx.Instance.FilePath = ctx.Data.FilePath;

                        ctx.Instance.Updated = DateTime.UtcNow;

                    })

                    .Publish(...)

                    .TransitionTo(SavedOnce));


            During(SavedOnce,

                When(FileSavedEvent)


                    .Then(ctx =>

                    {

                        ctx.Instance.Updated = DateTime.UtcNow;

                    })

                    .Finalize());


            SetCompletedWhenFinalized();

        }

        public State SavedOnce { get; private set; }

        public Event<IFileSavedEvent> FileSavedEvent { get; set; }

    }

'''


The problem is that because of concurrency the Initially() handlers will be called twice if messages will come at the same time (and it is not very rare case in my application because I have huge amount of workers each of them reporting that their work is done, file saved to their place). After the handlers being executed the exception will occur on database level (insert). I was really wondering what is the recommended way to implement such functionality? 


Yes, the message will be retried eventually, but the duplicate message from Publish delegate will already be sent at that point. Sending some in-memory event to already created saga seem a big unreliable as well as if I understand correctly message will be marked as ACKed but the event will not finish execution at that point, too?


I am sorry if this is a duplicate already, but I spent quite a while trying to find an answer to the question and wasn't able to. There was a discussion about Initialize method a while ago but there was no further information on this available.


Thanks!

Chris Patterson

unread,
May 12, 2016, 9:21:30 AM5/12/16
to masstrans...@googlegroups.com
First, you should make sure that your database isolation level is set to Serializable. This should help reduce some of the contention, but because you are operating by matching the file path (and not the primary key) then you may see two messages being processed at the same time.

You could also use the Partitioner, so that on a single node if messages that collide on hashes are received, they'll be serialized inside the bus itself -- so that only a single consumer operations on it at a time.


This code shows how to configure it, and a single partitioner may be shared by multiple UsePartitioner middleware components so that different messages types can be partitioned using a single partitioner (for cases where different messages types may collide into a single instance).


--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-dis...@googlegroups.com.
To post to this group, send email to masstrans...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/masstransit-discuss/b6ecc0c5-9d5b-41d0-bd1a-ad6bf9c653a5%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Chris Patterson

unread,
May 12, 2016, 9:22:38 AM5/12/16
to masstrans...@googlegroups.com
Also, if you're producing events, you could .UseInMemoryOutbox() to defer the publishing of events until the saga completes successfully (in the case of a duplicate key, the events will be discarded and the message will retry -- assuming you're using the .UseRetry() middleware to retry).


On Wed, May 4, 2016 at 8:35 AM, M___ <muf...@gmail.com> wrote:

--
Reply all
Reply to author
Forward
0 new messages