Problems getting my Saga Receiving Events

1,108 views
Skip to first unread message

rmacdonaldsmith

unread,
Oct 10, 2013, 3:25:04 PM10/10/13
to masstrans...@googlegroups.com
Hi,

I have written a saga using the Automatonymous state machine framework. I have written saga's before, using the older MT way of doing things. I have unit tested the saga, using the MT test fixtures, running a local bus, etc... and the tests work and pass. So I think the actual saga implementation is sound. Here is the saga code:

public class UserRegistrationProcessManagerState
        : SagaStateMachineInstance
    {
        public State CurrentState { get; set; }
        public Guid CorrelationId { get; private set; } 
        public IServiceBus Bus { get; set; }
        public Guid EmailUserForMoreInfoCorrelationId { get; set; }

        public UserRegistrationProcessManagerState(Guid correlationId)
        {
            CorrelationId = correlationId;
        }
    }

    public class UserRegistrationProcessManager
        : AutomatonymousStateMachine<UserRegistrationProcessManagerState>
    {
        private static readonly ILog Logger = LogManager.GetLogger(typeof (UserRegistrationProcessManager));
        private readonly UserRegistrationProcessManagerConfiguration _config;

        public UserRegistrationProcessManager(UserRegistrationProcessManagerConfiguration config)
        {
            _config = config;

            State(() => NotVerified);
            State(() => PendingVerification);
            State(() => Active);
            State(() => NoResponse);
            State(() => Denied);

            Event(() => UserRegistrationAutoApproved);
            Event(() => UserVerified);
            Event(() => UserRegistrationNotApproved);
            Event(() => UserQualified);
            Event(() => UserDidNotRespond);
            Event(() => UserDenied);

            Initially(
                When(UserRegistrationAutoApproved)
                    .Do(binder => Logger.DebugFormat("In initial state, accepting UserRegistrationAutoApproved event..."))
                    .TransitionTo(NotVerified),
                When(UserRegistrationNotApproved)
                    .Call((state, evnt) => EmailUserForMoreInformation(state, evnt))
                    .Call((state, evnt) => ScheduleNoResponseMessage(state, evnt))
                    .TransitionTo(PendingVerification)
            );

            During(NotVerified,
                When(UserVerified)
                    .Call((state, evnt) => NotifyUserIsActive(state, evnt))
                    .TransitionTo(Active)
            );

            During(PendingVerification,
                When(UserQualified)
                    .Call((state, _) => CancelNoResponseMessage(state))
                    .TransitionTo(Active),
                When(UserDidNotRespond)
                    .TransitionTo(NoResponse),
                When(UserDenied)
                    .TransitionTo(Denied)
            );
        }

        public State NotVerified            { get; private set; }
        public State PendingVerification    { get; private set; }
        public State Active                 { get; private set; }
        public State NoResponse             { get; private set; }
        public State Denied                 { get; private set; }

        public Event<UserRegistrationAutoApproved> UserRegistrationAutoApproved     { get; private set; }
        public Event<UserRegistrationNotApproved> UserRegistrationNotApproved       { get; private set; }
        public Event<UserVerified> UserVerified                                     { get; private set; }
        public Event<UserActivated> UserQualified                                   { get; private set; }
        public Event<UserDidNotRespond> UserDidNotRespond                           { get; private set; }
        public Event<UserDenied> UserDenied                                         { get; private set; }

        private void EmailUserForMoreInformation(UserRegistrationProcessManagerState state, UserRegistrationNotApproved evnt)
        {
            //send a SendEmail command on the bus
            //Note: in reality we would want some type of factory class to generate the emails
            //the factory would take care of things like formatting, From, CC, etc...
            var email = new SendEmail
                {
                    To = evnt.EmailAddress,
                    FirstName = evnt.FirstName,
                    LastName = evnt.LastName,
                    Subject = "Request for More Information",
                    Body = "You recently attempted to registered on our website. We need more details to complete the registration process..."
                };

            state.Bus.Publish(email);
        }

        private void ScheduleNoResponseMessage(UserRegistrationProcessManagerState state, UserRegistrationNotApproved evnt)
        {
            //schedule a message to be sent back to us indicating that the user did not respond.
            var timesOutAt = _config.RequestMoreInformationFromUserTimeoutSeconds.Seconds().FromNow();
            var noResponseMsg = evnt.BuildNoResponse();
            noResponseMsg.EmailSentOn = SystemTime.Now();
            noResponseMsg.ResponseDeadline = timesOutAt;
            var correlationId = state.Bus.ScheduleMessage(timesOutAt, noResponseMsg).TokenId;
            //WE NEED TO REMEMBER THE CORRELATIONID RETURNED TO US, AND USE IT WHEN WE WANT TO CANCEL THIS SCHEDULED MESSAGE.
            state.EmailUserForMoreInfoCorrelationId = correlationId;
        }

        private void CancelNoResponseMessage(UserRegistrationProcessManagerState state)
        {
            //cancel the scheduled message; we have received a response from the user
            //use the correlationid returned when we scheduled the message
            state.Bus.CancelScheduledMessage(state.EmailUserForMoreInfoCorrelationId);
        }

        private void NotifyUserIsActive(UserRegistrationProcessManagerState state, UserVerified evnt)
        {
            Logger.Debug("Process Manager, Sending email to notify user is active...");
            var email = new SendEmail
            {
                To = evnt.EmailAddress,
                FirstName = evnt.FirstName,
                LastName = evnt.LastName,
                Subject = "Innovadex Account Activation",
                Body = "You recently registered on our website. We are pleased to inform you that your account is now active..."
            };

            state.Bus.Publish(email);
        }
    }

    public class UserRegistrationProcessManagerConfiguration
    {
        public int RequestMoreInformationFromUserTimeoutSeconds { get; set; }
    }

But, for the life of me, I can not get the saga to accept events from the bus when I drop it in to a distributed system. I am obviously doing something wrong and it is probably something quite straight forward to fix. 

I am using the CorrelatedBy<T> interface on all my messages (I would prefer not to, as it means taking a dependency on MT in some of my code where I would like to keep MT out, but I just want to get this working, then I can fix the correlation later). So, I should not have to include any correlation config in the bus subscription (correct me if I am wrong). Here is my subscription:

            var bus = ServiceBusFactory.New(config =>
            {
                //config to load consumers from structuremap and decorate...

                config.Subscribe(subsConfig =>
                {
                    //register process manager (saga)
                    var pmConfig = new UserRegistrationProcessManagerConfiguration
                    {
                        RequestMoreInformationFromUserTimeoutSeconds = 10,
                    };
                    var processManager = new UserRegistrationProcessManager(pmConfig);
                    subsConfig.StateMachineSaga(
                        processManager,
                        new InMemorySagaRepository<UserRegistrationProcessManagerState>());
                });
            });

The saga and command consumers are being hosted in a TopShelf process. I am running commands against the system using BDD style integration tests from SpecFlow. This works for commands / events that do not run through the saga. But I can not get the saga to work with the tests that use this interaction. I am just trying to get the saga to respond to events in the Initail state at the moment.

I can connect a debugger from the MT code base and step in to the InMemorySagaRepository, but if I put a break point / logging in the saga it is not getting called. I can see a "_sagas" collection in the code, and I can see my previous attempts to run tests as there are saga instances in there with correlation Guids.

Can somebody point me in the right direction, tell me what I am doing / not doing to get the saga to receive events and transition state?

Chris Patterson

unread,
Oct 11, 2013, 10:02:59 AM10/11/13
to masstrans...@googlegroups.com
That looks correct (without running it, you said your unit tests pass, so I'm fairly certain you have the syntax right).

And it's being subscribed correctly (I compared to the ones we have in our system and the signatures match, although we have correlations we add because we do not use the CorrelatedBy interface signature). 

The unit test fixtures in MT (and likewise in Automatonymous) are using a real in-memory bus instance, so messages and events are being serialized in that case, so I would tend to rule out serialization issues, but it's worth checking the log file to see if the consumer had an issue with the event being deserialized (again, unlikely, but I'm reaching here).

Also, you can take dependencies on the state machine (we do, we have an activity factory that resolves custom activity implementations from the container, so we can put complex logic in a separate implementation and only depend upon the interface). I should post an example of how to do this sometime.

Are you using MSMQ or RabbitMQ, or are you purely in memory?


--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-dis...@googlegroups.com.
To post to this group, send email to masstrans...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/masstransit-discuss/10063dec-fad0-4771-8e4b-3531467ad3ed%40googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.

rmacdonaldsmith

unread,
Oct 11, 2013, 10:43:27 AM10/11/13
to masstrans...@googlegroups.com
Thanks for taking a look and trawling through my original post!

I am using RabbitMQ. I intend to use a persistent saga repository, but at this stage I am just trying to get the whole system to hang together; later I will replace in-memory repo with something persistent. Likewise, I will probably remove the CorrelatedBy interface from my messages; I did this in an effort to peal back complexity to see what is going on at the saga level, using CorrelatedBy means that I dont have to do any correlation config and, therefore, can not make a mistake!

Since my last post I have turned up the debug level on MT, and I put a Call method in the initial state; I can see that being called. So, I suspect my saga impl and registration are working. Maybe something to do with correlation ID's in my messages that is getting misplaced and the saga repository can not recover my saga instance because the next incoming event has the wrong correlation id - will pursue this for a while...

I would (and I suspect many others would) love to see some example / gist code regarding the activities factory, or maybe a short blog post! I suspect that you guys do some interesting stuff with MT...
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-discuss+unsub...@googlegroups.com.

Chris Patterson

unread,
Oct 11, 2013, 11:01:13 AM10/11/13
to masstrans...@googlegroups.com
You could setup another queue on RabbitMQ, and bind it to the input exchange for your service. That would save you a copy of every message that is delivered to the bus, so you can inspect it to make sure the CorrelationId of each message that you expect to be related is correct.

Also, if you are correlating without CorrelatedBy, you need to have correlation statements for each event, as well as a SelectId(...) so that the saga is created with the specified id (only for events that are in the initially section of the state machine, since those are the only events that create saga instances).




To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-dis...@googlegroups.com.

To post to this group, send email to masstrans...@googlegroups.com.

rmacdonaldsmith

unread,
Oct 11, 2013, 11:26:01 AM10/11/13
to masstrans...@googlegroups.com
Heh - good one RE setting up a queue to view messages - I found my bug! I was not publishing one of the events properly. Knew it was something simple that I had missed, massive thanks for taking the time to read my posts Chris.

To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-discuss+unsubscribe...@googlegroups.com.

--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-discuss+unsub...@googlegroups.com.
To post to this group, send email to masstrans...@googlegroups.com.
Reply all
Reply to author
Forward
0 new messages