Multiple sagas on a single service bus

793 views
Skip to first unread message

Vinay Manda

unread,
Nov 29, 2011, 10:21:03 AM11/29/11
to masstransit-discuss
I am trying to implement a saga based service for business critical
function.

Instead of handling the errors and restarting during the lifecycle of
saga, I handle the errors and log the failures and let the saga
complete gracefully.
Identifying the critical points of failure I defined other sagas with
the failure points as the starting point for these new sagas. I found
this approach cleaner and easily manageable. (I am very new to the
saga, mass transit; so please excuse if this is a basic design
mistake)

We are using structuremap for DI and RabbitMQ as middleware.

Each time a message is processed I get the below error in the logs,
but the saga continues and the processing happens without any issues.
"Saga already exists and cannot be initiated:
Messages.Payment.InitPaymentProcess"

Can anyone help me identify why this could happen? Is it because I am
using InMemoryRepository? or because I used singleton for the saga? I
tried removing singleton, but I still see the error in the logs.

Injection I used below -

x.For(typeof(ISagaRepository<PaymentProcessorSaga>)).LifecycleIs(new
SingletonLifecycle()).Use(typeof(InMemorySagaRepository<PaymentProcessorSaga>));

x.For<PaymentProcessorSaga>().LifecycleIs(new
SingletonLifecycle()).Use<PaymentProcessorSaga>();

Thanks in advance for any help extended.

Vinay Manda

unread,
Nov 29, 2011, 11:44:00 AM11/29/11
to masstransit-discuss
Sorry for the incorrect info. If i remove the
ISagaRepository<PaymentProcessorSaga> as singleton I do not get this
error.

Now, per my understanding, the ISagaRepository would hold all the
instances of saga and hence need to be singleton. This is the same way
it was in the Starbucks example too. Why is the repository trying to
get instantiated each time a start message is consumed?

Dru Sellers

unread,
Nov 29, 2011, 12:02:10 PM11/29/11
to masstrans...@googlegroups.com
Can you share your saga definition, it will play a role in this log issue

-d

--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To post to this group, send email to masstrans...@googlegroups.com.
To unsubscribe from this group, send email to masstransit-dis...@googlegroups.com.
For more options, visit this group at http://groups.google.com/group/masstransit-discuss?hl=en.


Vinay Manda

unread,
Nov 29, 2011, 12:06:28 PM11/29/11
to masstransit-discuss
Thanks for the help extended Dru.

I defined the sagas using the StateMachine

public class PaymentProcessorSaga :
SagaStateMachine<PaymentProcessorSaga>, ISaga

public static State Initial { get; set; }
public static State WaitingForBeginPaymentProcessMessage
{ get; set; }
public static State WaitingForProcessPaymentDataMessage { get;
set; }
public static State WaitingForEndPaymentProcessMessage { get;
set; }
public static State Completed { get; set; }

Similarly other sagas are defined. The initial message is never reused
and all the sagas have different messages.

As I mentioned in my second post, I don't think the error is due to
the multiple sagas, but due to the singleton definition on the
SagaRepository.

Dru Sellers

unread,
Nov 29, 2011, 12:08:32 PM11/29/11
to masstrans...@googlegroups.com
sorry but I need to see the 'Define' so that I can see the Initial and During calls

and you are very welcome

-d

Vinay Manda

unread,
Nov 29, 2011, 12:11:35 PM11/29/11
to masstransit-discuss
Sorry, its below

Define(() =>
{
Initially(
When(InitPaymentProcessMessageReceived)
.Then((saga, message) =>
saga.InitializePaymentProcessing(message))
.TransitionTo(WaitingForBeginPaymentProcessMessage)
);

During(WaitingForBeginPaymentProcessMessage,
When(BeginPaymentProcessMessageReceived)
.Then((saga, message) =>
saga.BeginPaymentProcessing(message))
.TransitionTo(WaitingForProcessPaymentDataMessage)
);

During(WaitingForProcessPaymentDataMessage,
When(ProcessPaymentDataMessageReceived)
.Then((saga, message) =>
saga.ProcessPaymentDataProcessing(message))
.TransitionTo(WaitingForEndPaymentProcessMessage)
);

During(WaitingForEndPaymentProcessMessage,
When(EndPaymentProcessMessageReceived)
.Then((saga, message) => saga.EndPaymentProcessing(message))
.Complete()
);
});

Dru Sellers

unread,
Nov 29, 2011, 12:36:16 PM11/29/11
to masstrans...@googlegroups.com
sorry can you also send the event definitions
-d

Vinay Manda

unread,
Nov 29, 2011, 12:39:53 PM11/29/11
to masstransit-discuss
        public static Event<InitPaymentProcess>
InitPaymentProcessMessageReceived { get; set; }        public static
Event<BeginPaymentProcess> BeginPaymentProcessMessageReceived { get;
set; }        public static Event<ProcessPaymentData>
ProcessPaymentDataMessageReceived { get; set; }        public static
Event<EndPaymentProcess> EndPaymentProcessMessageReceived { get;
set; }
Thank you

Vinay Manda

unread,
Nov 29, 2011, 12:52:57 PM11/29/11
to masstransit-discuss
And the definitions


public void InitializePaymentProcessing(InitPaymentProcess
message)
{

_paymentProcessFacade.SetApplicationSource(message.Source);

this.Bus.Publish(new BeginPaymentProcess
{
CorrelationId =
message.CorrelationId,
MessageId = message.MessageId,
ReceiptId = message.ReceiptId
});
}

public void BeginPaymentProcessing(BeginPaymentProcess
message)
{

this.Bus.Publish(new ProcessPaymentData
{
CorrelationId =
message.CorrelationId,
MessageId = message.MessageId,
ReceiptId = message.ReceiptId
});
}

public void ProcessPaymentDataProcessing(ProcessPaymentData
message)
{
try
{

_paymentProcessFacade.ProcessPayment(message.ReceiptId);
}
catch (Exception ex)
{

_paymentProcessFacade.UpdateProcessStep(message.CorrelationId.ToString(),

message.MessageId.ToString(),

TypeConstants.StepName_ProcessPayment,

TypeConstants.StepStatus_Failed);
}

this.Bus.Publish(new EndPaymentProcess
{
CorrelationId = message.CorrelationId,
MessageId = message.MessageId,
ReceiptId = message.ReceiptId
});
}

public void EndPaymentProcessing(EndPaymentProcess message)
{
// Notify any one about the completion - if required.
}

Dru Sellers

unread,
Nov 29, 2011, 3:31:19 PM11/29/11
to masstrans...@googlegroups.com
Ok, duh - now I am starting to see it.

Saga's are NOT singletons - there should be one for each occurrence - think of them like entities in nhibernate
The repository is a singleton but the saga's are not.

-d


Vinay Manda

unread,
Nov 29, 2011, 3:37:15 PM11/29/11
to masstransit-discuss
Thanks Dru.

So what actually is the Saga? The ISagaRepository or the saga class
which we define?

Having the custom saga class as singelton doesnt cause any issues. But
having the ISagaRepository causes the issue.
Just an FYI, the starbucks example defines them as singleton, you
might want to take care of it.


Also do you see any issues having multiple sagas on the same service
bus?

Can I do SubscribeSaga for multiple sagas on the same IServiceBus?

Thanks for your help. These maybe basic questions, but I guess it
would also be a good documentation for future reference for saga
dummies :)

Dru Sellers

unread,
Nov 29, 2011, 3:53:02 PM11/29/11
to masstrans...@googlegroups.com
Saga - an instance of a 'workflow' or 'long running transaction'

Saga Repository - this stores the sagas - a 'repository' of sagas in the sense of DDD

The saga cannot be a singleton. It would defeat the whole point. And in the MT design the lifecycle of the saga is managed by MT and the ISagaRepository - not the container.

re:starbucks - good catch!

re:multiple sagas in a single bus instance - MT is designed to do just that. Host multiple sagas in one bus instance. 
- yup just call subscribe for each one you want. easy day.

-d

Vinay Manda

unread,
Nov 29, 2011, 4:03:03 PM11/29/11
to masstransit-discuss
Thanks for the detailed explanation.

I understood the same too and hence was my confusion.

Saga - Its a single transaction and hence should not be singleton

Saga Repository - This is a container for all the sagas. Now when this
is mentioned as singleton, I get the error. Should there be only 1
repository which holds all the sagas?

Sorry if I wasn't clear earlier.


On Nov 29, 2:53 pm, Dru Sellers <d...@drusellers.com> wrote:
> Saga - an instance of a 'workflow' or 'long running transaction'http://docs.masstransit-project.com/en/latest/overview/keyideas.html#...


>
> Saga Repository - this stores the sagas - a 'repository' of sagas in the

> sense of DDDhttp://lostechies.com/jimmybogard/2009/09/03/ddd-repository-implement...

Dru Sellers

unread,
Nov 29, 2011, 5:18:28 PM11/29/11
to masstrans...@googlegroups.com
can you point me to where you are seeing saga's registered as singletons?

inline

On Tue, Nov 29, 2011 at 3:03 PM, Vinay Reddy <vinay...@gmail.com> wrote:
Thanks for the detailed explanation.

I understood the same too and hence was my confusion.

Saga - Its a single transaction and hence should not be singleton

correct
 

Saga Repository - This is a container for all the sagas. Now when this
is mentioned as singleton, I get the error. Should there be only 1
repository which holds all the sagas?

correct - only ONE repository
 

Sorry if I wasn't clear earlier.


Its all good. we will get through it. :)

 


On Nov 29, 2:53 pm, Dru Sellers <d...@drusellers.com> wrote:
> Saga - an instance of a 'workflow' or 'long running transaction'http://docs.masstransit-project.com/en/latest/overview/keyideas.html#...
>
> Saga Repository - this stores the sagas - a 'repository' of sagas in the
> sense of DDDhttp://lostechies.com/jimmybogard/2009/09/03/ddd-repository-implement...
>
> The saga cannot be a singleton. It would defeat the whole point. And in the
> MT design the lifecycle of the saga is managed by MT and the
> ISagaRepository - not the container.
>
> re:starbucks - good catch!
>
> re:multiple sagas in a single bus instance - MT is designed to do just
> that. Host multiple sagas in one bus instance.
> - yup just call subscribe for each one you want. easy day.
>
> -d
>

Vinay Manda

unread,
Nov 29, 2011, 5:33:21 PM11/29/11
to masstransit-discuss
In my DI initialize

If I use
container.Configure(x =>
{


x.For(typeof(ISagaRepository<PaymentProcessorSaga>)).LifecycleIs(new
SingletonLifecycle()).Use(typeof(InMemorySagaRepository<PaymentProcessorSaga>));
}

then I get the error saying "Saga already exists and cannot be
initiated: "

If I remove declaring this as singleton I do not get this error which
puzzles me cause repository should only be one instance and making it
singleton giving this error.

Thanks!!

On Nov 29, 4:18 pm, Dru Sellers <d...@drusellers.com> wrote:
> can you point me to where you are seeing saga's registered as singletons?
>
> inline
>

Dru Sellers

unread,
Nov 29, 2011, 5:42:36 PM11/29/11
to masstrans...@googlegroups.com
ok - go back to making it a singleton. also you can register it like

x.For(typeof(ISagaRepository<>)).Singleton().Use(typeof(InMemorySagaRepository<>))

and now lets look at how we are correlating the saga.

your ctor should look like

public MySaga(Guid correlationId)
{
    CorrelationId = correlationId;
}


does that ring true?

you can also do something like

Correlate(NewOrder).By((saga, msg) => saga.CorrelationId == msg.CorrelationId);

---

I am now thinking we have duplicate correlation ids


-d

Vinay Manda

unread,
Nov 29, 2011, 6:23:37 PM11/29/11
to masstransit-discuss
Tried .Singleton() instead of .LifecycleIs(new SingletonLifecycle())
and it still gives the same error.

My cotor is also good

public PaymentProcessorSaga(Guid id)
{
this.CorrelationId = id;

_paymentProcessFacade =
ModuleInitializer.container.GetInstance<IPaymentProcessFacade>();
}

I don't see how duplicate corelations can creep in.

Apart from MT, the only other place where saga is called manually is

StateMachineInspector.Trace(new
PaymentProcessorSaga(CombGuid.Generate()));


Thanks,
Vinay


On Nov 29, 4:42 pm, Dru Sellers <d...@drusellers.com> wrote:
> ok - go back to making it a singleton. also you can register it like
>

> x.For(typeof(ISagaRepository<>)).Singleton().Use(typeof(InMemorySagaReposit ory<>))

Dru Sellers

unread,
Nov 29, 2011, 6:38:15 PM11/29/11
to masstrans...@googlegroups.com
do you have a correlate command some where in the saga definition?
or are your messages correlated by Guid?

public class SubmitPaymentMessage :
CorrelatedBy<Guid>
{
public PaymentType PaymentType { get; set; }
public decimal Amount { get; set; }
public Guid CorrelationId { get; set; }
}


-d

Vinay Manda

unread,
Nov 29, 2011, 8:06:58 PM11/29/11
to masstransit-discuss
The messages are correlated by Guid

Example -
public class InitPaymentProcess : CorrelatedBy<Guid>, IPayment
{


public Guid CorrelationId { get; set; }

public Guid MessageId { get; set; }
public String ReceiptId { get; set; }
}

Thanks,
Vinay

Dru Sellers

unread,
Nov 29, 2011, 10:25:12 PM11/29/11
to masstrans...@googlegroups.com
Hmmm. 
@Chris any ideas?

-d

-d

Vinay Manda

unread,
Nov 29, 2011, 11:52:23 PM11/29/11
to masstransit-discuss
Dru, you were right. The saga is begin instantiated with the same
correlation ID twice.

I keep a break point at my const'r

public PaymentProcessorSaga(Guid id)
{
this.CorrelationId = id;
}

When I mention the ISagRepository as singleton, I get the error "Saga
already exists and cannot be initiated: ", then the cotor is hit once.

When its not singleton, I do not get any error, but the cotor is hit
twice with same correlation id.

I am not sure why this is happening, but will dig through more and see
what I might be missing.

Vinay Manda

unread,
Nov 30, 2011, 12:02:22 AM11/30/11
to masstransit-discuss
Stack trace

ERROR 11:00:51 An exception was thrown by a message consumer
MassTransit.Exceptions.MessageException: At least one consumer threw
an exceptio
n ---> MassTransit.Exceptions.SagaException:
Company.PaymentProcessor.Paym
entProcessorSaga(91ebb87f-100b-4f0f-829c-18db177bd9d6) Saga exception
on receipt
of Company.Messages.Payment.InitPaymentProcess: Saga consumer
exception -
--> MassTransit.Exceptions.SagaException:
Company.PaymentProcessor.Payment
ProcessorSaga Saga exception on receipt of
Company.Messages.Payment.InitPa
ymentProcess: Saga already exists and cannot be initiated:
Company.Message
s.Payment.InitPaymentProcess
at MassTransit.Saga.InitiatingSagaPolicy`2.ForExistingSaga(TMessage
message)
in C:\Company\MassTransit\src\MassTransit\Saga
\InitiatingSagaPolicy.cs:lin
e 53
at
MassTransit.Saga.AbstractSagaRepository`1.SendMessageToExistingSagas[TMess
age](IEnumerable`1 existingSagas, ISagaPolicy`2 policy, Action`1
consumerAction,
TMessage message, Action`1 removeAction) in C:\Company\MassTransit\src
\Ma
ssTransit\Saga\AbstractSagaRepository.cs:line 44
--- End of inner exception stack trace ---
at
MassTransit.Saga.AbstractSagaRepository`1.SendMessageToExistingSagas[TMess
age](IEnumerable`1 existingSagas, ISagaPolicy`2 policy, Action`1
consumerAction,
TMessage message, Action`1 removeAction) in C:\Company\MassTransit\src
\Ma
ssTransit\Saga\AbstractSagaRepository.cs:line 63
at MassTransit.Saga.InMemorySagaRepository`1.Send[TMessage]
(Expression`1 filt
er, ISagaPolicy`2 policy, TMessage message, Action`1 consumerAction)
in C:\WebFE
P\OnTime\MassTransit\src\MassTransit\Saga
\InMemorySagaRepository.cs:line 42
at
MassTransit.Saga.Pipeline.SagaMessageSinkBase`2.<Enumerate>b__0(IConsumeCo
ntext`1 context) in C:\Company\MassTransit\src\MassTransit\Saga
\Pipeline\S
agaMessageSinkBase.cs:line 44
at
MassTransit.Pipeline.Sinks.InboundConvertMessageSink`1.<>c__DisplayClass2.
<>c__DisplayClass4.<Enumerate>b__1(IConsumeContext x) in C:\Company
\MassTr
ansit\src\MassTransit\Pipeline\Sinks\InboundConvertMessageSink.cs:line
46
at
MassTransit.Context.ServiceBusReceiveContext.DeliverMessageToConsumers(ICo
nsumeContext context) in C:\Company\MassTransit\src\MassTransit\Context
\Se
rviceBusReceiveContext.cs:line 114
--- End of inner exception stack trace ---
at
MassTransit.Context.ServiceBusReceiveContext.DeliverMessageToConsumers(ICo
nsumeContext context) in C:\Company\MassTransit\src\MassTransit\Context
\Se
rviceBusReceiveContext.cs:line 130
at
MassTransit.Transports.Endpoint.<>c__DisplayClass5.<>c__DisplayClass7.<Rec
eive>b__4(IReceiveContext receiveContext) in C:\Company\MassTransit\src
\Ma
ssTransit\Transports\Endpoint.cs:line 163

Chris Patterson

unread,
Nov 30, 2011, 7:31:46 AM11/30/11
to masstrans...@googlegroups.com
You are receiving multiple Init messages with the same CorrelationId.

The CorrelationId maps messages to a saga instance. Since the Init message is only handled in Initially, it only can be used to create a new saga instance (and rightfully so, you wouldn't want to initialize the saga twice with the same id).

You are getting two messages at the service with the same id, it's not two saga instances. That's what the exception is telling you - the saga already exists.

Vinay Manda

unread,
Nov 30, 2011, 10:16:19 AM11/30/11
to masstransit-discuss
Thanks Chris.

I am unable to see how this could happen. When the publisher publishes
I see only 1 message in the Queue. I tried it without having a
subscriber running and I see only 1 message.

At this point I do not want to waste the community time on something
which might be internal to my code. Let me debug through and see if I
can find anything and will share the solution/mistake I might find.

Thanks again for all the help extended.

Dru Sellers

unread,
Nov 30, 2011, 10:21:14 AM11/30/11
to masstrans...@googlegroups.com
No problem, please let us know what you find AND/OR anything that we could add to make detecting this easier.
-d

Vinay Manda

unread,
Dec 6, 2011, 10:22:15 AM12/6/11
to masstransit-discuss
I downloaded and am using the latest version of MT now. I see a new
queue queueName_error created and for each request I see a message in
it.

I am still unable to figure out how 2 messages are being received by
the SAGA with same correlation ID. Thought I would share this to see
if it rings a bell for anyone.

The error message in the queue is

{
"destinationAddress": "rabbitmq://localhost/
Company.Messages.Payment:InitPaymentProcess",
"headers": {},
"message": {
"correlationId": "ba92e995-f936-4f02-b768-ba4fabd05d2a",
"messageId": "b9386b4f-8723-4b84-83a2-7da430295f06",
"receiptId": "12312",
"requestTime": "12/6/2011 9:13:38 AM"
},
"messageType": [
"urn:message:Company.Messages.Payment:InitPaymentProcess",
"urn:message:Company.Messages.Payment:IPayment",
"urn:message:Company.Messages:IMessage"
],
"retryCount": 0,
"sourceAddress": "rabbitmq://localhost/OnTimeMessageSink"
}

Chris Patterson

unread,
Dec 6, 2011, 6:03:26 PM12/6/11
to masstrans...@googlegroups.com
If you go into the RabbitMQ management web site, you should be able to see the exchanges. I'm curious how the message name exchanges bind up to the queue -- but you will never get two messages from a single send due to the exchange routing so I'm doubtful that is the issue.

Would you be able to share with me all the code that touches MassTransit in a ZIP file (via private e-mail of course) so that I can look at the entire picture and see where something is misconfigured?

}

Vinay Manda

unread,
Dec 6, 2011, 6:19:50 PM12/6/11
to masstransit-discuss
I checked the exchange below is what it binds to
Message -
Company.Messages.Payment:InitPaymentProcess
To -
Company.Messages.Payment:IPayment
Company.Messages:IMessage
OnTimePayment

I will send you the zip and send across the code chris to your email.
Thanks for the help!

Vinay Manda

unread,
Dec 8, 2011, 3:26:07 PM12/8/11
to masstransit-discuss
I put the publisher on one machine subscriber on other (RabbitMQ is
clustered across both the machines)
If I publish the message, then shut down the publisher; the subscriber
saga is invoked only one(only 1 message is received)If I publish the
message, and keep the publisher(an WCF service) running, the
subscriber saga is invoked twice with the same correlation ID.
All I am doing is

_serviceBus.Publish(message);

Thought I would update the scenario if it rings a bell on why its
happening to anyone.

Thanks,
Vinay

Vinay Manda

unread,
Dec 10, 2011, 6:13:02 PM12/10/11
to masstransit-discuss
Holy Cow!! I found the mistake I was doing which was causing the saga
to be initiated twice for each message.

Below is how I configured my container

container.Configure(x =>
{
x.For(typeof(ISagaRepository<PaymentProcessorSaga>)).Singleton().Use(typeof(InMemorySagaRepository<PaymentProcessorSaga>));

x.For<PaymentProcessorSaga>().Use<PaymentProcessorSaga>();
}

The second line was causing it. I just removed the below and it now
works like a charm.
x.For<PaymentProcessorSaga>().Use<PaymentProcessorSaga>();

My habit of mapping all the classes in the container for a project was
very expensive.

I thank all of you who spend time in looking into this and helped me
in chasing down the problem.

Chris Patterson

unread,
Dec 11, 2011, 10:29:47 AM12/11/11
to masstrans...@googlegroups.com
Congrats, glad you were able to find it on your own -- I know I always feel better when that happens.

Dru Sellers

unread,
Dec 12, 2011, 11:19:51 AM12/12/11
to masstrans...@googlegroups.com
CONGRATS!

Reply all
Reply to author
Forward
Message has been deleted
0 new messages