message_id: | e1b80000-5049-c8f7-c7e3-08d77fc098ba | ||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
delivery_mode: | 2 | ||||||||||||||||
headers: |
|
namespace PatrolOrderSystem
{
    using System;
    using System.Threading.Tasks;
    using Automatonymous;
    using GreenPipes;
    using MassTransit;
    using MTMessageContracts;
    using OrderStateMachine;

    /// <summary>
    /// State machine activity executed for an <see cref="IPatrolOrder"/> event.
    /// It logs the order number to the console, publishes an
    /// <see cref="IAcknowledgement"/> for the order, publishes one
    /// <see cref="IPartRequest"/> per part on the order, and then hands over to
    /// the next activity in the behavior.
    /// </summary>
    public class PatrolOrderActivity : Activity<PatrolOrderState, IPatrolOrder>
    {
        // Consume context supplied through the constructor; used for all publishes below.
        private readonly ConsumeContext _context;

        /// <summary>
        /// Creates the activity with the consume context used for publishing.
        /// </summary>
        /// <param name="context">Consume context the activity publishes through.</param>
        public PatrolOrderActivity(ConsumeContext context)
        {
            _context = context;
        }

        /// <summary>Adds a "patrol-order-received" scope to the probe output.</summary>
        public void Probe(ProbeContext context)
        {
            context.CreateScope("patrol-order-received");
        }

        /// <summary>Lets a state machine visitor visit this activity.</summary>
        public void Accept(StateMachineVisitor visitor)
        {
            visitor.Visit(this);
        }

        /// <summary>
        /// Handles the received order: acknowledges it, requests each part, then
        /// continues with the next activity.
        /// </summary>
        /// <param name="context">Behavior context carrying the received <see cref="IPatrolOrder"/>.</param>
        /// <param name="next">The remainder of the behavior pipeline.</param>
        public async Task Execute(BehaviorContext<PatrolOrderState, IPatrolOrder> context, Behavior<PatrolOrderState, IPatrolOrder> next)
        {
            await Console.Out.WriteLineAsync($"Received PatrolOrder: {context.Data.OrderNo}");

            // send acknowledgement
            await _context.Publish<IAcknowledgement>(new
            {
                SendersOrderNumber = context.Data.OrderNo,
                AckNumber = context.Data.OrderNo + "1000",
                AckText = context.Data.OrderNo + " - acknowledged",
                OriginalOrder = context.Data
            });

            // one part request per ordered part, correlated by the order's guid
            foreach (var part in context.Data.Parts)
            {
                await _context.Publish<IPartRequest>(new
                {
                    OriginalOrderId = context.Data.OrderNo,
                    CorrelationId = context.Data.OrderGuid,
                    part.PartCode,
                    part.Quantity
                });
            }

            // call the next activity in the behavior
            await next.Execute(context).ConfigureAwait(false);
        }

        /// <summary>
        /// Does no compensation of its own; simply forwards the fault to the next
        /// activity in the behavior.
        /// </summary>
        public async Task Faulted<TException>(BehaviorExceptionContext<PatrolOrderState, IPatrolOrder, TException> context, Behavior<PatrolOrderState, IPatrolOrder> next)
            where TException : Exception
        {
            await next.Faulted(context);
        }
    }
}
namespace PatrolOrderSystem.OrderStateMachine
{
    using System;
    using System.Threading.Tasks;
    using Automatonymous;
    using MTMessageContracts;

    /// <summary>
    /// Saga state machine for patrol orders. Both events are correlated to a
    /// saga instance by the message's <c>OrderGuid</c>.
    /// </summary>
    public class OrderStateMachine : MassTransitStateMachine<PatrolOrderState>
    {
        /// <summary>
        /// Declares the instance state mapping, the event correlations and the
        /// transitions of the state machine.
        /// </summary>
        public OrderStateMachine()
        {
            InstanceState(instance => instance.CurrentState, AwaitingStock, ReadyForDispatch, Dispatched);

            Event(() => OrderReceived, cfg => cfg.CorrelateById(ctx => ctx.Message.OrderGuid));
            Event(() => StockAvailable, cfg => cfg.CorrelateById(ctx => ctx.Message.OrderGuid));

            // A new order runs the PatrolOrderActivity and then waits for stock.
            var onOrderReceived = When(OrderReceived)
                .Activity(selector => selector.OfType<PatrolOrderActivity>())
                .TransitionTo(AwaitingStock);

            // Stock may also be reported before any order has been seen.
            var onStockAvailableInitially = When(StockAvailable)
                .TransitionTo(ReadyForDispatch);

            Initially(onOrderReceived, onStockAvailableInitially);

            // Stock reported while waiting makes the order ready for dispatch.
            var onStockAvailableWhileWaiting = When(StockAvailable)
                .TransitionTo(ReadyForDispatch);

            During(AwaitingStock, onStockAvailableWhileWaiting);
        }

        /// <summary>State entered after an order has been received.</summary>
        public State AwaitingStock { get; private set; }

        /// <summary>State entered once stock has been reported as available.</summary>
        public State ReadyForDispatch { get; private set; }

        /// <summary>Declared state; no transition in this class targets it.</summary>
        public State Dispatched { get; private set; }

        /// <summary>Raised by an <see cref="IPatrolOrder"/> message.</summary>
        public Event<IPatrolOrder> OrderReceived { get; private set; }

        /// <summary>Raised by an <see cref="IStockAvailable"/> message.</summary>
        public Event<IStockAvailable> StockAvailable { get; private set; }
    }
}
/// <summary>
/// Entry point: builds a RabbitMQ bus on "localhost" with the order state
/// machine saga attached to the "patrol_order_state" receive endpoint, starts
/// the bus, waits for a key press and then stops the bus.
/// </summary>
public static async Task Main()
{
    var orderStatemachine = new OrderStateMachine.OrderStateMachine();

    var sagaRepository = new InMemorySagaRepository<PatrolOrderState>();

    var bus = Bus.Factory.CreateUsingRabbitMq(sbc =>
    {
        sbc.Host("localhost");

        // hooking up the orderstatemachine
        sbc.ReceiveEndpoint("patrol_order_state", e =>
        {
            e.StateMachineSaga(orderStatemachine, sagaRepository);
        });
    });

    await bus.StartAsync(); // This is important!

    try
    {
        // Console.ReadKey blocks, so it is run on a pool thread and awaited.
        await Task.Run(() => Console.ReadKey());
    }
    finally
    {
        // Stop the bus even if waiting for the key press fails.
        await bus.StopAsync();
    }
}
--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-dis...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/masstransit-discuss/d54e2c6e-58a7-4bc4-b67f-f3551de37228%40googlegroups.com.
Is there an example of how to wire up Activities with IoC? We use Ninject, but I can adapt any IoC sample. I will need to inject a data context for DB access (not saga persistence), plus possibly other services.
--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-dis...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/masstransit-discuss/73ebd16b-77d2-4fdc-93b3-c05ae54e5105%40googlegroups.com.
I've got it working with SimpleInjector. The Ninject container seems to have a significantly different implementation than either SimpleInjector or Autofac. Is this just because it is lagging behind changes to the other containers, or is it because Ninject just can't do what is required? I can look at bringing the Ninject container up to the same functionality as the others, but I don't want to even start if I'm just going to be hitting brick walls.
--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-dis...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/masstransit-discuss/a4150228-870e-446d-8f48-590680d72c85%40googlegroups.com.
Initially( When(OrderReceived) .Activity(x => x.OfType<IPatrolOrderReceivedActivity>()) .TransitionTo(AwaitingStock),
/// <summary>
/// Test-harness extensions for state machine sagas whose activities are
/// resolved from a dependency injection container.
/// </summary>
static class MassTransitTestExtensions
{
    /// <summary>
    /// Creates a <see cref="StateMachineSagaTestHarness{TInstance, TStateMachine}"/>
    /// backed by an in-memory saga repository wrapped in a
    /// <see cref="TestScopeSagaRepository{TSaga}"/>, with a scope action that
    /// adds the container's <see cref="IStateMachineActivityFactory"/> as a
    /// payload on the consume context.
    /// </summary>
    /// <param name="harness">Bus test harness the saga harness is attached to.</param>
    /// <param name="provider">Service provider used for scoping and to resolve the activity factory.</param>
    /// <param name="stateMachine">State machine under test; must not be null.</param>
    /// <param name="queueName">Optional queue name passed through to the saga harness.</param>
    /// <returns>The configured saga test harness.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="stateMachine"/> is null.</exception>
    public static StateMachineSagaTestHarness<TInstance, TStateMachine> StateMachineSaga<TInstance, TStateMachine>(
        this BusTestHarness harness,
        IServiceProvider provider,
        TStateMachine stateMachine,
        string queueName = null)
        where TInstance : class, SagaStateMachineInstance
        where TStateMachine : SagaStateMachine<TInstance>
    {
        if (stateMachine == null)
        {
            throw new ArgumentNullException(nameof(stateMachine));
        }

        var inMemoryRepository = new InMemorySagaRepository<TInstance>();

        var sagaScopeProvider = new DependencyInjectionSagaScopeProvider<TInstance>(provider);
        sagaScopeProvider.AddScopeAction(consumeContext =>
            consumeContext.GetOrAddPayload(() => provider.GetRequiredService<IStateMachineActivityFactory>()));

        ISagaRepository<TInstance> scopedRepository =
            new TestScopeSagaRepository<TInstance>(inMemoryRepository, sagaScopeProvider);

        return new StateMachineSagaTestHarness<TInstance, TStateMachine>(
            harness,
            scopedRepository,
            stateMachine,
            queueName);
    }
}
/// <summary>
/// Saga repository decorator for tests: every send obtains a scope from the
/// scope provider and forwards the scoped context to the wrapped repository.
/// Queries are delegated to the wrapped repository when it supports them.
/// </summary>
/// <typeparam name="TSaga">Saga instance type.</typeparam>
public class TestScopeSagaRepository<TSaga> :
    ISagaRepository<TSaga>,
    IProbeSite,
    IQuerySagaRepository<TSaga>
    where TSaga : class, ISaga
{
    private readonly ISagaRepository<TSaga> _repository;
    private readonly ISagaScopeProvider<TSaga> _scopeProvider;

    /// <summary>
    /// Wraps <paramref name="repository"/> so that its sends run inside scopes
    /// created by <paramref name="scopeProvider"/>.
    /// </summary>
    public TestScopeSagaRepository(
        ISagaRepository<TSaga> repository,
        ISagaScopeProvider<TSaga> scopeProvider)
    {
        _repository = repository;
        _scopeProvider = scopeProvider;
    }

    /// <summary>Probes the scope provider and the wrapped repository under a "scope" node.</summary>
    void IProbeSite.Probe(ProbeContext context)
    {
        ProbeContext scope = context.CreateScope("scope");
        _scopeProvider.Probe(scope);
        _repository.Probe(scope);
    }

    /// <summary>Sends the message to the wrapped repository using a scoped consume context.</summary>
    async Task ISagaRepository<TSaga>.Send<T>(
        ConsumeContext<T> context,
        ISagaPolicy<TSaga, T> policy,
        IPipe<SagaConsumeContext<TSaga, T>> next)
    {
        // The scope is disposed once the wrapped repository has finished the send.
        using (ISagaScopeContext<T> scope = _scopeProvider.GetScope<T>(context))
        {
            await _repository.Send<T>(scope.Context, policy, next).ConfigureAwait(false);
        }
    }

    /// <summary>Sends the query to the wrapped repository using a scoped query context.</summary>
    async Task ISagaRepository<TSaga>.SendQuery<T>(
        SagaQueryConsumeContext<TSaga, T> context,
        ISagaPolicy<TSaga, T> policy,
        IPipe<SagaConsumeContext<TSaga, T>> next)
    {
        using (ISagaQueryScopeContext<TSaga, T> scope = _scopeProvider.GetQueryScope<T>(context))
        {
            await _repository.SendQuery<T>(scope.Context, policy, next).ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Finds saga ids matching <paramref name="query"/> by delegating to the
    /// wrapped repository.
    /// </summary>
    /// <exception cref="NotSupportedException">
    /// The wrapped repository does not implement <see cref="IQuerySagaRepository{TSaga}"/>.
    /// </exception>
    public Task<IEnumerable<Guid>> Find(ISagaQuery<TSaga> query)
    {
        // Fail with a descriptive error rather than a NullReferenceException
        // when the wrapped repository cannot be queried.
        if (_repository is IQuerySagaRepository<TSaga> queryable)
        {
            return queryable.Find(query);
        }

        throw new NotSupportedException(
            $"The wrapped saga repository does not implement {nameof(IQuerySagaRepository<TSaga>)} and cannot be queried.");
    }
}
/// <summary>
/// Base class for async specifications that test a state machine saga whose
/// dependencies come from a Microsoft.Extensions.DependencyInjection container.
/// Derived tests call <see cref="InitialiseMassTransitSagaTestHarness"/> from
/// their constructor; the harness is then started in
/// <see cref="InitializeAsync"/> and stopped in <see cref="DisposeAsync"/>.
/// </summary>
/// <typeparam name="TInstance">Saga instance type.</typeparam>
/// <typeparam name="TStateMachine">State machine type under test.</typeparam>
public abstract class MassTransitSagaDIAsyncSpecification<TInstance, TStateMachine> : AsyncSpecification
    where TInstance : class, SagaStateMachineInstance
    where TStateMachine : class, SagaStateMachine<TInstance>
{
    // Service provider built from the test's service collection.
    // NOTE(review): the built provider is never disposed here — confirm whether it should be.
    protected IServiceProvider Provider;

    protected InMemoryTestHarness TestHarness { get; private set; }

    // Saga harness created by InitialiseMassTransitSagaTestHarness.
    protected StateMachineSagaTestHarness<TInstance, TStateMachine> Saga;

    protected TStateMachine StateMachine { get; private set; }

    /// <summary>
    /// Creates the state machine, registers it and the saga infrastructure in a
    /// new service collection, creates the in-memory test harness and attaches
    /// the saga harness to it.
    /// </summary>
    /// <param name="stateMachineFactory">Creates the state machine under test; must not be null.</param>
    /// <param name="configureTestHarness">Optional extra configuration of the test harness.</param>
    /// <param name="configureDIContainer">Optional extra service registrations.</param>
    /// <exception cref="ArgumentNullException"><paramref name="stateMachineFactory"/> is null.</exception>
    public void InitialiseMassTransitSagaTestHarness(
        Func<TStateMachine> stateMachineFactory,
        Action<InMemoryTestHarness> configureTestHarness = null,
        Action<IServiceCollection> configureDIContainer = null)
    {
        if (stateMachineFactory == null)
        {
            throw new ArgumentNullException(nameof(stateMachineFactory), "Delegate cannot be null.");
        }

        StateMachine = stateMachineFactory.Invoke();

        // Set up the DI container
        var services = new ServiceCollection();

        // The factories below read Saga / TestHarness when a service is resolved,
        // not at registration time, so both may be assigned further down.
        services.AddTransient<ISagaRepository<TInstance>>(provider => Saga.TestRepository);
        services.TryAddSingleton<ISagaStateMachineFactory, DependencyInjectionSagaStateMachineFactory>();
        services.TryAddSingleton<IStateMachineActivityFactory, DependencyInjectionStateMachineActivityFactory>();
        services.AddSingleton<TStateMachine>(provider => StateMachine);
        services.AddSingleton<SagaStateMachine<TInstance>>(provider => StateMachine);
        configureDIContainer?.Invoke(services);
        services.AddMassTransit(cfg =>
        {
            cfg.AddBus(x => TestHarness.BusControl);
        });

        TestHarness = new InMemoryTestHarness();
        configureTestHarness?.Invoke(TestHarness);

        Provider = services.BuildServiceProvider(true);
        Saga = TestHarness.StateMachineSaga<TInstance, TStateMachine>(Provider, StateMachine);
    }

    /// <summary>Starts the test harness, then runs the base initialization.</summary>
    /// <exception cref="InvalidOperationException">
    /// <see cref="InitialiseMassTransitSagaTestHarness"/> has not been called.
    /// </exception>
    protected override async Task InitializeAsync()
    {
        if (TestHarness == null)
        {
            throw new InvalidOperationException("InitialiseMassTransitSagaTestHarness method must be called in the test constructor.");
        }

        await TestHarness.Start();
        await base.InitializeAsync();
    }

    /// <summary>Stops the test harness if one was created, then runs the base disposal.</summary>
    protected override async Task DisposeAsync()
    {
        if (TestHarness != null)
        {
            await TestHarness.Stop();
        }

        await base.DisposeAsync();
    }
}