Does Sagas' work with partitioners?

364 views
Skip to first unread message

Nick Archibald

unread,
Apr 7, 2017, 10:07:09 PM4/7/17
to masstransit-discuss
Hi Guys
I'm having a problem where the my saga persistence is set to optimistic and I am getting lot of DbUpdateConcurrencyException exceptions being thrown.  I don't believe this should be the case as I have the partitioner set to only the messages that change this record to be processed one at a time. So the question is does the partitioner work for saga's? Obviously this would occur if I have multiple consumers but I have checked that and I don't.

Here is my saga config

cfg.ReceiveEndpoint(
host
,
busSettings
.DeployStatusQueueName,
epc
=>
{
epc
.UseInMemoryOutbox();
var concurrency = 8;
var sagaRepository = context.Resolve<ISagaRepository<DeployStatusSagaState>>();
var saga = context.Resolve<DeployStatusStateMachine>();
epc
.PrefetchCount = (ushort)Math.Max(concurrency * 2, 10);
epc
.UseConcurrencyLimit(concurrency);
var partitioner = cfg.CreatePartitioner(concurrency);
epc
.StateMachineSaga(saga, sagaRepository, sc =>
{
sc
.Message<IDeployEmailStatus>(m => m.UsePartitioner(partitioner, p => p.Message.EventId));
sc
.Message<IDeploySeedEmailStatus>(m => m.UsePartitioner(partitioner, p => p.Message.EventId));
                                       
});
                                        epc
.UseRetry(rc => rc.Exponential(10, TimeSpan.FromMilliseconds(500), TimeSpan.FromMinutes(3), TimeSpan.FromMilliseconds(500)));




I was wondering whether my when configuration is allowing multiple messages with the same partition

Extracts from my saga
 

           
Event(() => this.CommunicationDeployAdded, x => x.CorrelateById(c => c.Message.EventId).SelectId(c => c.Message.EventId));

 

           
Schedule(() => CleanUpTimeout, x => x.ExpirationId, x =>

           
{

                x
.Delay = settings.SagaCleanupWaitTime;

                x
.Received = e => e.CorrelateById(c => c.Message.DeployId);

           
});






this.During(

this.Active,When(this.CommunicationDeployAdded)
.Then(this.OnEmailStatusReceived)
.Then(c => Logger.Info($"Deploy Status Saga - Updated - Id:


{c.Instance.Id}"
)),
When(this.CommunicationDeployAdded, c =>


this.CheckIfCommunicationComplete(c)).Then(c => Logger.Debug($"Deploy Status Saga - Deploy Received


last Status - Id: {c.Instance.Id}"
))                  


.Send(

this.busSettings.FinalizeDeploymentQueueAddress,                


c
=> GetFinalizeDeploymentMessage(c.Instance))

.TransitionTo(Completed)
.Schedule(this.CleanUpTimeout, c => new
DeployCleanupExpiredEvent(c.Instance)));




this.During(

 

               
new[] { Active, Completed },

         

               
When(this.CommunicationDeployAdded, ShouldPublishProgress)

                   
.Publish(this.PublishProgress));



Any suggestions on what to look at or where no configuration is wrong would be awesome as this problem is occuring in production and hence the system is thousands of messages behind because of the constant exception throwing.


Nick Archibald

unread,
Apr 9, 2017, 7:19:24 AM4/9/17
to masstransit-discuss
OK. So I have done some more testing and changing of configurations even building a simplified Saga to test. From what I am seeing is it looks like the partitioner is not being added to the pipeline for a Saga. If I change the Concurrency limit to 1 it don't get the DbUpdateConcurrencyException thrown (which of course is no surprise). However if I make the partitioner value 1, I will get the exception thrown which I would expect it should work exactly like setting the concurrency limit to 1. I have double checked that the partition filter is correct even going to the point of making my test message all the same Guid for the partition.

Does this sound like a bug? Should I log it? Or have a done a massive fail in configuring my saga with a partitioner?

Chris Patterson

unread,
Apr 9, 2017, 10:54:39 AM4/9/17
to masstrans...@googlegroups.com
So the source is hard to read (a Gist is much cleaner), but let me see if I can get you on the right track...


Does that help? 

--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-discuss+unsub...@googlegroups.com.
To post to this group, send email to masstransit-discuss@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/masstransit-discuss/0a0e076e-63fb-48ab-98df-5790deaf0ce5%40googlegroups.com.

For more options, visit https://groups.google.com/d/optout.

Chris Patterson

unread,
Apr 9, 2017, 11:13:05 AM4/9/17
to masstrans...@googlegroups.com
Also, .CorrelationId is the primary key of the saga, not sure what ".Id" is in your example.

To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-discuss+unsubscribe...@googlegroups.com.

To post to this group, send email to masstransit-discuss@googlegroups.com.

Nick Archibald

unread,
Apr 9, 2017, 8:08:04 PM4/9/17
to masstransit-discuss
Awesome thanks Chirs. Sorry about the code. Thanks for the rough example.

I have updated my very simple Saga that only has one event based off your gist code however I can still see the update exception being thrown even though I except the partitioner to stop this from occurring. To help to see what I am doing I have put the simple test application code here https://github.com/nicka-cme/simple-MassTSaga.git

I also added an message creation Linqpad script.

I hope this helps identify what I am doing wrong.
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-discuss+unsub...@googlegroups.com.
To post to this group, send email to masstrans...@googlegroups.com.

Chris Patterson

unread,
Apr 17, 2017, 2:04:07 PM4/17/17
to masstrans...@googlegroups.com
FWIW, I changed the develop branch to properly put the partitioner before the saga repository :)


To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-discuss+unsubscribe...@googlegroups.com.
To post to this group, send email to masstrans...@googlegroups.com.

--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-discuss+unsub...@googlegroups.com.

Nick Archibald

unread,
Apr 18, 2017, 4:08:36 PM4/18/17
to masstransit-discuss
Awesome. Thanks Chris.

Until then anyone who is seeing the problem I was can check out my github repo to see a workaround that Chris help with.
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-discuss+unsub...@googlegroups.com.
To post to this group, send email to masstrans...@googlegroups.com.

--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-discuss+unsub...@googlegroups.com.
To post to this group, send email to masstrans...@googlegroups.com.
Reply all
Reply to author
Forward
0 new messages