Saga race condition when persisting to database


Ben Poelstra

Jan 12, 2016, 6:47:08 PM
to masstransit-discuss, Shannon Reimer
I am having a problem with a saga that is persisted to a database using Entity Framework. There seems to be a race condition: a response event arrives before the saga instance has been persisted to the database. Here is the basic idea of what we are trying to do:

    using System;
    using Automatonymous;
    using MassTransit;

    public class FakeSaga : MassTransitStateMachine<FakeSagaInstance>
    {
        public State FirstState { get; set; }
        public State SecondState { get; set; }

        // Events declared as properties, like the states, so the state machine can initialize them.
        public Event<EventOneMessage> EventOne { get; set; }
        public Event<EventTwoResponseMessage> EventTwoResponse { get; set; }

        public FakeSaga()
        {
            // Store the current state name in the instance's CurrentState property.
            InstanceState(x => x.CurrentState);

            Initially(
                When(EventOne)
                    .TransitionTo(SecondState)
                    // The message needs the saga's CorrelationId set so the
                    // response can be correlated back to this instance.
                    .Publish(new EventTwoMessage())
            );

            During(SecondState,
                When(EventTwoResponse)
                    .Finalize());

            // Remove the saga instance from the repository once it is finalized.
            SetCompletedWhenFinalized();
        }
    }

    public class FakeSagaInstance : SagaStateMachineInstance
    {
        public Guid CorrelationId { get; set; }
        public string CurrentState { get; set; }
    }

    public class EventOneMessage : CorrelatedBy<Guid>
    {
        public Guid CorrelationId { get; set; }
    }

    public class EventTwoMessage : CorrelatedBy<Guid>
    {
        public Guid CorrelationId { get; set; }
    }

    public class EventTwoResponseMessage : CorrelatedBy<Guid>
    {
        public Guid CorrelationId { get; set; }
    }


The problem is that we get an error saying EventTwoResponse was received while the saga is still in the initial state. I'm guessing that SaveChanges in Entity Framework takes longer than our consumer takes to process EventTwoMessage. Has anyone seen anything like this or found a workaround?

Andrew Duff

Jan 14, 2016, 7:02:42 AM
to masstransit-discuss, Shannon Reimer
We had this same issue in MT2 and again when moving to MT3. We got around it by creating a new extension method on EventActivityBinder that we called PublishAfter. It adds an entry to a ConcurrentDictionary that uses the CorrelationId as the key and holds a list of the messages to publish.

Create a wrapper around the SagaRepository that implements the ISagaRepository interface and forwards every method to the proxied instance. The only method that is not just a direct call to the proxied method is Send. In Send, call the proxied instance first, then release the messages held in the ConcurrentDictionary for that saga and actually publish them to the bus. You can use context.CorrelationId.Value in the Send method to find the right entry to release.
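A minimal sketch of the buffering idea described above, written in plain C# with no MassTransit types so it stands on its own. The names DeferredPublishBuffer, PersistThenPublish, SendAsync, and Discard are hypothetical and are not part of MassTransit. In a real setup the Enqueue call would sit behind the PublishAfter extension and SendAsync would be the Send method of the repository wrapper. Discarding buffered messages when the save fails is an assumption added here, not something stated in the post.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper: holds the messages a saga wants to publish until its
// state has been saved. Assumes Enqueue and Release for one CorrelationId run
// within the same message-handling flow, not concurrently with each other.
public sealed class DeferredPublishBuffer
{
    readonly ConcurrentDictionary<Guid, ConcurrentQueue<object>> _pending =
        new ConcurrentDictionary<Guid, ConcurrentQueue<object>>();

    // Called where the saga would otherwise publish immediately.
    public void Enqueue(Guid correlationId, object message)
    {
        _pending.GetOrAdd(correlationId, _ => new ConcurrentQueue<object>())
                .Enqueue(message);
    }

    // Removes the buffered messages for one saga and publishes them in order.
    public async Task ReleaseAsync(Guid correlationId, Func<object, Task> publish)
    {
        ConcurrentQueue<object> queue;
        if (!_pending.TryRemove(correlationId, out queue))
            return;

        foreach (var message in queue)
            await publish(message);
    }

    // Drops buffered messages, for example when the save failed.
    public void Discard(Guid correlationId)
    {
        ConcurrentQueue<object> ignored;
        _pending.TryRemove(correlationId, out ignored);
    }
}

// Hypothetical stand-in for the repository wrapper: persist first, then publish.
public sealed class PersistThenPublish
{
    readonly DeferredPublishBuffer _buffer;
    readonly Func<object, Task> _publish;

    public PersistThenPublish(DeferredPublishBuffer buffer, Func<object, Task> publish)
    {
        _buffer = buffer;
        _publish = publish;
    }

    // innerSend represents the proxied repository call that saves the saga.
    public async Task SendAsync(Guid correlationId, Func<Task> innerSend)
    {
        try
        {
            await innerSend();
        }
        catch
        {
            // Assumption: nothing should be published if the save failed.
            _buffer.Discard(correlationId);
            throw;
        }

        await _buffer.ReleaseAsync(correlationId, _publish);
    }
}
```

The point of the ordering in SendAsync is that no consumer can see the published message until the saga's new state is already in the database, which is what removes the race.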

I'll do a post on how if I get time on the weekend. We verified this with a test that published 100,000 messages, where all the consumers did was publish the next message immediately. We had zero instances where a message was out of order, whereas before it was about 20%.


Thanks

Andy

Chris Patterson

Jan 14, 2016, 10:00:10 AM
to masstrans...@googlegroups.com
Did you see my pre insert response? That's the key. 

__
Chris Patterson




--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-dis...@googlegroups.com.
To post to this group, send email to masstrans...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/masstransit-discuss/8d7612ef-b756-4b65-981f-e5dd8eea963e%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Ben Poelstra

Jan 14, 2016, 10:59:34 AM
to masstransit-discuss
I did try that. It is still doing the same thing. It's not that it is creating a new saga instance; the existing saga instance isn't updating. I think the problem is that it's not persisting the state change fast enough. If I put a one-second wait in my consumer, it doesn't throw the error anymore. That's why I think it's a race condition. I've also tried querying the saga database when I get the message, and the instance is definitely still in the initial state in the database.

Chris Patterson

Jan 14, 2016, 1:07:44 PM
to masstrans...@googlegroups.com
Are you using serializable as your isolation level with the saga repository?

__
Chris Patterson





Ben Poelstra

Jan 14, 2016, 1:28:38 PM
to masstransit-discuss
Yes. I'm not setting it explicitly, and it looks like it defaults to serializable.

