Sagas using Automatonymous and RabbitMQ, how to subscribe remote worker to saga events.

511 views
Skip to first unread message

Jeff Borden

unread,
Oct 26, 2016, 3:43:21 PM10/26/16
to masstransit-discuss
I've been trying for days to find a way to get my remote subscribers to hook into events of my saga.  All of the examples/documentation I've found appear to be outdated, incomplete, or using different technologies than Rabbit.  My biggest unknown is when the saga publishes to a subscriber, how is that consumed by the remote working using MT3?  The closest example using Automtonymous sagas is the Sample-ShoppingWeb project, but from what I can tell it is mostly Quartz without any remote Rabbit subscribers.  

My saga is currently setup like so:
 
public class ProductStateMachine :
       
MassTransitStateMachine<Product>
   
{
       
public IBus Bus { get; set; }

       
public ProductStateMachine()
       
{
           
InstanceState(x => x.CurrentState);

           
Event(() => NewWork,
                x
=> x.CorrelateBy(cart => cart.ProductName, context => context.Message.ProductName)
                   
.SelectId(context => Guid.NewGuid()));

           
Event(() => WorkStarted,
                         x
=> x.CorrelateById(context => context.Message.WorkId));

           
State(() => Active);
           
State(() => Ordered);

           
Initially(
               
When(NewWork)
                 
.Publish(context => new WorkStartedMessage
                 
{
                     
CorrelationId = context.Data.CorrelationId
                 
})

                 
// .Then(context => KickWork(context.Data))                             .TransitionTo(Active)
               
);

           
During(Initial,
               
When(WorkStarted).Then(context =>
               
{
                   
Console.Out.WriteLineAsync(
                                         
String.Format("Initial state Work started: {0} to {1}", context.Instance.CorrelationId));

               
}));

           
During(Active,
               
When(WorkStarted)
                   
.Then(context =>
                   
{
                        context
.Instance.ProductName = context.Data.ProductName;
                        context
.Instance.WorkId = context.Data.WorkId;

                       
Console.Out.WriteLineAsync(
                           
String.Format("Active state Work started: {0} to {1}", context.Data.ProductName, context.Instance.CorrelationId));

                   
})
                   
.TransitionTo(Ordered)
               
);

           
SetCompletedWhenFinalized();
       
}


       
public State Active { get; private set; }
       
public State Ordered { get; private set; }
       
public static Event<WorkSubmitted> NewWork { get; set; }
       
public static Event<StartWork> WorkStarted { get; set; }


       
[Serializable]
       
public class WorkStartedMessage : CorrelatedBy<Guid>
       
{
           
public Guid CorrelationId { get; set; }
           
public string ProductName { get; set; }
           
public Guid WorkId { get; set; }
       
}


void KickWork(WorkSubmitted message)
{
     
Console.Out.WriteLineAsync(string.Format("I've received an Work for  {0}.", message.WorkId));
   
var workStartMessage = new WorkStartedMessage
   
{
       
CorrelationId = message.CorrelationId
   
};

   
Console.Out.WriteLineAsync("Starting a long running process...");

   
var rndm = new Random();
   
System.Threading.Thread.Sleep(rndm.Next(1000, 8000));
   
Console.Out.WriteLineAsync("Done!");

   
if (Bus != null)
       
Bus.Publish<StartWork>(workStartMessage);
}}



And have setup my remote worker like:

 
IBusControl _busControl;
           
ProductStateMachine _machine;
           
Lazy<ISagaRepository<ProductTracking.Product>> _repository;
           
           
SagaDbContextFactory sagaDbContextFactory =
               
() => new SagaDbContext<ProductTracking.Product, ProductMap>(Client.SagaDbContextFactoryProvider.ConnectionString);

            _machine
= new ProductStateMachine();

            _repository
= new Lazy<ISagaRepository<ProductTracking.Product>>(
               
() => new EntityFrameworkSagaRepository<ProductTracking.Product>(sagaDbContextFactory));

            _busControl
= Bus.Factory.CreateUsingRabbitMq(x =>
           
{
               
IRabbitMqHost host = x.Host(new Uri(ConfigurationManager.AppSettings["RabbitMQHost"]), h =>
               
{
                    h
.Username("guest");
                    h
.Password("guest");
               
});

                x
.ReceiveEndpoint(host, "Worker_svc", e =>
               
{
                    e
.PrefetchCount = 8;
                    e
.StateMachineSaga(_machine, _repository.Value);
                    e
.Bind<StartWork>();
               
});
           
});

          _busControl
.Start();
           
Console.WriteLine("Bus started.");
           
           
for (; ; )
           
{
               
           
}          
       
}

       
public class ProductMap :
     
SagaClassMapping<ProductTracking.Product>
       
{
           
public ProductMap()
           
{
               
Property(x => x.CurrentState)
               
.HasMaxLength(64);

               
Property(x => x.CorrelationId);
               
Property(x => x.WorkId);
             
               
Property(x => x.ProductName)
                   
.HasMaxLength(256);
           
}
       
}
   
}


I'm getting the feel that I've munged together so many different examples and different stabs at it, that things have become confusing...  Interestingly, if I uncomment the KickWork line in the saga, then both the saga console app and the remote worker will both process the item.  But if I just leave the .Publish line, nothing happens.  Any clarity, ideas, thoughts, advice, recent examples or anything else are greatly appreciated.  Thank you.

Chris Patterson

unread,
Oct 26, 2016, 3:58:18 PM10/26/16
to masstrans...@googlegroups.com
Using sagas (Automatonymous) and routing slips (Courier) are well described in the sample: https://github.com/MassTransit/Sample-Courier

Also, don't use Thread.Sleep in an async method, including event consumers. Always using Task.Delay().

And there should be no reason to .Bind<>() anything, the events in the state machine are automatically bound.



--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-discuss+unsub...@googlegroups.com.
To post to this group, send email to masstransit-discuss@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/masstransit-discuss/5cf5b868-4982-4ee5-b4e0-86eac86c7619%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Reply all
Reply to author
Forward
0 new messages