I've been trying for days to find a way to get my remote subscribers to hook into events of my saga. All of the examples/documentation I've found appear to be outdated, incomplete, or using different technologies than Rabbit. My biggest unknown is when the saga publishes to a subscriber, how is that consumed by the remote working using MT3? The closest example using Automtonymous sagas is the Sample-ShoppingWeb project, but from what I can tell it is mostly Quartz without any remote Rabbit subscribers.
My saga is currently setup like so:
public class ProductStateMachine :
MassTransitStateMachine<Product>
{
public IBus Bus { get; set; }
public ProductStateMachine()
{
InstanceState(x => x.CurrentState);
Event(() => NewWork,
x => x.CorrelateBy(cart => cart.ProductName, context => context.Message.ProductName)
.SelectId(context => Guid.NewGuid()));
Event(() => WorkStarted,
x => x.CorrelateById(context => context.Message.WorkId));
State(() => Active);
State(() => Ordered);
Initially(
When(NewWork)
.Publish(context => new WorkStartedMessage
{
CorrelationId = context.Data.CorrelationId
})
// .Then(context => KickWork(context.Data)) .TransitionTo(Active)
);
During(Initial,
When(WorkStarted).Then(context =>
{
Console.Out.WriteLineAsync(
String.Format("Initial state Work started: {0} to {1}", context.Instance.CorrelationId));
}));
During(Active,
When(WorkStarted)
.Then(context =>
{
context.Instance.ProductName = context.Data.ProductName;
context.Instance.WorkId = context.Data.WorkId;
Console.Out.WriteLineAsync(
String.Format("Active state Work started: {0} to {1}", context.Data.ProductName, context.Instance.CorrelationId));
})
.TransitionTo(Ordered)
);
SetCompletedWhenFinalized();
}
public State Active { get; private set; }
public State Ordered { get; private set; }
public static Event<WorkSubmitted> NewWork { get; set; }
public static Event<StartWork> WorkStarted { get; set; }
[Serializable]
public class WorkStartedMessage : CorrelatedBy<Guid>
{
public Guid CorrelationId { get; set; }
public string ProductName { get; set; }
public Guid WorkId { get; set; }
}
void KickWork(WorkSubmitted message)
{
Console.Out.WriteLineAsync(string.Format("I've received an Work for {0}.", message.WorkId));
var workStartMessage = new WorkStartedMessage
{
CorrelationId = message.CorrelationId
};
Console.Out.WriteLineAsync("Starting a long running process...");
var rndm = new Random();
System.Threading.Thread.Sleep(rndm.Next(1000, 8000));
Console.Out.WriteLineAsync("Done!");
if (Bus != null)
Bus.Publish<StartWork>(workStartMessage);
}}
And have setup my remote worker like:
IBusControl _busControl;
ProductStateMachine _machine;
Lazy<ISagaRepository<ProductTracking.Product>> _repository;
SagaDbContextFactory sagaDbContextFactory =
() => new SagaDbContext<ProductTracking.Product, ProductMap>(Client.SagaDbContextFactoryProvider.ConnectionString);
_machine = new ProductStateMachine();
_repository = new Lazy<ISagaRepository<ProductTracking.Product>>(
() => new EntityFrameworkSagaRepository<ProductTracking.Product>(sagaDbContextFactory));
_busControl = Bus.Factory.CreateUsingRabbitMq(x =>
{
IRabbitMqHost host = x.Host(new Uri(ConfigurationManager.AppSettings["RabbitMQHost"]), h =>
{
h.Username("guest");
h.Password("guest");
});
x.ReceiveEndpoint(host, "Worker_svc", e =>
{
e.PrefetchCount = 8;
e.StateMachineSaga(_machine, _repository.Value);
e.Bind<StartWork>();
});
});
_busControl.Start();
Console.WriteLine("Bus started.");
for (; ; )
{
}
}
public class ProductMap :
SagaClassMapping<ProductTracking.Product>
{
public ProductMap()
{
Property(x => x.CurrentState)
.HasMaxLength(64);
Property(x => x.CorrelationId);
Property(x => x.WorkId);
Property(x => x.ProductName)
.HasMaxLength(256);
}
}
}
I'm getting the feel that I've munged together so many different examples and different stabs at it, that things have become confusing... Interestingly, if I uncomment the KickWork line in the saga, then both the saga console app and the remote worker will both process the item. But if I just leave the .Publish line, nothing happens. Any clarity, ideas, thoughts, advice, recent examples or anything else are greatly appreciated. Thank you.