Any experience with AMQP for CQRS architectures?


Rinat Abdullin

Sep 3, 2010, 4:50:28 PM
to DDD/CQRS
Hi all,

From reading this discussion group I get the impression that
NServiceBus is the most used message bus and MSMQ is the most used
message queuing transport.

Has anybody tried using something other than MSMQ for CQRS-based
systems? I'm especially interested in Advanced Message Queuing
Protocol (AMQP) implementations (RabbitMQ and Apache Qpid).


Best regards,
Rinat Abdullin

Technology Leader at Lokad.com | Writer at Abdullin.com

Udi Dahan

Sep 3, 2010, 5:00:18 PM
to ddd...@googlegroups.com
The problem with AMQP is that it doesn't support the DTC, which can lead to
message loss in case of failures.

Kind regards,

-- Udi Dahan

Scott Reynolds

Sep 3, 2010, 5:01:30 PM
to ddd...@googlegroups.com

It's on my list to use Qpid, but I probably won't be using NServiceBus. I'll use it to publish events to the message store at first, then expand its use.

Top priority for me is to figure out how to publish 'business events' within my domain, and I'll probably use a Qpid fanout queue for that.

It seems straightforward.

Rinat Abdullin

Sep 3, 2010, 5:06:06 PM
to ddd...@googlegroups.com
True. However, the DTC is not a requirement for CQRS (esp. with event sourcing). Besides, distributed transactions are evil for systems that rely on cloud scalability.

Are there any other possible problems?

Best regards,
Rinat

Udi Dahan

Sep 3, 2010, 5:25:50 PM
to ddd...@googlegroups.com

Ayende put up a post demonstrating this not so long ago:

http://ayende.com/Blog/archive/2010/05/29/who-stole-my-transaction.aspx

So, you think the DB committed, so you remove the message from the queue, then later the DB rolls back, and your message is effectively lost.

I've been in discussions with the Azure team about this, they're aware of the issue, and are working on providing some basic DTC for the cloud.

In short, for now, none of CQRS, event sourcing, or the cloud has any bearing on the matter.

Cheers,

-- Udi Dahan

Rinat Abdullin

Sep 3, 2010, 5:26:21 PM
to ddd...@googlegroups.com
I'm thinking along similar lines: the experience should be relatively straightforward. However, there are two things that do not click out-of-the-box for me:

1) dealing with message handling failures, retries and potential poisons;
2) handling sagas/activities.

If somebody has any experience to share on dealing with these in AMQP scenarios (or just outside MSMQ), it would be much appreciated.

Context: I'm thinking about starting a quick project just for the sake of getting practical experience with the .NET+AMQP+CQRS+ES stack.

Best regards,
Rinat

Neil Robbins

Sep 3, 2010, 5:26:21 PM
to ddd...@googlegroups.com
I've spiked using RabbitMQ and didn't find any issues; I can't see why it would be a problem. NServiceBus provides an abstraction over queues, though, which makes them easier to use. I saw a RabbitMQ transport for MassTransit a while back (http://github.com/machine/machine.mta). With the changes that were being made (have been made) to make it easier to plug in alternative queues, I would have thought it should be possible to use the RabbitMQ C# client to do the same thing for NServiceBus?

Otherwise RabbitMQ has all you need to get going IMO. That said, a layer above like NServiceBus or MassTransit that gives you things like sagas OOTB might just make you more productive.

Neil Robbins

Sep 3, 2010, 5:29:49 PM
to ddd...@googlegroups.com
So isn't the problem here using a store which will roll back a commit?

My spike used RabbitMQ to publish events from the changes feed of CouchDB. Once they're on the _changes feed, they're on disk. Even if you weren't using the _changes feed, once you've an ack (eg 201) it's there. Only risk is the machine being physically foobar'd. If that was a real concern, then in a prod setting something like Cloudant's Consistent Hash Table impl. for scaling Couch would prob. do the trick (though I haven't looked at it).

Scott Reynolds

Sep 3, 2010, 5:30:54 PM
to ddd...@googlegroups.com

Qpid and AMQP support two-phase commit. The following is from the Qpid FAQ: https://cwiki.apache.org/qpid/faq.html

Transactional
AMQP supports two types of transactions in AMQP 0-10, TX and DTX. This allows for local (1PC), and 2PC transaction and the ability to coordinate with a TM (Transaction Manager). The Java broker supports TX, the C++ broker support TX, DTX, XA, JTA for fully ACID transactions. This allows you to commit a single unit of work with may contain enqueues & dequeues either locally on the broker, or in coordination with other transactional resource like RDBMS.


Rinat Abdullin

Sep 3, 2010, 5:31:30 PM
to ddd...@googlegroups.com
Udi, what about a much simpler scenario, where the event store is the actual queue? 2PC problems are completely avoided as long as the underlying persistence provides atomic writes.

We don't even need any transactions here.

Rinat Abdullin

Sep 3, 2010, 5:34:16 PM
to ddd...@googlegroups.com
Yep. I absolutely agree on this one (publishing events after they have been persisted).

Did your spike deal with handler failures and/or sagas?

Neil Robbins

Sep 3, 2010, 5:39:47 PM
to ddd...@googlegroups.com
No, it was a simple little plaything :) Will probably come back to it before long though.


I see it now has NServiceBus integration too, though I haven't tried it. The code should have useful pointers though, such as its saga-related classes.

Rinat Abdullin

Sep 3, 2010, 5:49:40 PM
to ddd...@googlegroups.com
Yes, this project is exactly the thing I've bookmarked earlier today, while Googling for AMQP+CQRS. 

Yet this codebase seems to be really complex, coupled, relational and transactional (probably like with any other .NET ESB that references System.Transactions). I just have this weird gut feeling that everything could and should be much simpler.

Udi Dahan

Sep 4, 2010, 12:26:38 AM
to ddd...@googlegroups.com

Scott,

Happy to hear that, however from the RabbitMQ guys I got a different answer, included below:

-----Original Message-----
From: Michael Bridgen [mailto:mi...@rabbitmq.com]
Sent: Tuesday, July 20, 2010 2:22 PM
To: em...@UdiDahan.com
Cc: rabbitmq...@lists.rabbitmq.com
Subject: Re: [rabbitmq-discuss] Integrating NServiceBus and RabbitMQ

Hi Udi,

> I've been hearing from many of my clients of the desire to use
> NServiceBus (an open source .NET service bus) on Linux by using Mono.
>
> The thing missing is that MSMQ (the default transport used by
> NServiceBus) isn't available on Linux (duh :-) )
>
> In terms of high availability and transactional support, I think that
> RabbitMQ is the best option for enabling the above.
>
> The thing is if there are certain APIs that should (or should not) be
> used for working with RabbitMQ in "federated mode" - no central broker.
> Also, is there anything special that needs to be done to flow
> transactions from the application level into RabbitMQ and enlisting it
> into distributed transactions?

On the surface of things, I too would expect RabbitMQ to be suitable for
backing a message bus; and, RabbitMQ is already used in Mono for messaging:
http://www.mono-project.com/SystemMessaging

That may be enough for what you want to do -- I don't know the .NET APIs
well enough to judge.

From your description, though, it looks like NServiceBus's design has
assumptions about the transport that would make your intended use difficult.

RabbitMQ is designed to be a central broker, in that its model is fitted
to AMQP, which is a client-broker protocol. Addresses are scoped to the
broker.

The MSMQ model, as I understand it, is that the queues are local
transmission buffers, and addressing is global.

It would take some work to implement the MSMQ model with the AMQP model.
In particular I would guess it can't be accomplished without adapting
the broker (e.g., with a plugin) or at least out-of-band communication
between clients, which would rather undermine the effort.

We are looking into models of federation, so I'll keep this use case in
mind.

With regard to distributed transactions, RabbitMQ doesn't support them.
If I recall, they were mentioned but not specified in AMQP 0-8, and
the mention removed in AMQP 0-9-1. RabbitMQ does support local
transactions, scoped to channels. But again, I think it would require
modification (and probably substantial development) to support
distributed transactions.

Regards,
Michael

-- Udi Dahan

Scott Reynolds

Sep 4, 2010, 2:05:21 AM
to ddd...@googlegroups.com

Thanks Udi. I wouldn't be using the RabbitMQ client; that's the difference, I guess. There's a Qpid C# client.

Udi Dahan

Sep 4, 2010, 2:16:48 AM
to ddd...@googlegroups.com

Then it sounds like Qpid would be a good candidate for an NServiceBus transport - probably won't be able to get it in to the coming 2.5 version, but the 3.0 timeframe definitely looks doable.

Cheers,

Udi Dahan

Sep 4, 2010, 3:17:37 AM
to ddd...@googlegroups.com

As long as the queue that you receive the messages from, and the queue where you're storing the data can be enlisted into the same transaction, then that should be fine - but double-check. Try killing your process mid-way through and see what happens.

Cheers,

Rinat Abdullin

Sep 4, 2010, 3:56:19 AM
to ddd...@googlegroups.com
Udi, that's true if you rely on the classical transactional model.

If the process dies, then the worst-case scenario is that we'll need
to process the message again. For idempotent operations we don't
care; for the rare non-idempotent operations we'll have to handle this
in the context.

If somebody is developing a cost-effective system for the cloud, he'll
have to handle the second scenario anyway (e.g., Azure queues can
deliver a message more than once). So we can safely forget about
transactions here. This will make the code simpler and cheaper, don't you
think?
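
One common way to handle redelivery "in the context" is to remember which message IDs have already been applied. The following is only a minimal in-memory sketch of that idea, not code from the thread: the `Message` and `Account` types and the `message_id` field are hypothetical, and a real system would have to persist the processed IDs atomically together with the state change.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Message:
    message_id: str  # assumption: the sender assigns a unique ID per message
    amount: int


@dataclass
class Account:
    balance: int = 0
    processed_ids: set = field(default_factory=set)

    def handle_deposit(self, msg: Message) -> bool:
        """Apply the deposit once; return False for a duplicate delivery."""
        if msg.message_id in self.processed_ids:
            return False  # already applied, so redelivery is a no-op
        self.balance += msg.amount
        self.processed_ids.add(msg.message_id)
        return True


account = Account()
deposit = Message(message_id="m-1", amount=50)
first = account.handle_deposit(deposit)
second = account.handle_deposit(deposit)  # simulated redelivery by the queue
```

With this guard in place, a deposit that is not naturally idempotent can be delivered twice without changing the balance twice.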

Best regards,
Rinat

Udi Dahan

Sep 4, 2010, 4:33:57 AM
to ddd...@googlegroups.com
Rinat,

In my work with clients all over the world, I've found that idempotence is something they find *very* difficult. Sure, reads are idempotent, but the complex business logic that runs in some of their most important use cases (and the ones that change the most often) is rarely (if ever) idempotent. Also, the data coming into those use cases tends to be the most valuable to the organization.

If they were to follow your advice, removing transactions and rewriting their business logic to make it idempotent, and then expanding their testing to prove that nothing else happens when a message is processed more than once, I don't think they'd view this effort as "cheaper" or "simpler". I do agree, though, that if they *did* actually do this, there would likely be scalability benefits - but it's been my experience that the scalability improvements they get from CQRS when coupled with proper Bounded Context separation are so massive that they don't need anything above that (unless we're talking about Amazon scale here).

Rinat Abdullin

Sep 4, 2010, 5:13:35 AM
to ddd...@googlegroups.com
Udi,

I haven't said a word about rewriting existing code (at least not
until ROI clearly justifies the change)))

I believe (sorry if I got it wrong) we were talking about the
methodologies of *building new solutions* that make the most out of
the technologies and resources at hand (or directions to evolve
existing and legacy solutions towards, should the business need arise
and be justified by returns on investment).

These are the same methodologies that, say, allow a tiny company with
a few developers to provide services for small eCommerce stores and
large retailers with millions of inventory items alike. Obviously the
latter case would require temporary scaling up from a few cloud VMs to
20-40 for a few hours, although the infrastructure costs would still
be about pennies. Keeping projects simple and lean allows us to minimize
development, maintenance and evolution costs as well.

Large established businesses probably don't really care about such
things and pennies anyway. Although theoretically every start-up
company is interested in efficiently building systems that are
cost-effective, flexible and scalable (should the large customer
come). I think that consciously avoiding use of DTC, 2PC and the
entire MSMQ stack might help here. Hence my interest in learning
about the use of AMQP for CQRS-based architectures with DDD/ES.


Best regards,
Rinat Abdullin

Technology Leader at Lokad.com | Writer at Abdullin.com | Contacts

Andrew Cherry

Sep 4, 2010, 5:59:11 AM
to ddd...@googlegroups.com
I've been puzzling over this as well, and coincidentally was planning to spike an AMQP attempt this weekend.

I think it's tricky because I've made the following assumptions (tell me if you think any are invalid)

- Assuming commands are "harder" than events - less likely/possible to be idempotent
- Assuming that ordering is critical (create basket, add item to basket, increase quantity of item makes no sense in any other order on a basket aggregate root)
- The command processor could fail at any time, including between reading a message from a queue and saving the outcome of the command (event store, etc.)
- "Losing" a command in any sense (not processing it, processing it too late) is catastrophic.

Given that, the only safe way I've been able to see of handling commands so far is a transactional read, only completed once the whole operation on the command is complete. If there's a failure, we roll back the transactional read (or let the commit time out) and the message stays on the queue, in the same order it was in. Once we've fixed whatever error was causing the failure, we start processing the queue again.
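
The "only completed once the whole operation is complete" rule can be sketched with a toy queue where a message leaves only when it is acked. This is an illustration under stated assumptions, not a real MSMQ or AMQP client: `InMemoryQueue`, `drain` and `failing_handler` are hypothetical names.

```python
from collections import deque


class InMemoryQueue:
    """Toy stand-in for a broker queue: a message leaves only when acked."""

    def __init__(self, messages):
        self._messages = deque(messages)

    def peek(self):
        return self._messages[0] if self._messages else None

    def ack(self):
        self._messages.popleft()

    def __len__(self):
        return len(self._messages)


def drain(queue, handler):
    """Process messages in order; stop at the first failure without acking."""
    handled = []
    while (msg := queue.peek()) is not None:
        try:
            handler(msg)
        except Exception:
            break  # the message stays at the head, so order is preserved
        queue.ack()
        handled.append(msg)
    return handled


def failing_handler(msg):
    if msg == "increase quantity":
        raise RuntimeError("simulated system failure")


queue = InMemoryQueue(["create basket", "add item", "increase quantity", "checkout"])
done = drain(queue, failing_handler)
remaining = len(queue)
retried = drain(queue, lambda msg: None)  # rerun once the fault is fixed
```

After the simulated failure, the failed command and everything behind it are still queued in their original order, which is the property the single-consumer design relies on.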

I've also had to assume that only one process is reading from each queue - that if there needs to be scaling past that, you partition on aggregate root type and then on aggregate root id if necessary. So we might start off by creating a separate queue and command processor for all commands which relate to aggregate root x, and if we need more scale, partition further on ids of x. Even having a competing consumer model for a single queue could result in commands being handled out of order if one consumer tries to read, fails and returns the message to the queue - the other consumer will have moved past that message (given an MSMQ like model of queue iteration).

It seems as if the RabbitMQ model of "read at least once" makes this much harder. It would probably scale much better, but I haven't found a way of layering back guaranteed ordering on top of it.

Events and handling them seem more likely to be trivially idempotent (although I've come up with cases where they're not), but I find that all of the problems I spot can be seen in just handling commands (where you're using messaging to transmit commands anyway).

It may be possible to get around this in some way by turning off re-delivery in your AMQP implementation I suppose. In that case, about the only thing any processor of commands can do is what I'm already doing in the case of MSMQ - stop processing commands until someone has fixed the issue.

(Note - the case of stopping processing should be a real edge case of course - presumably we've tested our code well, and we wouldn't be stopping just because of a business problem, only a system one - business "exceptions" would be handled by some compensating action).

Andy


Rinat Abdullin

Sep 4, 2010, 6:21:14 AM
to ddd...@googlegroups.com
Andrew,

So it looks like there will be at least 2 learning spikes with AMQP in
CQRS/DDD community this weekend. I'd love to hear your thoughts as you
go through!

Regarding the assumptions:

1-2) I agree, command handlers tend to be the place where
non-idempotent processing is common. I believe that this, depending on
the context, could be compensated for by both the domain logic (making
more handlers idempotent) and by sagas (or "activities" as used by Pat
Helland). The same could apply to the ordering.

3) If the command handler fails between reading the command and saving
the outcome events, then nothing bad should happen: the message is only
removed from the queue after we ACK it.
4) How can you lose a command (I'm assuming that durable messaging is
used for important ones)?


As for the command ordering and competing consumers: to begin with,
we can try to keep it simple and:

1) batch important commands together as IMessage[];
2) have only one consumer per partition.

What do you think?

Best regards,
Rinat Abdullin

Technology Leader at Lokad.com | Writer at Abdullin.com | Contacts

Udi Dahan

Sep 4, 2010, 6:53:21 AM
to ddd...@googlegroups.com
In terms of scaling out the number of command processing nodes, NServiceBus introduced a message-based load balancer (called the distributor) but it, as well, does not ensure ordering. The strongest recommendation I have for a solution is to use sagas (long-running processes) that introduce time into the business logic thus ensuring that things happen when they need to happen from a business perspective, as opposed to when a command came in (as these two times can be quite different).

Kind regards,

Rinat Abdullin

Sep 4, 2010, 7:10:24 AM
to ddd...@googlegroups.com
Udi, thank you!

I believe this is in essence similar to what you've written in your
latest article (considering time and race problems from the business
perspective), isn't it?

I've been thinking about using activities (as described by Pat Helland
in his "Life Beyond Distributed Transactions: An Apostate's Opinion")
to consciously handle cases where message order and duplication matter
(along with cases where uncertainty in remote AR or process does not
collapse to a good state, or does not collapse at all, timing out).

How closely, in your opinion, do "sagas" relate to "activities"? Both
seem to be long-running, aware of the time flow and bound to the AR,
managing its relations with the outside world, uncertainty and
time.

Best regards,
Rinat

Andrew Cherry

Sep 4, 2010, 10:55:00 AM
to ddd...@googlegroups.com
Rinat,

Yes, I pretty much agree with your replies. On 1-2, it would be good to have idempotence, but I don't think we can assume it's possible for all domains. I'm going to look more into activities though. On 3: yes, hopefully, but if the queueing mechanism resends a message out of order after an ack has not been received, now we have a problem. I believe RabbitMQ does this: ordering is no longer guaranteed after a returned message has been resent. On 4: that's my bad writing :) I don't think we can commonly lose a message, but we could lose it from the correct order, which could be just as bad!

In terms of consumers - batching messages I think would be nice, but I think there are definitely going to be situations which require ordering, but the time between commands would preclude batching. I think that the second option, one consumer per partition, is much more likely to be practical.

In an AMQP sense (my spike is only little right now :) it seems like this can be quite nicely achieved using topic exchanges. If we had a standard for routing keys for commands (as an example) like this...

commands.<aggregate_root_type>.<aggregate_id_partition>

it makes it quite easy for our command handlers to subscribe themselves well. So at the initial scale, where we just have a single consumer for all commands which are for the "basket" aggregate root type, the processor would declare a queue bound to the routing key commands.basket.#, and will therefore only receive all commands for the basket. Within that queue, we can assume ordering (except when there's a failure as we've mentioned!)

If we wanted to scale to more than one consumer, as long as we have a deterministic function for mapping an aggregate id to a partition key (silly example - first char of the guid id!) we could then use routing keys like commands.basket.a, to get all the commands for baskets where the aggregate root id begins with an a. As that's a bounded context, we should still be fine, and can scale that indefinitely.
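
The routing-key scheme and the binding patterns above can be tried out without a broker. The sketch below assumes the "first char of the guid" partition function from the silly example and re-implements AMQP topic matching in plain Python ('*' matches exactly one word, '#' matches zero or more); `routing_key` and `topic_matches` are hypothetical helper names, not part of any client library.

```python
def routing_key(aggregate_type: str, aggregate_id: str) -> str:
    """Build commands.<type>.<partition> from the first character of the ID."""
    partition = aggregate_id[0].lower()
    return f"commands.{aggregate_type}.{partition}"


def topic_matches(pattern: str, key: str) -> bool:
    """AMQP-style topic match: '*' is exactly one word, '#' is zero or more."""

    def match(p, k):
        if not p:
            return not k
        head, rest = p[0], p[1:]
        if head == "#":
            # '#' either absorbs nothing, or absorbs one word and stays active
            return match(rest, k) or (bool(k) and match(p, k[1:]))
        if not k:
            return False
        return (head == "*" or head == k[0]) and match(rest, k[1:])

    return match(pattern.split("."), key.split("."))


key = routing_key("basket", "a3f1c2d4-0000-4000-8000-000000000000")
single_consumer = topic_matches("commands.basket.#", key)   # initial scale
partition_a = topic_matches("commands.basket.a", key)       # scaled out
partition_b = topic_matches("commands.basket.b", key)
```

Because the partition is a pure function of the aggregate ID, every command for a given basket lands in the same queue no matter how many partitions are bound.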

This hasn't solved the transaction/ordering problem, I'm about to spike some time out/failure cases, but it does show how simple it could be to use some default functionality of AMQP to maybe simplify the scaling of CQRS systems.

Andy



Rinat Abdullin

Sep 4, 2010, 11:47:25 AM
to ddd...@googlegroups.com
Andrew,

From looking at the AMQP specs (Section 4.7 of 0-9-1) it seems that
the ordering is preserved only as long as nothing is requeued (and a
message is requeued when, e.g., the handler dies before the Ack). So,
true, we can't rely on the order being preserved 100% and have to
handle this explicitly.

Now, for the shopping basket composition, what if the domain was smart
enough to memorize all incoming commands, processing them in order
(e.g. by a timestamp or sequence ID generated by the sender)?

With a clearer separation of logic, this is delegated to a saga/activity
(ordering and idempotence are its responsibility), which will
assemble the relevant events and execute them against the AR. If the business
process times out (e.g. after 60 mins) before getting all events in order,
then we've got ourselves a rare exception to handle (perhaps offer the customer
a discount and kindly ask them to retry).
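
A rough sketch of the "memorize and reorder" part, assuming the sender stamps each command with a gap-free sequence number starting at 1. The `Resequencer` class is hypothetical, keeps its buffer in memory only, and leaves out the timeout handling described above.

```python
class Resequencer:
    """Buffers out-of-order commands and releases them in sequence order."""

    def __init__(self, first_seq: int = 1):
        self._next_seq = first_seq
        self._pending = {}

    def accept(self, seq: int, command: str) -> list:
        """Store a command; return every command that is now ready to run."""
        if seq < self._next_seq or seq in self._pending:
            return []  # duplicate delivery: already released or already buffered
        self._pending[seq] = command
        ready = []
        while self._next_seq in self._pending:
            ready.append(self._pending.pop(self._next_seq))
            self._next_seq += 1
        return ready


r = Resequencer()
out1 = r.accept(2, "add item")           # arrives early, held back
out2 = r.accept(1, "create basket")      # unblocks both commands
out3 = r.accept(2, "add item")           # redelivered duplicate, ignored
out4 = r.accept(3, "increase quantity")
```

The same buffer gives both ordering and duplicate suppression, which is why it fits naturally inside a saga/activity that owns those two responsibilities.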

As for the almost-infinite scalability - I agree. It seems that AMQP
can handle initial partitioning (and repartitioning) with topic
exchanges. This should work for small and medium scenarios (millions
of ARs and above). And if the company gets too large - then it can
probably afford a dedicated team to build something more robust.
They'll need to handle scenarios where a message needs to chase its own
partition.

All the best,
Rinat

Andrew Cherry

Sep 4, 2010, 12:02:21 PM
to ddd...@googlegroups.com
Rinat,

I've just had a quick play around with killing things before sending message Acks. If we only have one consumer per queue (and with the nice partitioning we can do with topic exchanges, I think that could be assumed, as we can scale in better ways with AMQP), then if that consumer dies, it will pick up in the same place it left off. That message won't be requeued, it'll just sit there waiting to be processed and Ack'd. As the only thing that will do that is our dead consumer, that doesn't seem to be a problem (this assumes you've got durable exchange+queue+message set, which seems probable). RabbitMQ at least doesn't seem to do any expiration, so automatic resending by the exchange doesn't seem to happen (not in my experiment so far).

Memorizing all the commands and ordering them might be possible, but perhaps quite hard - that would have a set of durability and "transactional" requirements around the memorization. Definitely possible, but maybe not quite worth it yet.

In terms of partitioning, AMQP is great in that the clients are able to repartition - MSMQ really doesn't work well for this, but dynamic auto-scaling could be done with AMQP and some sensible logic around promoting and removing new queues.

One thing I would say in terms of the saga model - most of these commands in my case are quite atomic - as long as they happen in order, we can somewhat ignore the longer case. One problem I would have with, say, a 60 minute time out, is that presumably we won't have published the events off the back of the commands until, maybe, 59 minutes! Which means our view models updated by events will be very stale. Obviously this won't always be a problem, but if it is a model where a customer expects to see an item in their shopping cart within a couple of seconds, waiting an hour might be a problem :)

I have slightly rethought my view on the topic hierarchy as well

commands.<aggregate root type>.<aggregate root id> requires the publisher of the command to know the aggregate root type - which is perhaps not desirable. We can get the same level of scalability by partitioning purely on the id, although each consumer must now handle all possible commands.

It is less neat perhaps, but also less brittle maybe? What do you think? Logically it would seem nice to be able to see the partitioning of commands by order/basket/customer/account, etc. But does it seem like there is too much coupling between the domain and the commands then? Of course, when sending an "AddToBasket" command, you do kind of know that the AR is a basket - we just don't really express it in code maybe...

Andy

(Sorry for the long reply!)



Greg Young

Sep 4, 2010, 1:30:06 PM
to ddd...@googlegroups.com
hmm wouldn't it make a whole lot of sense to make the event handlers on the read side idempotent instead? I sure hope your clients around the world do not have complex business logic in these.
--
The grammar and syntax errors were included to make sure I have your attention

Rinat Abdullin

Sep 4, 2010, 8:47:29 PM
to ddd...@googlegroups.com
Hi Andrew,

Thank you for the detailed reply!

I got inspired by your progress and just finished a first run of my
spike. So far it looks really nice and relatively simple.

For the AMQP bindings I've been using this pattern:

exchanges:
* events.exchange (topic)
* commands.exchange (topic)

Commands and events are published to these exchanges with the
aggregate Id as the topic.

queues:
* events-partition-x
* commands-partition-x

Right now, for the sake of simplicity, there is only a single storage
and processing partition (the binding is with "#"). Obviously, this could
be tweaked as needed by adding more fine-grained partitioning based on
the ARId. Although repartitioning storage and processing without
bringing the solution down should be fun.

Probably, if fine-grained control is needed for the messages, we could
publish them with "MessageContractNamespaceAndName.AggregateId" and
then play with the wild-card bindings (* and #), but I'd suspect this
might be overkill and an unnecessary increase in complexity.

Each queue gets its own processor with a single thread, for
simplicity's sake. The processor instance just runs a loop inside a .NET TPL
task and owns its IModel to avoid threading issues. I haven't tried adding
sagas/activities into the mix yet.

When persisting ARs, to keep everything simple, I write events into
the store and publish them into the queue. A proper implementation would
just write events and let the dispatcher catch up later.

Since commands are processed only by a single processor per partition,
there should be no concurrency issues at the event store level. Though
just for the fun of it I've used ETags to detect potential concurrency
conflicts (normally retrying the command a few moments later should fix
the issue). The same approach is used for the views, which are merely objects
serialized into blobs, with ETags used to detect concurrency problems
(if somebody somehow manages to update a view in the
millisecond between our read and write).
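
The ETag check can be illustrated with a toy blob store whose writes succeed only if the caller still holds the latest ETag. This is an in-memory sketch under stated assumptions, not a real cloud storage client: `BlobStore`, `ConcurrencyError` and the version-counter ETags are all hypothetical.

```python
import itertools


class ConcurrencyError(Exception):
    """Raised when the blob changed between our read and our write."""


class BlobStore:
    """Toy blob store with ETag-checked writes (not a real storage client)."""

    def __init__(self):
        self._blobs = {}
        self._versions = itertools.count(1)

    def read(self, key):
        """Return (data, etag), or (None, None) if the blob does not exist."""
        return self._blobs.get(key, (None, None))

    def write(self, key, data: bytes, expected_etag):
        """Write only if the stored ETag still equals expected_etag."""
        _, current_etag = self.read(key)
        if current_etag != expected_etag:
            raise ConcurrencyError(f"{key} changed since it was read")
        new_etag = f"v{next(self._versions)}"
        self._blobs[key] = (data, new_etag)
        return new_etag


store = BlobStore()
store.write("view/basket-1", b"items=1", expected_etag=None)  # first write
data, etag = store.read("view/basket-1")
store.write("view/basket-1", b"items=2", expected_etag=etag)  # ETag still current

try:
    store.write("view/basket-1", b"items=3", expected_etag=etag)  # stale ETag
    conflict = False
except ConcurrencyError:
    conflict = True  # the caller would re-read and retry the command
```

On a conflict nothing is overwritten, so retrying the command a moment later against the fresh state is safe.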

So far there is not a single use of transactions or relational data
storage. Yet the solution feels really scalable, simple and
logically reliable (aside from the double dispatch of events within
the command handler, which I didn't want to fix in the spike).

Best regards,
Rinat Abdullin

Technology Leader at Lokad.com | Writer at Abdullin.com | Contacts

Andrew Cherry

Sep 5, 2010, 7:23:30 AM
to ddd...@googlegroups.com
Hi Rinat,

That sounds like you've ended up in quite a similar place to me! I've ended up with two topic exchanges too, and currently have a single processor for each while expecting to partition on the AR id. As you say, as we've now got a single queue per processor (and I've enforced this by declaring the queues as exclusive, so the system will throw errors if you ever have more than one processor per queue) you lose a whole load of concurrency problems. You could almost stop caring about concurrency in event storage. It feels good.

Like you say, scaling it automatically is interesting - I'm going to have a play in that area this afternoon. So far my plan is something like...

Having a fanout exchange called something like system.exchange - every owner of processors and producers of events listens to that. When a new entity joins the system it produces a request to join on that channel. Some receiver picks that up and broadcasts a new schema for partitioning, which all entities then respond to and implement. Something like creating new empty queues with new partitioning first, then starting to listen to them in parallel, then the producers switching to them, then removing the old queues. There's some awkward little bits in there, but this little spike feels more like a play around than essential!

I'm glad we've ended up with similar thoughts - it's reassuring!

:)

Andy



Rinat Abdullin

Sep 5, 2010, 8:17:02 AM
to ddd...@googlegroups.com
Hi Andrew,

That's a nice touch about declaring queues as exclusive. It should
simplify things a lot. Thanks for the idea!

As for the partitioning, I'll probably try to figure out the
sagas/activities first (since this actually affects how command
failures are handled and moved to the poison queue).

However, my initial thoughts would be to keep repartitioning as
something external to the system, instead of continuous
repartitioning. So basically if we see that there is too much load for
some partitions (as measured by the increased latency in command
processing), an external operation is invoked manually (or
semi-automatically). Then it will basically go through the logic that
you've outlined. Although making messages chase their new partitions
and trying to preserve order in the process is something that will
definitely need some additional thought.

* create new queues
  * partition.x.inbox
  * partition.x.pending
* stop processors running on old partition
* somehow atomically move messages from the old queues to the new ones
  (essentially chasing partitions) AND update AMQP bindings
* relocate persistence, if needed
* start processors on the new inboxes

One option is to try following Helland and making message order
and idempotency the responsibility of the partition, rather than
the infrastructure. This should make the "somehow atomically move..."
step of the repartitioning simpler. But maybe the industry has already
come up with some simple and reliable solution to this step.

BTW, you've got an interesting idea about using fanout for the system
communications. I was initially thinking of using XMPP (since it also
provides presence to keep track of who's alive), but reusing
AMQP might be simpler (fewer technologies) and more reliable anyway
(a restarted partition will reliably pick up missed messages). This
will definitely be worth giving some more thought.

Thanks for sharing experience. This definitely makes spiking more
efficient and fun)))


All the best,
Rinat

Scott Reynolds

Sep 5, 2010, 8:40:24 AM
to ddd...@googlegroups.com

One of the cool things I found in QPID (and I presume it's generic to most message buses) is the set of features you've listed:

- Queues can be exclusive. This is awesome from a DR / COB perspective. When one processor dies another can just start listening.
- Queues can be 'fanout' (in AMQP terms, bound to a fanout exchange) so that all listeners get guaranteed delivery. A CQRS scenario for this is distributed read-models. I'm looking to use fanout for regional read-models and external integration... which may even use the same events, but that's undecided.
- Standard queue behavior where any one subscriber of a queue gets notified. This is quite good for generic load balancing of command processing / event processing.

When you talk about partitions ('partition.x.inbox') I presume you're talking about splitting your aggregate roots rather than just load balancing (which can be resolved by the third option above).

Scott

Andrew Cherry

Sep 5, 2010, 12:43:27 PM
to ddd...@googlegroups.com
Yeah, those properties are pretty useful. In terms of partitioning, the way I'm looking at it is that the only always valid partition point is aggregate root identity. Any kind of "standard" load balancing runs the risk of ordering being lost. And it's fairly simple with AMQP to partition in this safe way. I guess in a load balancing analogy with actual hardware balancing of web requests, say, we'd be talking about balancing with affinity - a request from a client (or about an AR) always goes to the same place to be handled. As we've got a good randomisation pattern in our AR keys if we're using Guids, this works quite nicely.

Was thinking a bit more over lunch (my poor girlfriend) - I'm experimenting with "pre-scaling". So we create 16 command processors for example, attach them to 16 queues. They'll each get 1/16th of the commands, in a safe way, if the queues are partitioned by AR identity. To start with, all 16 processors run on one machine - isolated, perhaps a thread per processor, that kind of thing. If we want to scale out, we just move some of those processors on to another machine - 8/8, 5/5/6, etc. as we add machines. We have one machine keeping track of this maybe using a system exchange (machines message when they start/finish processing a queue or error).
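A minimal sketch of the pre-scaling idea, written in Python rather than .NET to keep it short. It assumes AR ids are GUIDs and reuses the partition.x.inbox queue naming from earlier in the thread; `partition_for`, `queue_name` and `assign_partitions` are hypothetical helper names, not part of any AMQP client.

```python
import uuid

PARTITION_COUNT = 16  # fixed up front, as in the 16-queue example


def partition_for(aggregate_id: uuid.UUID, partitions: int = PARTITION_COUNT) -> int:
    """Map an aggregate root id to a partition.

    The same id always lands on the same partition, so commands for one
    aggregate root keep their order.
    """
    return aggregate_id.int % partitions


def queue_name(aggregate_id: uuid.UUID) -> str:
    """Name of the inbox queue that receives commands for this aggregate root."""
    return f"partition.{partition_for(aggregate_id)}.inbox"


def assign_partitions(machines: list[str], partitions: int = PARTITION_COUNT) -> dict[str, list[int]]:
    """Spread the fixed set of partitions over the available machines, round-robin."""
    assignment: dict[str, list[int]] = {machine: [] for machine in machines}
    for partition in range(partitions):
        assignment[machines[partition % len(machines)]].append(partition)
    return assignment
```

With three machines, `assign_partitions(["m1", "m2", "m3"])` gives a 6/5/5 split. Adding a machine only changes which machine runs a processor; the mapping from aggregate root to queue stays the same, which is what keeps ordering safe.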

If a machine dies, we can send a message to another machine saying "start some new processors for this queue". When a machine is added, it can register and be told which queues to process. It then listens for the process-end messages sent by the machine(s) currently processing those queues, once they have been told to stop. Now we've scaled out. And scaling back in can be quite simple too.

Not that you'd want that fully automated from the start, but even a simple attempt at that could give you some semi-decent failover and recovery. And of course, 16 is just an example. Changing the basic number of partitions would be harder, but presumably you'd be doing that very rarely.

I'm currently trying it with 2 command producers, 16 processors and one DB, all running happily on my desktop box.

Andy

P.S. Erlang already does quite a lot of this, if we think of processors as very cheap, and likely to fail, this actually gets kind of easier!



Scott Reynolds

Sep 5, 2010, 1:08:38 PM
to ddd...@googlegroups.com

On 5 Sep 2010, at 17:43, Andrew Cherry wrote:

> (my poor girlfriend)

You have a girl friend!!!!

Scott Reynolds

Sep 5, 2010, 1:34:44 PM
to ddd...@googlegroups.com
>So we create 16 command processors for example, attach them to 16 queues.

One of the interesting issues I see is that there's some 'code / config change' needed to start distributing the load, when it should be a case of starting up another process. I mean, if you wanted to go to 64 queues then the system becomes more complex (etc. etc.). Just don't lose the value of messaging: that the producer isn't tied to the consumer. It would be interesting to see if there's a better way to handle this concept, although off the top of my head I can't see it. I mean, your example would allow us to have 16 processors pointing at 16 different event store DBs (or more).

I'm not sure this is a real problem.... but I'll have a think.


Andrew Cherry

Sep 5, 2010, 1:53:00 PM
to ddd...@googlegroups.com
Well, there's some config, yes, but once done, you'd go from 1 to 16 machines without having to change it. If you started with 64 queues, then you'd go from 1 to 64, etc., but I'm not sure that makes sense - how fast are we expecting to need to scale anyway? Of course, if you wanted it to be absolutely automated, in a "just drop in another machine" way, we could do that by dynamically scaling the number of queues, but then changing the number of queues is probably the hardest part: how to do that, with the system online, without risking any loss of data or ordering. It's do-able, but my 1-16 strategy is kind of designed to get someone from small to medium scale easily. Once we're at the point where 16 command processors can't cope (and if each of those is a fairly beefy server, that should be a lot of traffic/usage), I'm assuming we have enough money to try some trickier engineering :)

Andy



Udi Dahan

Sep 6, 2010, 3:02:11 AM
to ddd...@googlegroups.com

Greg,

 

While those event handlers could be made idempotent, the fact is that that is the place where there is the least concern around performance, so having transactions there keeps things very simple. When performance does become an issue, moving to an insert-only model and modifying the queries is often the first (and only) step. Yes, from there it would likely be easier to move to idempotent handling, but (as you've already mentioned) that's not the highest business-value-adding code in the system so it's unlikely that it'll receive very much love and care once it works.

Rinat Abdullin

Sep 7, 2010, 12:18:51 PM
to ddd...@googlegroups.com
Andrew,

Yet another nice idea: pre-partitioning the entire entity set into a
certain number of logical processes and then just bouncing them
between the physical machines. That's something to definitely keep in
mind. Actually a ring-based architecture (i.e. as in consistent
hashing, where servers can be added or removed on the fly) might
also simplify the process of dynamically moving these partitions
between the machines without adding too much complexity.
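To make the ring idea concrete, here is a small consistent-hashing sketch in Python. It is an illustration under assumptions, not something from the prototype: `HashRing`, the SHA-256 based placement and the 64 virtual points per machine are all choices made up for the example.

```python
import bisect
import hashlib


def _point(key: str) -> int:
    """Place a key on the ring using the first 8 bytes of its SHA-256 digest."""
    return int.from_bytes(hashlib.sha256(key.encode("utf-8")).digest()[:8], "big")


class HashRing:
    """Maps logical partitions to machines; machines can join or leave on the fly."""

    def __init__(self, replicas: int = 64):
        self._replicas = replicas          # virtual points per machine (assumed value)
        self._points: list[int] = []       # sorted ring positions
        self._owners: dict[int, str] = {}  # ring position -> machine

    def add(self, machine: str) -> None:
        for i in range(self._replicas):
            point = _point(f"{machine}#{i}")
            self._owners[point] = machine
            bisect.insort(self._points, point)

    def remove(self, machine: str) -> None:
        self._points = [p for p in self._points if self._owners[p] != machine]
        self._owners = {p: m for p, m in self._owners.items() if m != machine}

    def owner(self, partition: str) -> str:
        """Return the machine owning the first ring position after the partition's hash."""
        if not self._points:
            raise LookupError("ring is empty")
        index = bisect.bisect_right(self._points, _point(partition)) % len(self._points)
        return self._owners[self._points[index]]
```

The useful property is that adding a machine only takes partitions away from existing owners; partitions never shuffle between the machines that were already there, so only the moved partitions need the stop/relocate/start treatment.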

BTW, once AMQP turned out to be working without any problems, I
just swapped the entire RabbitMQ setup for ConcurrentQueue{T} from .NET
4.0. This was a trivial thing to do (and it's easy to go back) and
made the prototyping spike much simpler to evolve, instantly revealing
a few options to further simplify and solidify the architecture.


Best regards,
Rinat Abdullin


Rinat Abdullin

Sep 7, 2010, 12:25:57 PM
to ddd...@googlegroups.com
Udi, isn't it simpler and cheaper to keep event handlers idempotent
and transaction-less than to do otherwise?

Best regards,
Rinat

Udi Dahan

Sep 8, 2010, 1:10:32 AM
to ddd...@googlegroups.com
Rinat,

It may be cheaper at run-time, but it is likely more costly to develop it this way.

Rinat Abdullin

Sep 8, 2010, 2:51:06 AM
to ddd...@googlegroups.com
Udi,

How expensive could this be to code?

// Sets a single field on the existing view.
public void Consume(AdditionalUserActivatedEvent message)
{
    _operations.Update<UserView>(message.UserId,
        v => v.LoginIdentity = message.LoginIdentity);
}

or

// Writes a fresh view for the newly created user.
public void Consume(AdditionalUserCreatedEvent message)
{
    _operations.Write(new UserView
    {
        AccountId = message.AccountId,
        UserId = message.UserId,
        Username = message.Username,
        Email = message.Email,
        LoginIdentity = null
    });
}

Actually, the costs of the SQL approach (time and immense development
friction, should you need to change something) are the reason I'm taking the
effort to migrate from SQL views to cloud-hosted binaries. I still get
atomic updates, but on top of that: unlimited scalability, built-in
read-optimization and opportunities for CDN and replication, REST
access and, finally, cheaper hosting.

Best regards,
Rinat



Nuno Lopes

Sep 12, 2010, 9:38:24 AM
to ddd...@googlegroups.com
Generic way to enforce idempotent Command Handlers in CQRS with "Event Sourcing".

Maybe I'm seeing all this wrong, but I think that by using event sourcing and event stores mixed with unique constraints we can build a generic approach to implementing idempotent Command handlers.

Here is what we need to do:

Only one Aggregate method is called per command.

We partition event streams by Aggregate and store them in a dedicated event store.

Every Command has an ID. We need to make sure that ID is unique across all messages in the event store (we can use a GUID strategy for this).

We put the Command ID along with the Entity ID in the resulting events.

We enforce a unique constraint on (Command ID, Entity ID, Entity Type) in the event stream.

We guarantee that saves to the event store are ATOMIC.

Test: Say we have 2 Command handlers that get the same command to process. The Aggregate produces several events that will be stored in the event store. The unique constraint above makes sure that if two events arrive with the same signature (Command ID, Entity ID, Entity Type), only one will be written. The second will succeed by default, and so will the command.

The problem here is that if an email is sent after successful completion of a Command, this leads to duplicate emails. So we need a way to differentiate between a Command succeeding because one of the resulting events is already stored, and a Command succeeding because the resulting events were stored. The first could raise an exception that needs to be caught by the handler, which then does nothing more (say, does not send the email). The exception could be "CommandAlreadyProcessed".

Furthermore, the unique constraint could be enforced only on the "latest" events.
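A sketch of how this could look, using Python and an in-memory SQLite database as a stand-in event store. The table layout and the names `open_store`, `append_events` and `handle` are assumptions for illustration. One deliberate deviation, also an assumption: because a single command may produce several events with the same signature, the unique constraint is placed on one row per command (a "commit") that is written in the same transaction as its events.

```python
import sqlite3


class CommandAlreadyProcessed(Exception):
    """Raised when events for this command signature are already stored."""


def open_store() -> sqlite3.Connection:
    """Create a throwaway in-memory event store (hypothetical schema)."""
    db = sqlite3.connect(":memory:")
    db.execute(
        "CREATE TABLE commits ("
        " command_id TEXT NOT NULL, entity_id TEXT NOT NULL, entity_type TEXT NOT NULL,"
        " UNIQUE (command_id, entity_id, entity_type))"
    )
    db.execute(
        "CREATE TABLE events ("
        " command_id TEXT NOT NULL, entity_id TEXT NOT NULL,"
        " seq INTEGER NOT NULL, body TEXT NOT NULL)"
    )
    return db


def append_events(db, command_id, entity_id, entity_type, events):
    """Store all events of one command atomically, or nothing at all."""
    try:
        with db:  # one transaction: committed on success, rolled back on error
            db.execute(
                "INSERT INTO commits VALUES (?, ?, ?)",
                (command_id, entity_id, entity_type),
            )
            db.executemany(
                "INSERT INTO events VALUES (?, ?, ?, ?)",
                [(command_id, entity_id, seq, body) for seq, body in enumerate(events)],
            )
    except sqlite3.IntegrityError as exc:
        # The unique constraint fired: this command was handled before.
        raise CommandAlreadyProcessed(command_id) from exc


def handle(db, command_id, entity_id, entity_type, events, send_email):
    """Return True if the command was processed now, False if it was a duplicate."""
    try:
        append_events(db, command_id, entity_id, entity_type, events)
    except CommandAlreadyProcessed:
        return False  # still a success for the caller, but no side effects
    send_email(command_id)
    return True
```

Handling the same command twice stores its events once and calls `send_email` once; the second call returns False without raising, which matches "the second will succeed by default".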

Catch my drift? Does this help?

Nuno

Nuno Lopes

Sep 12, 2010, 10:04:04 AM
to ddd...@googlegroups.com
Here is another way without event sourcing.

We sign the Events the same way as I described in my previous email.

Instead of storing all events in the event store, we store only the latest published events. We enforce unique constraints on the event output decks (rolling decks).

We make sure that publishing events and persisting the aggregate are done within the same transaction (ACID).

The drawback of this approach is that we need 2PC if the store used to persist the entities and the store used to persist the published events are on different databases.

I think Udi, using NServiceBus, needs 2PC anyway to ensure that the Aggregate is persisted only if the resulting events are published (ACID).

Nuno

Nuno Lopes

Sep 12, 2010, 10:16:14 AM
to ddd...@googlegroups.com
By the way, the "intelligence" of the algorithm described is basically in how we sign the events and use them along with unique constraints to guarantee that the outcome of the command is the same once it has executed successfully, whether we execute it one or n times.

If the command fails to execute (say, due to some business rule or the current aggregate state), the second time we execute it, it may succeed.

I think idempotency is a characteristic of successful operations, not failed ones. Failed operations can always succeed at a later time.

Nuno


Udi Dahan

Sep 15, 2010, 2:29:40 PM
to ddd...@googlegroups.com
Nuno,

> Only one Aggregate method is called per command.

Yes! I've been beating this drum for a while now.

Kind regards,


Rinat Abdullin

Sep 15, 2010, 2:40:14 PM
to ddd...@googlegroups.com
BTW, I've found myself coding AR's methods as "void Do(SomeCommand cmd)"

Not a big change, yet it enforces command==AR.method and
completely gets rid of command handlers (which are boring to write
anyway).
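For readers outside .NET, a rough Python analogue of this style, where the aggregate exposes one `do` method per command type and the infrastructure just calls it. The command and aggregate names (`RenameUser`, `ActivateUser`, `UserAggregate`) are invented for the example; dispatch on the command type is done with the standard library's `functools.singledispatchmethod`.

```python
from dataclasses import dataclass
from functools import singledispatchmethod


@dataclass(frozen=True)
class RenameUser:        # hypothetical command
    username: str


@dataclass(frozen=True)
class ActivateUser:      # hypothetical command
    login_identity: str


class UserAggregate:
    """Aggregate root whose methods take commands directly; no separate handlers."""

    def __init__(self) -> None:
        self.changes: list[tuple[str, str]] = []  # events produced so far

    @singledispatchmethod
    def do(self, command) -> None:
        # Fallback for command types the aggregate does not understand.
        raise TypeError(f"no method for {type(command).__name__}")

    @do.register
    def _(self, command: RenameUser) -> None:
        self.changes.append(("UserRenamed", command.username))

    @do.register
    def _(self, command: ActivateUser) -> None:
        self.changes.append(("UserActivated", command.login_identity))
```

Calling `user.do(RenameUser("rinat"))` picks the method by command type, so exactly one aggregate method runs per command.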

Best regards,
Rinat

Jason Diamond

Sep 15, 2010, 2:48:21 PM
to ddd...@googlegroups.com
I'm happy to hear this. The examples I've seen of command handlers just "unpacking" a command and invoking a method seemed silly...

I think this assumes that all commands are addressed to a specific entity so that your service layer infrastructure can retrieve that entity by examining the command, correct? (As stated in Helland's paper...)

-- 
Jason

Rinat Abdullin

Sep 15, 2010, 3:27:41 PM
to ddd...@googlegroups.com
Jason, precisely. Messaging infrastructure is aware of the entity id
(so it can route command to the partition where it resides). And
processes running within the partition know enough about the command
to match it with the entity (based on the contract name from the
header, which is also used to deserialize message).

Just another idea that works well in the current prototype and makes
everything simpler: actual commands (that are passed to the AR) do
not include the entity ID. The AR does not care about its own ID (why should
it?). And when we publish resulting events to the wild, it is the
responsibility of the infrastructure to attach the aggregate ID, version,
date and other metadata.

This is a simple thing that is implemented in a single place but makes writing
commands and events less boring and repetitive (no IDs, versions,
change dates etc.). Yet this information is associated with every event
and is available whenever needed. For example, event handlers
subscribe to the changes ("void Handle(Change<SomeEvent> myEvent)"),
where the change includes the actual strongly-typed event and all the
versioning and sender information associated with it.
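A small Python sketch of such an envelope, to show the shape of the idea. The field names, the `wrap` helper and the `UserRenamed` event are assumptions made for the example; only the `Change<SomeEvent>` concept comes from the prototype.

```python
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Generic, TypeVar

E = TypeVar("E")


@dataclass(frozen=True)
class Change(Generic[E]):
    """Envelope added by the infrastructure around a plain event."""
    aggregate_id: uuid.UUID
    version: int
    changed_at: datetime
    event: E


@dataclass(frozen=True)
class UserRenamed:  # hypothetical event: carries no id, version or date
    username: str


def wrap(aggregate_id: uuid.UUID, last_version: int, events: list) -> list:
    """Attach id, consecutive versions and a timestamp to events produced by an AR."""
    now = datetime.now(timezone.utc)
    return [
        Change(aggregate_id, last_version + offset, now, event)
        for offset, event in enumerate(events, start=1)
    ]
```

An event handler then receives `Change[UserRenamed]` and can read `change.version` or `change.aggregate_id` when it needs them, while the aggregate itself only ever deals with `UserRenamed`.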

Best regards,
Rinat
