ConsumeContext in Saga Activity

375 views
Skip to first unread message

Roy C

unread,
Dec 13, 2019, 6:47:43 AM12/13/19
to masstransit-discuss
I'm following the example for creating a Saga Activity but the Activity without a default constructor results in an error:

message_id:e1b80000-5049-c8f7-c7e3-08d77fc098ba
delivery_mode:2
headers:
Content-Type:application/vnd.masstransit+json
MT-Fault-ConsumerType:PatrolOrderSystem.OrderStateMachine.PatrolOrderState
MT-Fault-ExceptionType:System.MissingMethodException
MT-Fault-Message:No parameterless constructor defined for this object.
MT-Fault-MessageType:MTMessageContracts.IPatrolOrder
MT-Fault-StackTrace:at System.RuntimeTypeHandle.CreateInstance(RuntimeType type, Boolean publicOnly, Boolean wrapExceptions, Boolean& canBeCached, RuntimeMethodHandleInternal& ctor)
at System.RuntimeType.CreateInstanceSlow(Boolean publicOnly, Boolean wrapExceptions, Boolean skipCheckThis, Boolean fillCache)
at System.Activator.CreateInstance(Type type, Boolean nonPublic, Boolean wrapExceptions)
at Automatonymous.DefaultConstructorStateMachineActivityFactory.GetActivity[TActivity,TInstance,TData](BehaviorContext`2 context)
at Automatonymous.Activities.ContainerFactoryActivity`3.Automatonymous.Activity<TInstance,TData>.Execute(BehaviorContext`2 context, Behavior`2 next)
at Automatonymous.Activities.DataConverterActivity`2.Automatonymous.Activity<TInstance>.Execute[T](BehaviorContext`2 context, Behavior`2 next)
at Automatonymous.Behaviors.ActivityBehavior`1.Automatonymous.Behavior<TInstance>.Execute[T](BehaviorContext`2 context)
MT-Fault-Timestamp:2019-12-13T11:35:50.3712809Z
MT-Host-Assembly:PatrolOrderSystem



If I create a default constructor on the Activity then the activity is executed but I am then unable to access the ConsumeContext that I need for publishing other messages.

The code is below.


I suspect that I am missing some key part or that I am doing things completely wrong :o)

Any assistance would be appreciated.



The Activity:

namespace PatrolOrderSystem
{
    using System;
    using System.Threading.Tasks;
    using Automatonymous;
    using GreenPipes;
    using MassTransit;
    using MTMessageContracts;
    using OrderStateMachine;

    public class PatrolOrderActivity : Activity<PatrolOrderState, IPatrolOrder>
    {
        private readonly ConsumeContext _context;

        //public PatrolOrderActivity()
        //{

        //}

        public PatrolOrderActivity(ConsumeContext context)
        {
            _context = context;
        }

        public void Probe(ProbeContext context)
        {
            context.CreateScope("patrol-order-received");
        }

        public void Accept(StateMachineVisitor visitor)
        {
            visitor.Visit(this);
        }

        public async Task Execute(BehaviorContext<PatrolOrderState, IPatrolOrder> context, Behavior<PatrolOrderState, IPatrolOrder> next)
        {
            await Console.Out.WriteLineAsync($"Received PatrolOrder: {context.Data.OrderNo}");

            // send acknowledgement
            await _context.Publish<IAcknowledgement>(new 
            {
                SendersOrderNumber = context.Data.OrderNo,
                AckNumber = context.Data.OrderNo + "1000",
                AckText = context.Data.OrderNo + " - acknowledged",
                OriginalOrder = context.Data,
            });

            foreach (var part in context.Data.Parts)
            {
                await _context.Publish<IPartRequest>(new
                {
                    OriginalOrderId = context.Data.OrderNo,
                    CorrelationId = context.Data.OrderGuid,
                    part.PartCode,
                    part.Quantity,
                });
            }

            // call the next activity in the behavior
            await next.Execute(context).ConfigureAwait(false);
        }

        public async Task Faulted<TException>(BehaviorExceptionContext<PatrolOrderState, IPatrolOrder, TException> context, Behavior<PatrolOrderState, IPatrolOrder> next)
            where TException : Exception
        {
            await next.Faulted(context);
        }
    }
}

The StateMachine:

namespace PatrolOrderSystem.OrderStateMachine
{
    using System;
    using System.Threading.Tasks;
    using Automatonymous;
    using MTMessageContracts;

    public class OrderStateMachine :
        MassTransitStateMachine<PatrolOrderState>
    {
        public OrderStateMachine()
        {
            InstanceState(x => x.CurrentState, AwaitingStock, ReadyForDispatch, Dispatched);

            Event(() => OrderReceived, x => x.CorrelateById(context => context.Message.OrderGuid));
            Event(() => StockAvailable, x => x.CorrelateById(context => context.Message.OrderGuid));

            Initially(
                When(OrderReceived)
                                        .Activity(x => x.OfType<PatrolOrderActivity>())
                    .TransitionTo(AwaitingStock),
                When(StockAvailable)
                    .TransitionTo(ReadyForDispatch));

            During(
                AwaitingStock,
                When(StockAvailable)
                    .TransitionTo(ReadyForDispatch));
        }

        public State AwaitingStock { get; private set; }

        public State ReadyForDispatch { get; private set; }

        public State Dispatched { get; private set; }

        public Event<IPatrolOrder> OrderReceived { get; private set; }

        public Event<IStockAvailable> StockAvailable { get; private set; }
    }
}


The startup:

       public static async Task Main()
        {
                       var orderStatemachine = new OrderStateMachine.OrderStateMachine();
            var sagaRepository = new InMemorySagaRepository<PatrolOrderState>();

            var bus = Bus.Factory.CreateUsingRabbitMq(sbc =>
            {
                sbc.Host("localhost");
                
                // hooking up the orderstatemachine
                sbc.ReceiveEndpoint("patrol_order_state", e =>
                    {
                        e.StateMachineSaga(orderStatemachine, sagaRepository);
                    });
            });

            await bus.StartAsync(); // This is important!

            await Task.Run(() => Console.ReadKey());

            await bus.StopAsync();
        }



Chris Patterson

unread,
Dec 14, 2019, 1:33:50 PM12/14/19
to masstrans...@googlegroups.com
You can use the behaviorContext that is passed to the Execute method to get the ConsumeContext. 

var consumeContext = behaviorContext.GetPayload<ConsumeContext>();

When you have a constructor argument, that only works if you have a dependency injection container. The default activity factory only works with default constructors (MassTransit doesn't do dependency injection on its own).


--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-dis...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/masstransit-discuss/d54e2c6e-58a7-4bc4-b67f-f3551de37228%40googlegroups.com.

Roy C

unread,
Dec 16, 2019, 11:19:56 AM12/16/19
to masstransit-discuss
 
That worked fine.

Thank you 

Roy C

unread,
Dec 17, 2019, 6:23:17 AM12/17/19
to masstransit-discuss
Is there an example of how to wire up Activities with IOC?  We use Ninject but I can adapt any IOC sample.  I will need to inject a data context for DB access (not Saga Persistence) plus possibly other services

Chris Patterson

unread,
Dec 17, 2019, 11:58:31 AM12/17/19
to masstrans...@googlegroups.com

On Tue, Dec 17, 2019 at 5:23 AM Roy C <rbca...@gmail.com> wrote:
Is there an example of how to wire up Activities with IOC?  We use Ninject but I can adapt any IOC sample.  I will need to inject a data context for DB access (not Saga Persistence) plus possibly other services

--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-dis...@googlegroups.com.

Roy C

unread,
Dec 18, 2019, 6:53:05 AM12/18/19
to masstransit-discuss


I've got it working with SimpleInjector.

The NInject container seems to have a significantly different implementation than either SimpleInjector or Autofac. Is this just because it is lagging behind changes to the other containers, or is it because NInject just can't do what is required?  I can look at bringing the NInject container up to the same functionality as the others, but I don't want to even start if I'm just going to be hitting brick walls.

Chris Patterson

unread,
Dec 18, 2019, 11:57:30 AM12/18/19
to masstrans...@googlegroups.com
Is NInject even updated any longer? I kind of think it's dead, and there haven't been any updates to it in forever. It's also a legacy supported container in MassTransit (which means it doesn't support registration or any of the other new features that the other containers support).


On Wed, Dec 18, 2019 at 5:53 AM Roy C <rbca...@gmail.com> wrote:


I've got it working with SimpleInjector.

The NInject container seems to have a significantly different implementation than either SimpleInjector or Autofac. Is this just because it is lagging behind changes to the other containers, or is it because NInject just can't do what is required?  I can look at bringing the NInject container up to the same functionality as the others, but I don't want to even start if I'm just going to be hitting brick walls.

--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-dis...@googlegroups.com.

Roy C

unread,
Dec 20, 2019, 10:36:56 AM12/20/19
to masstransit-discuss
I'll stick with SimpleInjector for the minute.

One further question....


I'm trying to set up unit tests for my Saga which is calling the Activity:

 Initially(
                When(OrderReceived)
                    .Activity(x => x.OfType<IPatrolOrderReceivedActivity>())
                    .TransitionTo(AwaitingStock),

I would like to provide a mock for IPatrolOrderReceivedActivity, so I will need to provide an IStateMachineActivityFactory for it to resolve from.  How would I hook this into the StateMachineSagaTestHarness or InMemoryTestHarness?

Thanks for all your help so far.

Mark Dawson

unread,
Dec 23, 2019, 4:43:15 AM12/23/19
to masstransit-discuss
Hi Roy,

I managed to get this working recently, but it involved writing some extensions to the MT test framework as it doesn't appear to be supported currently.
I wrote my extensions to work with the InMemoryTestHarness as this was the most convenient bridging point for me between my application tests (written in xUnit.BDD) and the MassTransit test framework. So far these extensions have only been written with the Microsoft DI container:

The first extension wires up the InMemoryTestHarness and the Saga (it is adapted from the extension methods already part of the MT test framework of the same name):
/// <summary>
/// Test-harness extensions that connect a saga state machine to a <see cref="BusTestHarness"/>
/// with a dependency injection scope, so activities can be resolved through the container's
/// <see cref="IStateMachineActivityFactory"/>.
/// </summary>
static class MassTransitTestExtensions
{
  /// <summary>
  /// Creates a <see cref="StateMachineSagaTestHarness{TInstance,TStateMachine}"/> whose in-memory
  /// saga repository is wrapped in a <see cref="TestScopeSagaRepository{TSaga}"/>.
  /// </summary>
  /// <param name="harness">The bus test harness the saga is attached to.</param>
  /// <param name="provider">Service provider used for the saga scope and to resolve the activity factory.</param>
  /// <param name="stateMachine">The state machine under test.</param>
  /// <param name="queueName">Optional queue name, passed through to the saga test harness.</param>
  /// <returns>The saga test harness for <paramref name="stateMachine"/>.</returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="harness"/>, <paramref name="provider"/> or <paramref name="stateMachine"/> is null.
  /// </exception>
  public static StateMachineSagaTestHarness<TInstance, TStateMachine> StateMachineSaga<TInstance, TStateMachine>(
    this BusTestHarness harness,
    IServiceProvider provider,
    TStateMachine stateMachine,
    string queueName = null)
    where TInstance : class, SagaStateMachineInstance
    where TStateMachine : SagaStateMachine<TInstance>
  {
    // Fail fast: a null provider would otherwise only surface later, inside the scope action.
    if (harness == null)
      throw new ArgumentNullException(nameof(harness));
    if (provider == null)
      throw new ArgumentNullException(nameof(provider));
    if (stateMachine == null)
      throw new ArgumentNullException(nameof(stateMachine));

    var memorySagaRepository = new InMemorySagaRepository<TInstance>();
    var scopeProvider = new DependencyInjectionSagaScopeProvider<TInstance>(provider);

    // Put the container's activity factory on each scoped context as a payload.
    scopeProvider.AddScopeAction(context => context.GetOrAddPayload(provider.GetRequiredService<IStateMachineActivityFactory>));

    var scopeSagaRepository = new TestScopeSagaRepository<TInstance>(memorySagaRepository, scopeProvider);
    return new StateMachineSagaTestHarness<TInstance, TStateMachine>(harness, (ISagaRepository<TInstance>)scopeSagaRepository, stateMachine, queueName);
  }
}

The TestScopeSagaRepository used in the above code is a simple wrapper class that ensures the DI container is part of the ConsumeContext when sagas are loaded from the InMemorySagaRepository:

/// <summary>
/// Saga repository decorator for tests: every send is first passed through an
/// <see cref="ISagaScopeProvider{TSaga}"/> and the resulting scoped context is forwarded
/// to the wrapped repository.
/// </summary>
/// <typeparam name="TSaga">The saga type stored by the wrapped repository.</typeparam>
public class TestScopeSagaRepository<TSaga> : ISagaRepository<TSaga>, IProbeSite, IQuerySagaRepository<TSaga>
  where TSaga : class, ISaga
{
  private readonly ISagaRepository<TSaga> _repository;
  private readonly ISagaScopeProvider<TSaga> _scopeProvider;

  /// <summary>Wraps <paramref name="repository"/> so its sends run inside a scope.</summary>
  /// <param name="repository">The repository that actually stores the sagas.</param>
  /// <param name="scopeProvider">Creates the scope for each send.</param>
  /// <exception cref="ArgumentNullException">Either argument is null.</exception>
  public TestScopeSagaRepository(
    ISagaRepository<TSaga> repository,
    ISagaScopeProvider<TSaga> scopeProvider)
  {
    if (repository == null)
      throw new ArgumentNullException(nameof(repository));
    if (scopeProvider == null)
      throw new ArgumentNullException(nameof(scopeProvider));

    this._repository = repository;
    this._scopeProvider = scopeProvider;
  }

  /// <summary>Probes the scope provider and the wrapped repository under a "scope" node.</summary>
  void IProbeSite.Probe(ProbeContext context)
  {
    ProbeContext scope = context.CreateScope("scope");
    this._scopeProvider.Probe(scope);
    this._repository.Probe(scope);
  }

  /// <summary>Creates a scope for the message and sends the scoped context to the wrapped repository.</summary>
  async Task ISagaRepository<TSaga>.Send<T>(
    ConsumeContext<T> context,
    ISagaPolicy<TSaga, T> policy,
    IPipe<SagaConsumeContext<TSaga, T>> next)
  {
    // The scope is disposed only after the wrapped send has completed.
    using (ISagaScopeContext<T> scope = this._scopeProvider.GetScope<T>(context))
      await this._repository.Send<T>(scope.Context, policy, next).ConfigureAwait(false);
  }

  /// <summary>Creates a query scope and sends the scoped query context to the wrapped repository.</summary>
  async Task ISagaRepository<TSaga>.SendQuery<T>(
    SagaQueryConsumeContext<TSaga, T> context,
    ISagaPolicy<TSaga, T> policy,
    IPipe<SagaConsumeContext<TSaga, T>> next)
  {
    // The scope is disposed only after the wrapped query send has completed.
    using (ISagaQueryScopeContext<TSaga, T> scope = this._scopeProvider.GetQueryScope<T>(context))
      await this._repository.SendQuery<T>(scope.Context, policy, next).ConfigureAwait(false);
  }

  /// <summary>Delegates the query to the wrapped repository.</summary>
  /// <param name="query">The saga query to evaluate.</param>
  /// <returns>The result of the wrapped repository's <c>Find</c>.</returns>
  /// <exception cref="NotSupportedException">
  /// The wrapped repository does not implement <see cref="IQuerySagaRepository{TSaga}"/>.
  /// </exception>
  public Task<IEnumerable<Guid>> Find(ISagaQuery<TSaga> query)
  {
    var queryable = _repository as IQuerySagaRepository<TSaga>;

    // Report a non-queryable repository explicitly instead of a NullReferenceException.
    if (queryable == null)
      throw new NotSupportedException($"The wrapped saga repository ({_repository.GetType().Name}) does not support queries.");

    return queryable.Find(query);
  }
}

Once you have these two extensions it's simply a case of setting up your DI container and using the StateMachineSaga extension method above to link the state machine under test to the InMemoryTestHarness. I use an abstract generic base class to provide most of the boilerplate plumbing, with action methods to add test-specific configuration to the DI container and InMemoryTestHarness (the base AsyncSpecification is part of the xUnit.BDD framework).
You might ask why this code takes a factory method for the state machine instance as a parameter rather than resolving it from the DI container... the answer is simply to keep the code looking similar to what was already in the MT test framework.

/// <summary>
/// Base specification for testing a saga state machine against an
/// <see cref="InMemoryTestHarness"/> with a Microsoft DI container. Call
/// <see cref="InitialiseMassTransitSagaTestHarness"/> from the test constructor; the harness is
/// started in <see cref="InitializeAsync"/> and stopped in <see cref="DisposeAsync"/>.
/// </summary>
/// <typeparam name="TInstance">The saga instance type.</typeparam>
/// <typeparam name="TStateMachine">The state machine type under test.</typeparam>
public abstract class MassTransitSagaDIAsyncSpecification<TInstance, TStateMachine> : AsyncSpecification
  where TInstance : class, SagaStateMachineInstance
  where TStateMachine : class, SagaStateMachine<TInstance>
{
  // Service provider built from the test's service collection.
  protected IServiceProvider Provider;
  protected InMemoryTestHarness TestHarness { get; private set; }
  // Saga test harness linking the state machine to TestHarness.
  protected StateMachineSagaTestHarness<TInstance, TStateMachine> Saga;
  protected TStateMachine StateMachine { get; private set; }

  /// <summary>
  /// Creates the state machine, builds the DI container and connects the saga to a new
  /// <see cref="InMemoryTestHarness"/>.
  /// </summary>
  /// <param name="stateMachineFactory">Creates the state machine instance under test.</param>
  /// <param name="configureTestHarness">Optional extra configuration of the test harness.</param>
  /// <param name="configureDIContainer">Optional extra registrations for the service collection.</param>
  /// <exception cref="ArgumentNullException"><paramref name="stateMachineFactory"/> is null.</exception>
  public void InitialiseMassTransitSagaTestHarness(Func<TStateMachine> stateMachineFactory, Action<InMemoryTestHarness> configureTestHarness = null, Action<IServiceCollection> configureDIContainer = null)
  {
    if (stateMachineFactory == null)
    {
      throw new ArgumentNullException(nameof(stateMachineFactory), "Delegate cannot be null.");
    }

    StateMachine = stateMachineFactory.Invoke();

    // Set up the DI container.
    // The registration lambdas read Saga and TestHarness, which are assigned further down;
    // that is safe because the lambdas only run when a service is resolved.
    var services = new ServiceCollection();
    services.AddTransient<ISagaRepository<TInstance>>(provider => Saga.TestRepository);
    services.TryAddSingleton<ISagaStateMachineFactory, DependencyInjectionSagaStateMachineFactory>();
    services.TryAddSingleton<IStateMachineActivityFactory, DependencyInjectionStateMachineActivityFactory>();
    services.AddSingleton<TStateMachine>(provider => StateMachine);
    services.AddSingleton<SagaStateMachine<TInstance>>(provider => StateMachine);
    configureDIContainer?.Invoke(services);
    services.AddMassTransit(cfg =>
    {
      cfg.AddBus(x => TestHarness.BusControl);
    });

    TestHarness = new InMemoryTestHarness();
    configureTestHarness?.Invoke(TestHarness);

    // 'true' enables scope validation on the built provider.
    Provider = services.BuildServiceProvider(true);
    Saga = TestHarness.StateMachineSaga<TInstance, TStateMachine>(Provider, StateMachine);
  }

  /// <summary>Starts the test harness, then runs the base initialisation.</summary>
  /// <exception cref="InvalidOperationException">
  /// <see cref="InitialiseMassTransitSagaTestHarness"/> has not been called yet.
  /// </exception>
  protected override async Task InitializeAsync()
  {
    if (TestHarness == null)
    {
      throw new InvalidOperationException("InitialiseMassTransitSagaTestHarness method must be called in the test constructor.");
    }

    await TestHarness.Start();
    await base.InitializeAsync();
  }

  /// <summary>Stops the test harness (if one was created), then runs the base disposal.</summary>
  protected override async Task DisposeAsync()
  {
    try
    {
      if (TestHarness != null)
      {
        await TestHarness.Stop();
      }
    }
    finally
    {
      // Run the base disposal even when stopping the harness fails.
      await base.DisposeAsync();
    }
  }
}

 Hope this is of use to you. Good luck!

Roy C

unread,
Jan 3, 2020, 10:06:09 AM1/3/20
to masstransit-discuss
Hi Mark

Thanks for this.  With a bit of jiggery pokery I managed to get a version of your code for SimpleInjector/NUnit running and some tests written.
Reply all
Reply to author
Forward
0 new messages