MT3 - Basic Saga

99 views
Skip to first unread message

anwar husain

unread,
Nov 25, 2015, 5:41:52 AM11/25/15
to masstransit-discuss
Hi,

I have a basic saga implementation with an in-memory repository. My saga starts OK by consuming the first message. However, it is not consuming the rest of the messages. I may have wired the pieces together wrongly, but can anyone help?

Thanks
Anwar

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using MassTransit;
using Automatonymous;
using MassTransit.Saga;
namespace MT3
{
    class Program
    {
        static void Main(string[] args)
        {

           start().Wait();
           bus.Stop();
        }
        private static IBusControl bus;
        public static async Task start()
        {
            var machine = new InPatientSaga();

            var repo = new InMemorySagaRepository<InPatientState>();
            

            bus = Bus.Factory.CreateUsingInMemory(cfg =>
            {

                cfg.ReceiveEndpoint("hospital_stay_saga", c =>
                {

                    c.StateMachineSaga(machine, repo);

                });
               

            });

            String episodeNo = "1";
            String patientID = "Patient1";
            Guid workflowID = Guid.NewGuid();

            bus.Start();

            var msg = new PatientAdmittedMsg() { PatientID = patientID, EpisodeNo = episodeNo };

           await bus.Publish(msg);

            var transfer = new PatientTransferredMsg() { 
            PatientID = patientID,
            TransferDate = DateTime.UtcNow
            ,
            EpisodeNo = episodeNo
            };
            
           await bus.Publish(transfer);

            var discharge = new PatientDischargedMsg()
            {
                PatientID = patientID,
                DischargeDate = DateTime.UtcNow
                ,
                EpisodeNo = episodeNo
            };

            await bus.Publish(discharge);
          
            //bus.Stop();

            Console.ReadLine();
        }
    }

    

    public class PatientAdmittedMsg : PatientAdmitted
    {

        public string PatientID
        {
            get;
            set;

        }
        public String EpisodeNo
        {
            get;
            set;
        }
       
    }
    public class PatientTransferredMsg: PatientTransferred
    {

        public string PatientID
        {
            get;
            set;

        }


        public String EpisodeNo
        {
            get;
            set;
        }

        public DateTime TransferDate
        {
            get;
            set;
        }
    }

    public class PatientDischargedMsg : PatientDischarged
    {
        public string PatientID
        {
            get;
            set;
        }
        public DateTime DischargeDate
        {
            get;
            set;
        }

        public String EpisodeNo
        {
            get;
            set;
        }
    }
}


 using Automatonymous;
using System;
using System.Collections.Generic;

namespace MT3
{
    public class InPatientSaga : MassTransitStateMachine<InPatientState>
    {
        public InPatientSaga()
        {
            InstanceState(x => x.CurrentState);
            
            Event(() => PatientAdmitted,
                x => x.CorrelateBy(saga => saga.PatientId, context => context.Message.PatientID)
                .SelectId(saga => Guid.NewGuid())
                 );

            Event(() => PatientTransferred, x => x.CorrelateBy(saga => saga.PatientId, context => context.Message.PatientID));

            Event(() => PatientDischarged, x => x.CorrelateBy(saga => saga.PatientId, context => context.Message.PatientID));

            Initially(
                When(PatientAdmitted)
                    .Then(InitializeSaga)
                    .Then(HandleAdmission)
                    .TransitionTo(Admitted)
                    .Then(c => {
                        Console.WriteLine("The instance statte is : {0}", c.Instance.CurrentState);
                    })

                 );

            During(Admitted,

                When(PatientTransferred)
                    .Then(HandleTransfer)
                    .TransitionTo(Transferred),

                When(PatientDischarged)
                    .Then(HandleDischarge)
                    .TransitionTo(Discharged)
                    .Finalize()
            );

            During(Transferred,
                
                  When(PatientTransferred)
                    .Then(HandleTransfer),
                
                  When(PatientDischarged)
                    .Then(HandleDischarge)
                    .TransitionTo(Discharged)
                    .Finalize()
             );

            DuringAny(

                When(PatientAdmitted)
                    .Then(c => {
                        Console.WriteLine("Admission msg received");
                    }),

                    When(PatientTransferred)
                    .Then(c => {
                        Console.WriteLine("Transfer msg received");
                    }),
                    When(PatientDischarged)
                    .Then(c =>
                    {
                        Console.WriteLine("Discharge msg received");
                    })
                    

             );

                
            SetCompletedWhenFinalized();
        }

        public Event<PatientAdmitted> PatientAdmitted { get; private set; }
        public Event<PatientTransferred> PatientTransferred { get; private set; }
        public Event<PatientDischarged> PatientDischarged { get; private set; }

        public State Admitted { get; private set; }
        public State Transferred { get; private set; }
        public State Discharged { get; private set; }

        public void InitializeSaga(BehaviorContext<InPatientState, PatientAdmitted> context)
        {
            context.Instance.TransferTimes = new List<DateTime>();
            context.Instance.PatientId = context.Data.PatientID;
            context.Instance.EpisodeNo = context.Data.EpisodeNo;
            
            Console.WriteLine("Initializing Instance:PatinetID {0} | Episode No {1}", context.Instance.PatientId, context.Instance.EpisodeNo);
        }

        public void HandleAdmission(BehaviorContext<InPatientState , PatientAdmitted> context){
            System.Console.WriteLine("Patient admission recorded successfully. Total : {0} transfers for Patient {1}", context.Instance.TransferTimes.Count,context.Data.PatientID);
        }

        public void HandleTransfer(BehaviorContext<InPatientState, PatientTransferred> context)
        {
            context.Instance.TransferTimes.Add(context.Data.TransferDate);
            System.Console.WriteLine("Patient transfer recorded successfully. Total : %s transfers", context.Instance.TransferTimes.Count);
        }

        public void HandleDischarge(BehaviorContext<InPatientState, PatientDischarged> context)
        {
           System.Console.WriteLine("Patient discharge recorded successfully. Total : %s transfers", context.Instance.TransferTimes.Count);
        }

    }
}

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace MT3
{

  public  interface PatientAdded
    {
        String PatientID { get; }
   
    }

  public interface PatientUpdated
    {
        String PatientID { get; }
    }

    public interface PatientAdmitted
    {
        String PatientID { get; }
        String EpisodeNo { get; }
    }
    public interface PatientTransferred
    {
        String PatientID { get; }
        DateTime TransferDate { get; }
        String EpisodeNo { get; }
    }
    public interface PatientDischarged
    {
        String PatientID { get; }
        DateTime DischargeDate { get; }
        String EpisodeNo { get; }
    }
}




anwar husain

unread,
Nov 25, 2015, 11:18:59 AM11/25/15
to masstransit-discuss
Got that working. I had to use c.UseConcurrencyLimit(1). 
Thanks  to everyone who had a peek anyway.

Chris Patterson

unread,
Nov 25, 2015, 11:26:34 AM11/25/15
to masstransit-discuss

Are you on the latest release (.15)? There were some concurrency issues with .14 if you’re using the in-memory saga repository (which I think you are here) where subsequent messages were not blocked while the initial message was being processed.

 

Also, if you’re having a concurrency issue in testing, you’ll likely want to consider accepting both transfer and discharge in the Initial state, and if the admit is received after either of those messages, just update the admitting information but don’t transition state as the later event has already moved the state beyond the admitted state.

 

ADT is a pretty standard workflow, and using the timestamp in the messages can also help deal with out of order events by checking and taking the appropriate action. Just think of the state machine as a progression, and it’s unlikely that the patient will progress backwards through the care setting. Workflows model the real world, so align them that way.

 

--
Chris Patterson

--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-dis...@googlegroups.com.
To post to this group, send email to masstrans...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/masstransit-discuss/129700bf-97dd-49e0-825e-04a8e8535aba%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

 

 

anwar husain

unread,
Nov 25, 2015, 12:00:05 PM11/25/15
to masstransit-discuss
Hi Chris,

Thank you for your response.

Yes I am on .15 ( sorry I didn't mention it in my query). 

Yes, even with the concurrency limit set to 1, I am facing the issue described with an in-memory repo. At least I know the reason now.

Yes, that would be ideal and we have admission cancellation as well (for user errors :)). 
...
Reply all
Reply to author
Forward
0 new messages