No ResponseAddress captured in ReceiveContext in Automatonymous Respond call


Thomas Tomanek

Mar 26, 2015, 10:50:40 AM
to masstrans...@googlegroups.com
Hi,

Is it correct that when calling .Respond in an Automatonymous saga there is no ResponseAddress in the receive context, so the caller that is expecting a response never gets it? It looks like, if there is no address, it just does a Publish (rather than a Send). That would mean I can still handle the response, just not at the point where the original request was sent, but in a separate consumer for the response message.

Cheers,
Tom

Chris Patterson

Mar 26, 2015, 1:29:58 PM
to masstrans...@googlegroups.com
If the request was sent to the saga, there should be a RequestId. The ResponseAddress should be present too, but if it isn't, the response is published instead of being sent directly to the endpoint at the ResponseAddress.
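
The fallback described here can be modelled without any messaging library. The following is only a minimal sketch of the decision, not MassTransit's actual implementation; `ReceivedHeaders`, `Delivery` and `ResponseRouting` are hypothetical names introduced for illustration.

```csharp
using System;

// Hypothetical stand-in for the headers that arrive with a received message.
public sealed class ReceivedHeaders
{
    public Guid? RequestId { get; set; }
    public Uri ResponseAddress { get; set; }
}

public enum Delivery
{
    Send,    // delivered directly to the endpoint at ResponseAddress
    Publish  // delivered to whoever subscribes to the response type
}

public static class ResponseRouting
{
    // Send when the received message carried a response address; otherwise publish.
    public static Delivery Choose(ReceivedHeaders headers)
    {
        return headers.ResponseAddress != null ? Delivery.Send : Delivery.Publish;
    }
}
```

In this model, a message that arrives with no ResponseAddress ends up on the Publish path, which is why a request handler waiting on a direct reply would never see the response.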


--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-dis...@googlegroups.com.
To post to this group, send email to masstrans...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/masstransit-discuss/f94b392d-e075-4059-9d76-b7f189bb00ec%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Thomas Tomanek

Mar 27, 2015, 5:07:23 AM
to masstrans...@googlegroups.com
Cool, I'll check for a RequestId. The saga is picking up the initiating message as a result of a publish rather than a send, so maybe that's affecting its ability to respond. Will check it out.

Cheers,
Tom

Thomas Tomanek

Mar 27, 2015, 5:41:23 AM
to masstrans...@googlegroups.com
OK, I've narrowed it down a bit. Responding doesn't seem to work when it happens as the result of a scheduled message. Responding immediately from the saga after the initiating message is fine. The code below demonstrates it. Is there something else I should be doing, or is this a bug?

Thanks in advance


namespace SagaTimeout
{
    using System;
    using Automatonymous;
    using MassTransit;
    using MassTransit.NLogIntegration;
    using MassTransit.QuartzIntegration;
    using MassTransit.Saga;
    using MassTransit.Scheduling;
    using Quartz.Impl;

    class Program
    {
        static void Main(string[] args)
        {
            var scheduler = new StdSchedulerFactory().GetScheduler();

            var bus = ServiceBusFactory.New(c =>
            {
                c.UseRabbitMq(cf =>
                {
                    cf.ConfigureHost(new Uri("rabbitmq://localhost/sagatimeouttest"), configurator =>
                    {
                        configurator.SetUsername("tom");
                        configurator.SetPassword("tom");
                    });
                });

                c.SetConcurrentConsumerLimit(1);
                c.UseNLog();
                c.ReceiveFrom(new Uri("rabbitmq://localhost/sagatimeouttest"));
                c.UseJsonSerializer();
                c.DisablePerformanceCounters();
                c.Subscribe(x => x.Consumer(() => new ScheduleMessageConsumer(scheduler)));
            });

            Common.Logging.LogManager.Adapter = new Common.Logging.Simple.ConsoleOutLoggerFactoryAdapter { Level = Common.Logging.LogLevel.Trace };

            bus.SubscribeStateMachineSaga(new Saga(), new InMemorySagaRepository<SagaState>());

            scheduler.JobFactory = new MassTransitJobFactory(bus);
            scheduler.Start();

            bus.PublishRequest(new FirstMessage(Guid.NewGuid()), c => c.Handle<FirstMessageResponse>(x => Console.WriteLine("Got response")));

            Console.WriteLine("Hit any key to exit");
            Console.ReadLine();
            scheduler.Shutdown();
        }
    }

    class SagaState : SagaStateMachineInstance
    {
        public Guid CorrelationId { get; private set; }
        public IServiceBus Bus { get; set; }
        public State CurrentState { get; set; }

        public SagaState(Guid correlationId)
        {
            CorrelationId = correlationId;
        }
    }

    class Saga : AutomatonymousStateMachine<SagaState>
    {

        public Saga()
        {
            State(() => GotFirstMessage);
            Event(() => FirstMessage);
            Event(() => SecondMessage);

            Initially(
                When(FirstMessage)
                    .Then(s => s.Bus.ScheduleMessage(DateTime.UtcNow.AddSeconds(5), new SecondMessage(s.CorrelationId)))
                    .Then(s => Console.WriteLine("Got first message"))
                    // Responding here works:
                    //.Respond(s => new FirstMessageResponse(s.CorrelationId))
                    .TransitionTo(GotFirstMessage));

            During(GotFirstMessage,
                When(SecondMessage)
                // Responding here doesn't work
                .Respond(s => new FirstMessageResponse(s.CorrelationId)));

        }

        public State GotFirstMessage { get; private set; }
        public Event<FirstMessage> FirstMessage { get; private set; }
        public Event<SecondMessage> SecondMessage { get; private set; }
    }


    class FirstMessage : CorrelatedBy<Guid>
    {
        public Guid CorrelationId { get; set; }

        public FirstMessage(Guid correlationId)
        {
            CorrelationId = correlationId;
        }
    }

    class FirstMessageResponse : CorrelatedBy<Guid>
    {
        public Guid CorrelationId { get; set; }

        public FirstMessageResponse(Guid correlationId)
        {
            CorrelationId = correlationId;
        }
    }

    class SecondMessage : CorrelatedBy<Guid>
    {
        public Guid CorrelationId { get; set; }

        public SecondMessage(Guid correlationId)
        {
            CorrelationId = correlationId;
        }
    }

}


 

Chris Patterson

Mar 27, 2015, 10:13:24 AM
to masstrans...@googlegroups.com
In that case you would need to capture the request's RequestId and ResponseAddress when the original request arrives. When the scheduled message arrives, add those values to the message send context in the handler for the scheduled message. You could do this nicely with either a method or an activity class that the state machine calls as part of an execute.
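
A library-free sketch of that capture-and-restore pattern follows. It only models the idea and does not use the MassTransit API: `SagaInstance`, `OutgoingResponse` and `RequestHeaderCapture` are hypothetical types, and the two `Saved...` properties are assumed to be persisted with the saga instance.

```csharp
using System;

// Hypothetical saga instance; the two Saved* properties persist the request headers.
public sealed class SagaInstance
{
    public Guid CorrelationId { get; set; }
    public Guid? SavedRequestId { get; set; }
    public Uri SavedResponseAddress { get; set; }
}

// Hypothetical stand-in for the envelope of the response that gets sent later.
public sealed class OutgoingResponse
{
    public Guid? RequestId { get; set; }
    public Uri Destination { get; set; }
}

public static class RequestHeaderCapture
{
    // Step 1: run while handling the original request, when the headers are still available.
    public static void Capture(SagaInstance instance, Guid? requestId, Uri responseAddress)
    {
        instance.SavedRequestId = requestId;
        instance.SavedResponseAddress = responseAddress;
    }

    // Step 2: run while handling the later (scheduled) message, which no longer carries
    // the original headers; the saved values are copied onto the outgoing response.
    public static OutgoingResponse Restore(SagaInstance instance)
    {
        if (instance.SavedResponseAddress == null)
            throw new InvalidOperationException("No response address was captured for this saga instance.");

        return new OutgoingResponse
        {
            RequestId = instance.SavedRequestId,
            Destination = instance.SavedResponseAddress
        };
    }
}
```

The point of the design is that the scheduled message is a new message with its own headers, so anything needed to reach the original requester has to live on the saga instance between the two handlers.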



Thomas Tomanek

Mar 27, 2015, 11:28:46 AM
to masstrans...@googlegroups.com
Ah, excellent, thanks for that.

Michel Tol

Jan 29, 2016, 9:27:46 AM
to masstransit-discuss
Hi, first of all, sorry for the late response.

I'm having the same issue as Sadprofessor, except that I am not sending my response message after a scheduled message has arrived; I am sending it after an event message has arrived.

@Chris, could you explain how to capture the RequestId and ResponseAddress when the initial message arrives?

 public class TriggerMessageStateMachine :
        MassTransitStateMachine<TriggerMessageState>
    {
        public TriggerMessageStateMachine()
        {
            InstanceState(x => x.CurrentState);
            Event(() => TriggerMessageReceived, x => x.CorrelateById(p => Guid.NewGuid()));
            Event(() => ValidationResultReceived, x => x.CorrelateById(p => p.Message.WorkFlowCorrelationId));
            Event(() => TriggerMessageResponseReceived, x => x.CorrelateById(p => p.Message.WorkflowCorrelationId));
            Initially(
                When(TriggerMessageReceived)
                .Then(context =>
                {
                    context.Instance.TriggerRequestMessage = context.Data;
                    context.Instance.Created = GetUtcTime();
                })
                .Then(context =>
                {
                    var token = context.Data.Tokens;
                    var tokenType = token.First().TokenType.ToUpperInvariant();

                    switch (tokenType)
                    {
                        case "GST":
                        ...
                        break;
                        case "GST21":
                         ....
                        break;
                        case "MFA":
                        ......
                        break;
                        default:
                            context.Publish(new DefaultRequestValidationCommand
                            {
                                TriggerRequestMessage = context.Data,
                                WorkFlowCorrelationId = context.Instance.CorrelationId
                            });
                            break;
                    }
                })
                .TransitionTo(WaitingValidation)
                );

            During(WaitingValidation,
                When(ValidationResultReceived,
                    filter => filter.Data.Result == OpinionType.Kill || filter.Data.Result == OpinionType.NotOke)
                    .Then(context =>
                    {
                        context.Instance.ValidationResult = context.Data.Result;
                        context.Instance.ValidationUpdated = GetUtcTime();
                        context.Instance.ReturnMessage = context.Data.Reason;

                        context.Respond(new EcoSpace.Shared.EcoClasses.Api.Sensor.ResponseMessage
                        {
                            Message = context.Instance.ReturnMessage,
                            Transaction = context.Instance.TriggerRequestMessage.Transaction,
                            ResponseValue = -99
                        });
                    })
                    .TransitionTo(ClosedState),
                 When(ValidationResultReceived,
                    filter => filter.Data.Result != OpinionType.Kill && filter.Data.Result != OpinionType.NotOke)
                    .Then(context =>
                    {
                        context.Instance.ValidationResult = context.Data.Result;
                        context.Instance.ValidationUpdated = GetUtcTime();
                        context.Instance.ReturnMessage = context.Data.Reason;

                        context.Publish(new TriggerRequestMessage
                        {
                            WorkflowCorrelationId = context.Instance.CorrelationId,
                            Transaction = new Transaction
                            {
                                Counter = context.Instance.TriggerRequestMessage.Transaction.Counter,
                                ExternalTransactionId = context.Instance.TriggerRequestMessage.Transaction.ExternalTransactionId,
                                SensorId = context.Instance.TriggerRequestMessage.Transaction.SensorId,
                                TransactionId = context.Instance.TriggerRequestMessage.Transaction.TransactionId
                            }
                        });
                    })
                    .TransitionTo(WaitingForHubResponse));
            During(WaitingForHubResponse,
                When(TriggerMessageResponseReceived)
                    .Then(context => context.Instance.TriggerMessageUpdated = GetUtcTime())
                    
                    //TODO: fill in the correct properties
                    .Respond(context => new EcoSpace.Shared.EcoClasses.Api.Sensor.ResponseMessage())
                    .Finalize()
                );

            During(ClosedState);
        }

        public State WaitingValidation { get; private set; }
        public State WaitingForHubResponse { get; private set; }
        public State ClosedState { get; private set; }

        public Event<EcoSpace.Shared.EcoClasses.Api.Sensor.TriggerRequestMessage> TriggerMessageReceived { get; private set; }
        public Event<ResultValidationEvent> ValidationResultReceived { get; private set; }
        public Event<ResponseMessage> TriggerMessageResponseReceived { get; private set; }


        public DateTime GetUtcTime()
        {
            return DateTime.UtcNow;
        }

    }

Chris Patterson

Jan 29, 2016, 4:46:41 PM
to masstrans...@googlegroups.com
To respond to a message from a separate message handler (the Respond method is meant to be used with an event that was sent as a request), you need to capture a couple of values from the headers and restore them on the message you send. You also need to look at the ResponseAddress and use it to get the send endpoint for the response:


I'll likely look at a way to capture the RequestId (which is a header) and send it back with the response, along with the proper ConversationId and CorrelationId in the message headers, and add that to the state machine for use with the instance.



Michel Tol

Jan 29, 2016, 5:07:32 PM
to masstransit-discuss
Hi Chris, thanks for the info. But I think I am lost ;)

I am initiating the saga by sending a request (EcoSpace.Shared.EcoClasses.Api.Sensor.TriggerRequestMessage).
When receiving the request in the saga (Initially phase - When(ValidationResultReceived)), I am unable to retrieve the RequestId, ConversationId, and ResponseAddress from the context.
The BehaviorContext does not seem to inherit from BaseConsumeContext.

Am I doing something wrong or did I not understand you correctly?

regards,
Michel

Chris Patterson

Jan 29, 2016, 6:04:41 PM
to masstrans...@googlegroups.com
Sorry, those values are in the ConsumeContext, which you retrieve from the behavior context. Automatonymous is a standalone project, but uses the payload to work with MassTransit.

So you would do something along the lines of:

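ConsumeContext consumeContext;
if (behaviorContext.TryGetPayload(out consumeContext))
{
    // The header on ConsumeContext is named ResponseAddress; store it on the
    // saga instance so a later handler can send the response to that address.
    behaviorContext.Instance.ResponseAddress = consumeContext.ResponseAddress;
}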

That should get you started!

Chris



Michel Tol

Jan 30, 2016, 10:56:02 AM
to masstransit-discuss
Sweet! Thanks, Chris!

Based on the "Migrating Automatonymous saga from MT2 to MT3" topic, I changed the respond block to this:


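                // ThenAsync (rather than Then) so the awaited send is part of the activity
                // instead of running as an unobserved async void lambda.
                .ThenAsync(async context =>
                {
                    context.Instance.TriggerMessageUpdated = GetUtcTime();

                    // The MassTransit ConsumeContext travels as a payload on the behavior context.
                    ConsumeContext consumeContext;
                    if (!context.TryGetPayload(out consumeContext))
                        throw new ContextException("The consume context could not be retrieved.");

                    // Send straight to the address captured from the original request and
                    // restore its RequestId so the requester can match the response.
                    var endpoint = await consumeContext.GetSendEndpoint(context.Instance.SavedResponseAddress);
                    var msg = new EcoSpace.Shared.EcoClasses.Api.Sensor.ResponseMessage();
                    await endpoint.Send(msg, Pipe.Execute<SendContext>(x => x.RequestId = context.Instance.SavedRequestId));
                })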

Any remarks?

Chris Patterson

Jan 30, 2016, 2:01:03 PM
to masstrans...@googlegroups.com
That should do it. It's noisy as hell from a code perspective but does the job.

__
Chris Patterson



