Nuno Lopes
unread,Apr 18, 2011, 11:10:21 AM4/18/11Sign in to reply to author
Sign in to forward
You do not have permission to delete messages in this group
Either email addresses are anonymous for this group or you need the view member email addresses permission to view the original message
to DDD/CQRS
As a followup interesting article of Udi about SAGAS, the intent of
this article is to show that there is more to it then orchestrating
When Xed do Y.
Sagas have been used to solve challenges around performing business
transactions across systems in banking for a long time.
Suppose that we transfer an amount of money from an Account A to
Account B.
Take a simple domain service (as of DDD definition)
void TransferMoney(accountIdFrom, accountIdTo, double amount) {
// transaction start
Account A = AllAccounts.get(accountIdFrom);
Account A = AllAccounts.get(accountIdTo);
A.TransferMoneyTo(amount, accountIdTo);
B.TransferMoneyFrom(amount, accountIdFrom);
// end transaction
}
The code above is simple, but it works on the premiss that we can use
cross Aggregate transactions. Suppose account A and account B are of
two distinct banking systems (two different Banks).
The trick is to only Commit the Transaction when both accounts have
Accept it, other wise Reject both. Rejection should be an idempotent
operation. We can easily do that with Sagas.
BankAccountA {
void TransferMoneyTo(transactionToken, toAccountId, amount) {
if (this.HasPendingTransactions) {
throw new BusinessException("Transactions in process");
}
if (availableBalance-amount < 0) {
throw new BusinessException("...");
}
DomainEvent e = new TransferMoneyToAccepted(....);
Apply(e);
Bus.Publish(e);
}
void Apply(TransferMoneyToAccepted e) {
this.HasPendingTransactions = true;
}
void CommitTransferMoneyTo(transactionToken) {
MoneyTransferToAccepted t = this.transactions.get<
TransferMoneyToAccepted>(transactionToken);
if (t == null) ....
DomainEvent e = new MoneyTransferToCommitted(..., this.balance -
t.amount);
Apply(e);
Bus.Publish(e);
}
void Apply(MoneyTransferToCommitted e) {
this.HasPendingTransactions = false;
this.balance = e.balance;
.....
}
...
}
BankAccountB {
void TransferMoneyFrom(transactionToken, fromAccountId, amount) {
if (this.HasPendingTransactions) {
throw new BusinessException("Transactions in process");
}
// some other business logic
DomainEvent e = new MoneyTransferFromAccepted(....);
Apply(e);
Bus.Publish(e);
}
void Apply(MoneyTransferFromAccepted e) {
this.HasPendingTransactions = true;
}
void CommitTransferMoneyFrom(transactionToken) {
MoneyTransferFromAccepted t =
this.transactions.get<MoneyTransferFromAccepted>(transactionToken);
if (t == null) ....
DomainEvent e = new MoneyTransferToCommitted(transactionToken,
this.balance + t.amount);
Apply(e);
Bus.Publish(e);
}
void Apply(MoneyTransferToCommitted e) {
this.HasPendingTransactions = false;
this.balance = e.balance;
}
....
}
class MoneyTransferSaga : Saga<MoneyTransferSaga>,
StartedBy<MoneyTransferToAccepted>,
StartedBy<MoneyTransferFromAccepted>
{
void Handle(MoneyTransferFromAccepted e)
{
this.moneyTransferFromAccepted = e;
if (this._moneyTransferToPending != null)
{
Bus.Send(new CommitMoneyTransferTo(...);
Bus.Send(new CommitMoneyTransferFrom(...);
this.MarkComplete();
}
}
void Handle(MoneyTransferToAccepted e)
{
this.MoneyTransferToAccepted = e;
if (this._moneyTransferFromPending != null)
{
Bus.Send(new CommitMoneyTransferTo(...);
Bus.Send(new CommitMoneyTransferFrom(...);
this.MarkComplete();
}
}
void override TimeOut()
{
try {
if (this._moneyTransferToPending != null) Bus.Send(new
AbortMoneyTransferTo(...);
if (this._moneyTransferFromPending != null) Bus.Send(new
AbortMoneyTransferFrom(...);
} catch (Exception e)
{
.... register failure launch red flag for manual correction.
}
this.MarkComplete();
}
}
Has you can see by the above code, while there are pending
transactions in an Aggregate no other transactions will be processed.
This might be a problem if the external system takes a long time to
accept the transaction. But this what happens when we have distributed
transactions across multiple systems. Both Aggregates will be locked
until the transactions both transactions are accepted done or
canceled. The transaction token works has a handshake. Here is the
TransferMoney domain service.
string transactionToken = GenerateTransactionToken();
Account A = AllAccounts.getAccount(cmd.fromAccount);
A.TransferMoneyTo(transactionToken, ....);
AllAccounts.Save(A);
ExternalAccountService.TransferMoneyTo(transactionToken,
cmd.fromAccount, cmd.toAccount, .....);
In case of failure due to transactions in process you also implement a
retry scheme in the command handler. Say a transaction fails to
pending transactions in an AR, you can simply re-queue the command
until a number is reached and the command is aborted.
So you can actually implement 2PC with Sagas.
To remove Two PCs, what the banking domain does is more or less the
following. Basically we have availableBalance (money that is available
to be used) and an restrictedBalance for each account(money that even
though is part of the over all balance, it can't be used). The total
balance = availableBalance + restrictedBalance. When we accept the
transaction in account A the money is taken from the available balance
and put in the restricted balance (the total balance does not change).
Business rules always run against the available balance. When we
Commit the transaction, the money is then taken the restricted balance
and that is it. If the transaction is Aborted the money is then taken
from the restricted balance and put in the available balance.
This approach allows for none blocking Transactions across Accounts,
yet the process outlined above is exactly the same in terms of what
the Saga.
We can do the same, for almost everything. For instance say that we
aren't dealing with amounts but a simple value such as a Name. We get
the new String and put in an restricted Name. When the transaction is
committed we take the name out of the restricted Name and put in the
available Name.
Thinks can get much more complicated in which case is probably better
defer application of events. That is, we register the event without
applying it. Only when the transaction is committed we apply all the
events with the same transaction token :)
In the rare cases that the above fails, the history of events are very
important and we need to correct the data manually. Say for instance
the AbortMoneyTransferTo or AbortMoneyTransferFrom fails due to some
outofbound problems (say queues get overloaded).
Hope it helps to understand better Sagas and their role in Distributed
Transactions.
Cheers,
Nuno