After many hours of headache, I've finally tracked down an issue I've been facing for quite some time now.
The problem is that my sagas are being started by messages which should NEVER start any saga. By definition that should not be happening. Well, apparently it sort of is.
The following is a code example which illustrates the bug.
The issue is that the Combine method doesn't take into account the state during which events may happen. The outcome is that a saga is started by any event defined in During(Initial, ...) AND by any event defined in Combine(...).
I have never sent ThirdMessage, which is supposed to be the message that initializes SecondSaga, yet my SecondSaga was initialized with the same Guid used in FirstSaga.
using Magnum.StateMachine;
using MassTransit;
using MassTransit.Saga;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace MassTransitBug
{
/// <summary>
/// Console entry point of the reproduction: configures a service bus that hosts both
/// sagas and the consumer, then publishes a single <see cref="FirstMessage"/>.
/// </summary>
public class Program
{
    /// <summary>
    /// Creates the bus and publishes one <see cref="FirstMessage"/> carrying a freshly
    /// generated correlation id.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static void Main(string[] args)
    {
        var bus = ServiceBusFactory.New(configurator =>
        {
            configurator.UseRabbitMq();
            configurator.UseJsonSerializer();
            configurator.ReceiveFrom("rabbitmq://localhost/bugtest");
            configurator.DisablePerformanceCounters();
            configurator.Subscribe(subscriptions =>
            {
                // Each saga type is backed by its own in-memory repository.
                subscriptions.Saga<FirstSaga>(new InMemorySagaRepository<FirstSaga>()).Permanent();
                subscriptions.Saga<SecondSaga>(new InMemorySagaRepository<SecondSaga>()).Permanent();
                subscriptions.Consumer<Consumer>();
            });
        });

        // Only a FirstMessage is published from here; no ThirdMessage is ever sent.
        var kickoff = new FirstMessage
        {
            CorrelationId = Guid.NewGuid(),
            Text = "what's up?"
        };
        bus.Publish(kickoff);
    }
}
/// <summary>
/// Consumer that answers every <see cref="SecondMessageRequest"/> with a
/// <see cref="SecondMessageResponse"/>.
/// </summary>
public class Consumer : Consumes<SecondMessageRequest>.All
{
    /// <summary>
    /// Responds to the request, echoing its correlation id and number.
    /// </summary>
    /// <param name="request">The incoming request.</param>
    public void Consume(SecondMessageRequest request)
    {
        // The reply reuses the request's correlation id.
        var reply = new SecondMessageResponse
        {
            CorrelationId = request.CorrelationId,
            Response = "ok, " + request.Number
        };

        this.Context().Respond<SecondMessageResponse>(reply);
    }
}
/// <summary>
/// Message published by <c>Program.Main</c>; handled by <c>FirstSaga</c> in its Initial state.
/// </summary>
public class FirstMessage : CorrelatedBy<Guid>
{
    /// <summary>Correlation id carried by the message.</summary>
    public Guid CorrelationId { get; set; }

    /// <summary>Free-form text payload.</summary>
    public string Text { get; set; }
}
/// <summary>
/// Request published by <c>FirstSaga</c> and handled by <c>Consumer</c>.
/// </summary>
public class SecondMessageRequest : CorrelatedBy<Guid>
{
    /// <summary>Correlation id carried by the message.</summary>
    public Guid CorrelationId { get; set; }

    /// <summary>Numeric payload that the consumer echoes back in its response text.</summary>
    public int Number { get; set; }
}
/// <summary>
/// Response sent by <c>Consumer</c> in reply to a <see cref="SecondMessageRequest"/>.
/// Both sagas declare an event for this message type.
/// </summary>
public class SecondMessageResponse : CorrelatedBy<Guid>
{
    /// <summary>Correlation id copied from the originating request.</summary>
    public Guid CorrelationId { get; set; }

    /// <summary>Response text produced by the consumer.</summary>
    public string Response { get; set; }
}
/// <summary>
/// Message handled by <c>SecondSaga</c> in its Initial state. Nothing in this program
/// publishes it.
/// </summary>
public class ThirdMessage : CorrelatedBy<Guid>
{
    /// <summary>Correlation id carried by the message.</summary>
    public Guid CorrelationId { get; set; }
}
/// <summary>
/// Saga state machine driven by <see cref="FirstMessage"/> and
/// <see cref="SecondMessageResponse"/>:
/// Initial --FirstMessage--> Waiting --SecondMessageResponse--> Completed.
/// </summary>
public class FirstSaga : SagaStateMachine<FirstSaga>, ISaga
{
    /// <summary>Correlation id of this saga instance.</summary>
    public Guid CorrelationId { get; set; }

    /// <summary>Bus associated with this saga instance.</summary>
    public IServiceBus Bus { get; set; }

    public static State Initial { get; set; }
    public static State Waiting { get; set; }
    public static State Completed { get; set; }

    public static Event<FirstMessage> FirstMessageReceived { get; set; }
    public static Event<SecondMessageResponse> SecondMessageResponseReceived { get; set; }

    /// <summary>
    /// Creates a saga instance with the given correlation id.
    /// </summary>
    /// <param name="guid">Correlation id to assign to the instance.</param>
    public FirstSaga(Guid guid)
    {
        CorrelationId = guid;
    }

    /// <summary>
    /// Declares the state machine's transitions.
    /// </summary>
    static FirstSaga()
    {
        Define(() =>
        {
            // Initial: on FirstMessage, move to Waiting and publish a request that
            // carries the saga's own correlation id.
            During(Initial,
                When(FirstMessageReceived)
                    .TransitionTo(Waiting)
                    .Publish((saga, message) => new SecondMessageRequest
                    {
                        CorrelationId = saga.CorrelationId,
                        Number = 123
                    }));

            // Waiting: on the response, print its text and finish.
            During(Waiting,
                When(SecondMessageResponseReceived)
                    .Then((saga, response) => Console.WriteLine(response.Response))
                    .TransitionTo(Completed));
        });
    }
}
/// <summary>
/// Saga state machine whose only Initial-state trigger is <see cref="ThirdMessage"/>.
/// It also declares a <c>Combine</c> over <see cref="SecondMessageResponse"/> and handles
/// that message in the Waiting state with an empty action.
/// </summary>
public class SecondSaga : SagaStateMachine<SecondSaga>, ISaga
{
    /// <summary>Correlation id of this saga instance.</summary>
    public Guid CorrelationId { get; set; }

    /// <summary>Bus associated with this saga instance.</summary>
    public IServiceBus Bus { get; set; }

    public static State Initial { get; set; }
    public static State Waiting { get; set; }
    public static State Completed { get; set; }

    public static Event<ThirdMessage> ThirdMessageReceived { get; set; }
    public static Event<SecondMessageResponse> SecondMessageResponseReceived { get; set; }
    public static Event Done { get; set; }

    /// <summary>Integer property handed to <c>Combine(...).Into(...)</c> below.</summary>
    public int Flags { get; set; }

    /// <summary>
    /// Creates a saga instance with the given correlation id and writes a diagnostic line
    /// to the console whenever that id is not <see cref="Guid.Empty"/>.
    /// </summary>
    /// <param name="guid">Correlation id to assign to the instance.</param>
    public SecondSaga(Guid guid)
    {
        CorrelationId = guid;

        if (guid == Guid.Empty)
        {
            return;
        }

        // A non-empty id means an instance was created for a real correlation id,
        // which this program never expects to happen.
        Console.WriteLine("SecondSaga should never be initialized");
    }

    /// <summary>
    /// Declares the state machine's transitions and the combined event.
    /// </summary>
    static SecondSaga()
    {
        Define(() =>
        {
            // Initial: only ThirdMessage is declared as a trigger here.
            During(Initial,
                When(ThirdMessageReceived)
                    .TransitionTo(Waiting));

            // Declared outside any During(...) block.
            // NOTE(review): this declaration is the suspected cause of instances being
            // created on SecondMessageResponse; confirm against the library.
            Combine(SecondMessageResponseReceived).Into(Done, instance => instance.Flags);

            // Waiting: the response is accepted but deliberately does nothing.
            During(Waiting,
                When(SecondMessageResponseReceived)
                    .Then(instance => { }));
        });
    }
}
}
packages.config file:
<packages>
<package id="Magnum" version="2.1.3" targetFramework="net45" />
<package id="MassTransit" version="2.9.9" targetFramework="net45" />
<package id="MassTransit.RabbitMQ" version="2.9.9" targetFramework="net45" />
<package id="Newtonsoft.Json" version="6.0.6" targetFramework="net45" />
<package id="RabbitMQ.Client" version="3.4.0" targetFramework="net45" />
</packages>