Instead of handling the errors and restarting during the lifecycle of
saga, I handle the errors and log the failures and let the saga
complete gracefully.
Identifying the critical points of failure I defined other sagas with
the failure points as the starting point for these new sagas. I found
this approach cleaner and easily manageable. (I am very new to the
saga, mass transit; so please excuse if this is a basic design
mistake)
We are using structuremap for DI and RabbitMQ as middleware.
Each time a message is processed I get the below error in the logs,
but the saga continues and the processing happens without any issues.
"Saga already exists and cannot be initiated:
Messages.Payment.InitPaymentProcess"
Can anyone help me identify why this could happen? Is it because I am
using InMemoryRepository? or because I used singleton for the saga? I
tried removing singleton, but I still see the error in the logs.
Injection I used below -
x.For(typeof(ISagaRepository<PaymentProcessorSaga>)).LifecycleIs(new
SingletonLifecycle()).Use(typeof(InMemorySagaRepository<PaymentProcessorSaga>));
x.For<PaymentProcessorSaga>().LifecycleIs(new
SingletonLifecycle()).Use<PaymentProcessorSaga>();
Thanks in advance for any help extended.
Now, per my understanding, the ISagaRepository would hold all the
instances of saga and hence need to be singleton. This is the same way
it was in the Starbucks example too. Why is the repository trying to
get instantiated each time a start message is consumed?
--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To post to this group, send email to masstrans...@googlegroups.com.
To unsubscribe from this group, send email to masstransit-dis...@googlegroups.com.
For more options, visit this group at http://groups.google.com/group/masstransit-discuss?hl=en.
I defined the sagas using the StateMachine
public class PaymentProcessorSaga :
SagaStateMachine<PaymentProcessorSaga>, ISaga
public static State Initial { get; set; }
public static State WaitingForBeginPaymentProcessMessage
{ get; set; }
public static State WaitingForProcessPaymentDataMessage { get;
set; }
public static State WaitingForEndPaymentProcessMessage { get;
set; }
public static State Completed { get; set; }
Similarly other sagas are defined. The initial message is never reused
and all the sagas have different messages.
As I mentioned in my second post, I don't think the error is due to
the multiple sagas, but due to the singleton definition on the
SagaRepository.
Define(() =>
{
Initially(
When(InitPaymentProcessMessageReceived)
.Then((saga, message) =>
saga.InitializePaymentProcessing(message))
.TransitionTo(WaitingForBeginPaymentProcessMessage)
);
During(WaitingForBeginPaymentProcessMessage,
When(BeginPaymentProcessMessageReceived)
.Then((saga, message) =>
saga.BeginPaymentProcessing(message))
.TransitionTo(WaitingForProcessPaymentDataMessage)
);
During(WaitingForProcessPaymentDataMessage,
When(ProcessPaymentDataMessageReceived)
.Then((saga, message) =>
saga.ProcessPaymentDataProcessing(message))
.TransitionTo(WaitingForEndPaymentProcessMessage)
);
During(WaitingForEndPaymentProcessMessage,
When(EndPaymentProcessMessageReceived)
.Then((saga, message) => saga.EndPaymentProcessing(message))
.Complete()
);
});
public void InitializePaymentProcessing(InitPaymentProcess
message)
{
_paymentProcessFacade.SetApplicationSource(message.Source);
this.Bus.Publish(new BeginPaymentProcess
{
CorrelationId =
message.CorrelationId,
MessageId = message.MessageId,
ReceiptId = message.ReceiptId
});
}
public void BeginPaymentProcessing(BeginPaymentProcess
message)
{
this.Bus.Publish(new ProcessPaymentData
{
CorrelationId =
message.CorrelationId,
MessageId = message.MessageId,
ReceiptId = message.ReceiptId
});
}
public void ProcessPaymentDataProcessing(ProcessPaymentData
message)
{
try
{
_paymentProcessFacade.ProcessPayment(message.ReceiptId);
}
catch (Exception ex)
{
_paymentProcessFacade.UpdateProcessStep(message.CorrelationId.ToString(),
message.MessageId.ToString(),
TypeConstants.StepName_ProcessPayment,
TypeConstants.StepStatus_Failed);
}
this.Bus.Publish(new EndPaymentProcess
{
CorrelationId = message.CorrelationId,
MessageId = message.MessageId,
ReceiptId = message.ReceiptId
});
}
public void EndPaymentProcessing(EndPaymentProcess message)
{
// Notify any one about the completion - if required.
}
So what actually is the Saga? The ISagaRepository or the saga class
which we define?
Having the custom saga class as singelton doesnt cause any issues. But
having the ISagaRepository causes the issue.
Just an FYI, the starbucks example defines them as singleton, you
might want to take care of it.
Also do you see any issues having multiple sagas on the same service
bus?
Can I do SubscribeSaga for multiple sagas on the same IServiceBus?
Thanks for your help. These maybe basic questions, but I guess it
would also be a good documentation for future reference for saga
dummies :)
I understood the same too and hence was my confusion.
Saga - Its a single transaction and hence should not be singleton
Saga Repository - This is a container for all the sagas. Now when this
is mentioned as singleton, I get the error. Should there be only 1
repository which holds all the sagas?
Sorry if I wasn't clear earlier.
On Nov 29, 2:53 pm, Dru Sellers <d...@drusellers.com> wrote:
> Saga - an instance of a 'workflow' or 'long running transaction'http://docs.masstransit-project.com/en/latest/overview/keyideas.html#...
>
> Saga Repository - this stores the sagas - a 'repository' of sagas in the
> sense of DDDhttp://lostechies.com/jimmybogard/2009/09/03/ddd-repository-implement...
Thanks for the detailed explanation.
I understood the same too and hence was my confusion.
Saga - Its a single transaction and hence should not be singleton
Saga Repository - This is a container for all the sagas. Now when this
is mentioned as singleton, I get the error. Should there be only 1
repository which holds all the sagas?
Sorry if I wasn't clear earlier.
On Nov 29, 2:53 pm, Dru Sellers <d...@drusellers.com> wrote:
> Saga - an instance of a 'workflow' or 'long running transaction'http://docs.masstransit-project.com/en/latest/overview/keyideas.html#...
>> sense of DDDhttp://lostechies.com/jimmybogard/2009/09/03/ddd-repository-implement...
> Saga Repository - this stores the sagas - a 'repository' of sagas in the
>
> The saga cannot be a singleton. It would defeat the whole point. And in the
> MT design the lifecycle of the saga is managed by MT and the
> ISagaRepository - not the container.
>
> re:starbucks - good catch!
>
> re:multiple sagas in a single bus instance - MT is designed to do just
> that. Host multiple sagas in one bus instance.
> - yup just call subscribe for each one you want. easy day.
>
> -d
>
If I use
container.Configure(x =>
{
x.For(typeof(ISagaRepository<PaymentProcessorSaga>)).LifecycleIs(new
SingletonLifecycle()).Use(typeof(InMemorySagaRepository<PaymentProcessorSaga>));
}
then I get the error saying "Saga already exists and cannot be
initiated: "
If I remove declaring this as singleton I do not get this error which
puzzles me cause repository should only be one instance and making it
singleton giving this error.
Thanks!!
On Nov 29, 4:18 pm, Dru Sellers <d...@drusellers.com> wrote:
> can you point me to where you are seeing saga's registered as singletons?
>
> inline
>
My cotor is also good
public PaymentProcessorSaga(Guid id)
{
this.CorrelationId = id;
_paymentProcessFacade =
ModuleInitializer.container.GetInstance<IPaymentProcessFacade>();
}
I don't see how duplicate corelations can creep in.
Apart from MT, the only other place where saga is called manually is
StateMachineInspector.Trace(new
PaymentProcessorSaga(CombGuid.Generate()));
Thanks,
Vinay
On Nov 29, 4:42 pm, Dru Sellers <d...@drusellers.com> wrote:
> ok - go back to making it a singleton. also you can register it like
>
> x.For(typeof(ISagaRepository<>)).Singleton().Use(typeof(InMemorySagaReposit ory<>))
Example -
public class InitPaymentProcess : CorrelatedBy<Guid>, IPayment
{
public Guid CorrelationId { get; set; }
public Guid MessageId { get; set; }
public String ReceiptId { get; set; }
}
Thanks,
Vinay
I keep a break point at my const'r
public PaymentProcessorSaga(Guid id)
{
this.CorrelationId = id;
}
When I mention the ISagRepository as singleton, I get the error "Saga
already exists and cannot be initiated: ", then the cotor is hit once.
When its not singleton, I do not get any error, but the cotor is hit
twice with same correlation id.
I am not sure why this is happening, but will dig through more and see
what I might be missing.
ERROR 11:00:51 An exception was thrown by a message consumer
MassTransit.Exceptions.MessageException: At least one consumer threw
an exceptio
n ---> MassTransit.Exceptions.SagaException:
Company.PaymentProcessor.Paym
entProcessorSaga(91ebb87f-100b-4f0f-829c-18db177bd9d6) Saga exception
on receipt
of Company.Messages.Payment.InitPaymentProcess: Saga consumer
exception -
--> MassTransit.Exceptions.SagaException:
Company.PaymentProcessor.Payment
ProcessorSaga Saga exception on receipt of
Company.Messages.Payment.InitPa
ymentProcess: Saga already exists and cannot be initiated:
Company.Message
s.Payment.InitPaymentProcess
at MassTransit.Saga.InitiatingSagaPolicy`2.ForExistingSaga(TMessage
message)
in C:\Company\MassTransit\src\MassTransit\Saga
\InitiatingSagaPolicy.cs:lin
e 53
at
MassTransit.Saga.AbstractSagaRepository`1.SendMessageToExistingSagas[TMess
age](IEnumerable`1 existingSagas, ISagaPolicy`2 policy, Action`1
consumerAction,
TMessage message, Action`1 removeAction) in C:\Company\MassTransit\src
\Ma
ssTransit\Saga\AbstractSagaRepository.cs:line 44
--- End of inner exception stack trace ---
at
MassTransit.Saga.AbstractSagaRepository`1.SendMessageToExistingSagas[TMess
age](IEnumerable`1 existingSagas, ISagaPolicy`2 policy, Action`1
consumerAction,
TMessage message, Action`1 removeAction) in C:\Company\MassTransit\src
\Ma
ssTransit\Saga\AbstractSagaRepository.cs:line 63
at MassTransit.Saga.InMemorySagaRepository`1.Send[TMessage]
(Expression`1 filt
er, ISagaPolicy`2 policy, TMessage message, Action`1 consumerAction)
in C:\WebFE
P\OnTime\MassTransit\src\MassTransit\Saga
\InMemorySagaRepository.cs:line 42
at
MassTransit.Saga.Pipeline.SagaMessageSinkBase`2.<Enumerate>b__0(IConsumeCo
ntext`1 context) in C:\Company\MassTransit\src\MassTransit\Saga
\Pipeline\S
agaMessageSinkBase.cs:line 44
at
MassTransit.Pipeline.Sinks.InboundConvertMessageSink`1.<>c__DisplayClass2.
<>c__DisplayClass4.<Enumerate>b__1(IConsumeContext x) in C:\Company
\MassTr
ansit\src\MassTransit\Pipeline\Sinks\InboundConvertMessageSink.cs:line
46
at
MassTransit.Context.ServiceBusReceiveContext.DeliverMessageToConsumers(ICo
nsumeContext context) in C:\Company\MassTransit\src\MassTransit\Context
\Se
rviceBusReceiveContext.cs:line 114
--- End of inner exception stack trace ---
at
MassTransit.Context.ServiceBusReceiveContext.DeliverMessageToConsumers(ICo
nsumeContext context) in C:\Company\MassTransit\src\MassTransit\Context
\Se
rviceBusReceiveContext.cs:line 130
at
MassTransit.Transports.Endpoint.<>c__DisplayClass5.<>c__DisplayClass7.<Rec
eive>b__4(IReceiveContext receiveContext) in C:\Company\MassTransit\src
\Ma
ssTransit\Transports\Endpoint.cs:line 163
The CorrelationId maps messages to a saga instance. Since the Init message is only handled in Initially, it only can be used to create a new saga instance (and rightfully so, you wouldn't want to initialize the saga twice with the same id).
You are getting two messages at the service with the same id, it's not two saga instances. That's what the exception is telling you - the saga already exists.
I am unable to see how this could happen. When the publisher publishes
I see only 1 message in the Queue. I tried it without having a
subscriber running and I see only 1 message.
At this point I do not want to waste the community time on something
which might be internal to my code. Let me debug through and see if I
can find anything and will share the solution/mistake I might find.
Thanks again for all the help extended.
I am still unable to figure out how 2 messages are being received by
the SAGA with same correlation ID. Thought I would share this to see
if it rings a bell for anyone.
The error message in the queue is
{
"destinationAddress": "rabbitmq://localhost/
Company.Messages.Payment:InitPaymentProcess",
"headers": {},
"message": {
"correlationId": "ba92e995-f936-4f02-b768-ba4fabd05d2a",
"messageId": "b9386b4f-8723-4b84-83a2-7da430295f06",
"receiptId": "12312",
"requestTime": "12/6/2011 9:13:38 AM"
},
"messageType": [
"urn:message:Company.Messages.Payment:InitPaymentProcess",
"urn:message:Company.Messages.Payment:IPayment",
"urn:message:Company.Messages:IMessage"
],
"retryCount": 0,
"sourceAddress": "rabbitmq://localhost/OnTimeMessageSink"
}
}
I will send you the zip and send across the code chris to your email.
Thanks for the help!
_serviceBus.Publish(message);
Thought I would update the scenario if it rings a bell on why its
happening to anyone.
Thanks,
Vinay
Below is how I configured my container
container.Configure(x =>
{
x.For(typeof(ISagaRepository<PaymentProcessorSaga>)).Singleton().Use(typeof(InMemorySagaRepository<PaymentProcessorSaga>));
x.For<PaymentProcessorSaga>().Use<PaymentProcessorSaga>();
}
The second line was causing it. I just removed the below and it now
works like a charm.
x.For<PaymentProcessorSaga>().Use<PaymentProcessorSaga>();
My habit of mapping all the classes in the container for a project was
very expensive.
I thank all of you who spend time in looking into this and helped me
in chasing down the problem.