Hi Guys
I'm having a problem where my saga persistence is set to optimistic concurrency and I am getting a lot of DbUpdateConcurrencyException exceptions. I don't believe this should happen, as I have the partitioner set up so that the messages that change this record are processed one at a time. So the question is: does the partitioner work for sagas? Obviously this would occur if I had multiple consumers, but I have checked that and I don't.
Here is my saga config:
cfg.ReceiveEndpoint(
    host,
    busSettings.DeployStatusQueueName,
    epc =>
    {
        epc.UseInMemoryOutbox();
        var concurrency = 8;
        var sagaRepository = context.Resolve<ISagaRepository<DeployStatusSagaState>>();
        var saga = context.Resolve<DeployStatusStateMachine>();
        epc.PrefetchCount = (ushort)Math.Max(concurrency * 2, 10);
        epc.UseConcurrencyLimit(concurrency);
        var partitioner = cfg.CreatePartitioner(concurrency);
        epc.StateMachineSaga(saga, sagaRepository, sc =>
        {
            sc.Message<IDeployEmailStatus>(m => m.UsePartitioner(partitioner, p => p.Message.EventId));
            sc.Message<IDeploySeedEmailStatus>(m => m.UsePartitioner(partitioner, p => p.Message.EventId));
        });
        epc.UseRetry(rc => rc.Exponential(10, TimeSpan.FromMilliseconds(500), TimeSpan.FromMinutes(3), TimeSpan.FromMilliseconds(500)));
    });
I was wondering whether my When configuration is allowing multiple messages with the same partition key to be processed at the same time.
Extracts from my saga:
Event(() => this.CommunicationDeployAdded, x => x.CorrelateById(c => c.Message.EventId).SelectId(c => c.Message.EventId));

Schedule(() => CleanUpTimeout, x => x.ExpirationId, x =>
{
    x.Delay = settings.SagaCleanupWaitTime;
    x.Received = e => e.CorrelateById(c => c.Message.DeployId);
});

this.During(
    this.Active,
    When(this.CommunicationDeployAdded)
        .Then(this.OnEmailStatusReceived)
        .Then(c => Logger.Info($"Deploy Status Saga - Updated - Id: {c.Instance.Id}")),
    When(this.CommunicationDeployAdded, c => this.CheckIfCommunicationComplete(c))
        .Then(c => Logger.Debug($"Deploy Status Saga - Deploy Received last Status - Id: {c.Instance.Id}"))
        .Send(
            this.busSettings.FinalizeDeploymentQueueAddress,
            c => GetFinalizeDeploymentMessage(c.Instance))
        .TransitionTo(Completed)
        .Schedule(this.CleanUpTimeout, c => new DeployCleanupExpiredEvent(c.Instance)));

this.During(
    new[] { Active, Completed },
    When(this.CommunicationDeployAdded, ShouldPublishProgress)
        .Publish(this.PublishProgress));
Any suggestions on what to look at, or where my configuration is wrong, would be awesome. This problem is occurring in production, and the system is thousands of messages behind because of the constant exceptions.