Global ordering in an EventStore


Sebastian Stehle

Jul 13, 2017, 2:13:24 AM
to DDD/CQRS
Hi,

I am writing a CQRS application with Event Sourcing. Unfortunately I have to use MongoDB (Replica Set) or Azure CosmosDB / DocumentDB as a store for my events. I have some experience with GetEventStore and I really like it.

I ran into some problems under high load. We use a sequence number generator to create a unique global sequence number. This can be implemented either as a local counter (single-machine environment) or with Redis or MongoDB (findAndModify). Let's say I write two events:

Event10 gets SequenceNumber 10
Event11 gets SequenceNumber 11

But they are not written to MongoDB in that order. In the meantime an event handler asks for all events since SequenceNumber 5 and gets 6, 7, 8, 9 and 11 (because 10 has not been written to the database yet). It saves position 11 as the last event it has received and asks for all events since 11 the next time. So Event10 will never be handled.
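For reference, the MongoDB variant of such a generator could look roughly like this (a minimal pymongo sketch; the collection and field names are made up):

```python
from pymongo import MongoClient, ReturnDocument

counters = MongoClient()["eventstore"]["counters"]  # hypothetical database/collection

def next_sequence_number(counter_name="events"):
    # findAndModify: atomically increment the counter and return the new value
    doc = counters.find_one_and_update(
        {"_id": counter_name},
        {"$inc": {"value": 1}},
        upsert=True,
        return_document=ReturnDocument.AFTER,
    )
    return doc["value"]
```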

I only have these ideas so far:

1. Just check whether the event numbers are increasing when we retrieve them. Of course there could be holes in the streams when an event was not written successfully, but the situation I described can only happen for new events. If we have a timeout of 3 seconds for an event to be written, we could simply wait that long when we see a hole (see the sketch after this list). I also need to filter the events by stream, e.g. all events for products or bookings (something like ^(products\-)|(bookings\-)). I would probably have to do the filtering in the client then, which is of course not optimal. MongoDB has a special datatype called BsonTimestamp, which is a timestamp plus counter and produces unique values. It works pretty well (as long as the clocks are synchronized), but not together with CosmosDB.

2. Always ask for something like "all events since X or within the last 10 seconds" and maintain a list of handled events in the client. Probably a bad idea under high load.
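Here is a rough sketch of the gap check from idea 1 (illustrative only; load_events_since, the event shape and the poll delay are assumptions, not real code I have):

```python
import time

GAP_TIMEOUT = 3.0  # seconds to wait for a missing sequence number before skipping it
POLL_DELAY = 0.5

def run_subscription(last_position, load_events_since, handle):
    """Poll for events and only advance past a hole after GAP_TIMEOUT has elapsed."""
    gap_seen_at = None  # when the current hole was first observed
    while True:
        for event in load_events_since(last_position):  # assumed sorted by sequence number
            gap_timed_out = gap_seen_at is not None and time.time() - gap_seen_at >= GAP_TIMEOUT
            if event.sequence == last_position + 1 or gap_timed_out:
                # Either the next expected event, or the hole is old enough to treat as permanent.
                handle(event)
                last_position = event.sequence
                gap_seen_at = None
            else:
                # Hole detected: the missing event may still be in flight, so stop and retry later.
                if gap_seen_at is None:
                    gap_seen_at = time.time()
                break
        time.sleep(POLL_DELAY)
```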

Of course I could just use an event bus in addition to the persistent storage, but this leads to other problems, e.g. having to write to the bus and the persistent storage in a single transaction.

How do you handle this problem?

Sebastian 

João Bragança

Jul 13, 2017, 3:59:18 AM
to ddd...@googlegroups.com
Why are you forced to use databases not suitable for event sourcing to do event sourcing? If you can convince your employers to use SQL Server, something you could take a look at is https://github.com/SqLStreamStore/SQLStreamStore

Full disclosure, I'm one of the contributors :)


Ramin

Jul 13, 2017, 5:13:03 AM
to DDD/CQRS
I think I might be doing something similar. Here's how I do it: I write the events to SQL Server, and once that is done I publish the stored events to the bus. No need for one transaction. On top of that, I can handle events synchronously in the first step, inside the transaction, for denormalizations that I don't want to be eventually consistent.

To your first point: once events are stored, I inform a "sequencer" of that fact. It then numbers the new events inside SQL Server. The events are then published in that order. So events with original id values 10 and 11 may get sequence 10 and 11, or 11 and 10 if the first transaction takes longer and is not done when the sequencer starts. But that is perfectly fine, because sequence 10 will always be published before sequence 11. In either case everything will be ordered by the sequencer and there will be no gaps.
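Very roughly, the numbering step looks like this (an illustrative sketch only, with SQLite standing in for SQL Server; the table and column names are made up):

```python
import sqlite3

def sequence_new_events(conn: sqlite3.Connection):
    """Assign the next gap-free sequence numbers to newly stored events, in arrival order."""
    with conn:  # one transaction, so the numbering stays contiguous
        (max_seq,) = conn.execute("SELECT COALESCE(MAX(sequence), 0) FROM events").fetchone()
        new_events = conn.execute(
            "SELECT id FROM events WHERE sequence IS NULL ORDER BY stored_at, id"
        ).fetchall()
        for offset, (event_id,) in enumerate(new_events, start=1):
            conn.execute(
                "UPDATE events SET sequence = ? WHERE id = ?", (max_seq + offset, event_id)
            )
    # A publisher can now read events ordered by `sequence` and push them to the bus in that order.
    return [event_id for (event_id,) in new_events]
```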

Disclaimer, this is probably not the best solution for large systems with high load, but it works great for me. Also I am not suggesting anyone do it this way, only answering your question of how others do it.

Cheers
Ramin

Sebastian Stehle

Jul 13, 2017, 5:51:21 AM
to DDD/CQRS, joao...@braganca.name
I hope that the product will be installed on many machines. Therefore I try to keep the dependencies as low as possible. Adding other implementations is an option, and then I will definitely have a look at your project. Looks interesting :)

Sebastian Stehle

Jul 13, 2017, 5:56:33 AM
to DDD/CQRS
So you have a single server as a sequencer in your cluster? I could just write an event store server on top of MongoDB and ensure that I write the events in order. Probably good enough, and it should be easy to achieve a throughput of 2000/sec or so. I could also stream the data with gRPC.

João Bragança

Jul 13, 2017, 6:03:32 AM
to ddd...@googlegroups.com
The problem you're going to run into is that only a single-document write in MongoDB is atomic. Thus the only way to guarantee consistency (a requirement for any event store) is to keep all events for a stream in a single document.
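For example, an append to a one-document-per-stream layout with optimistic concurrency could look roughly like this (a pymongo sketch; the field names are illustrative):

```python
from pymongo import MongoClient

streams = MongoClient()["eventstore"]["streams"]  # hypothetical database/collection

def append(stream_id, expected_version, new_events):
    """Atomically append events to the stream document if nobody else wrote in between."""
    result = streams.update_one(
        {"_id": stream_id, "version": expected_version},
        {"$push": {"events": {"$each": new_events}}, "$inc": {"version": len(new_events)}},
    )
    if result.modified_count != 1:
        raise RuntimeError("concurrency conflict: the stream was modified by another writer")
```

Because it is a single document update, readers see either all of the new events or none of them.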


Sebastian Stehle

Jul 13, 2017, 6:36:31 AM
to DDD/CQRS, joao...@braganca.name
I create one document per commit with a secondary unique index on a CommitOffset. I think it should be eventually consistent.



Ramin

Jul 13, 2017, 8:26:33 AM
to DDD/CQRS, joao...@braganca.name
Wouldn't that be ok for one document = one stream per aggregate?

Greg Young

Jul 13, 2017, 8:33:19 AM
to ddd...@googlegroups.com, joao...@braganca.name
Until you have 5m events in a stream

--
Studying for the Turing test

Sebastian Stehle

Jul 13, 2017, 8:38:53 AM
to DDD/CQRS, joao...@braganca.name
I don't have a problem keeping the order within a single stream. It is the "GetAllEvents" or "GetAllEventsFiltered("^products-")" queries that I don't know how to handle ;)

João Bragança

Jul 13, 2017, 10:03:37 AM
to ddd...@googlegroups.com
There's the trade-off you need to make, unfortunately:

1) One document per stream. Falls over as soon as the stream gets large.
2) One document per commit. Now you need to query an eventually consistent index to get the data back into your aggregate.


Sebastian Stehle

Jul 13, 2017, 10:30:34 AM
to DDD/CQRS, joao...@braganca.name
With one document per stream I would have a problem with parallel changes. To be more precise, my problem is with "at least once" delivery for my subscriptions. I don't need global ordering; I just want to ensure that no event is skipped and left unhandled.

Nils Kilden-Pedersen

Jul 13, 2017, 10:42:44 AM
to ddd...@googlegroups.com
Don't ever use unique global sequence numbers. They don't scale and, as you've found out, aren't reliable.

What it sounds like you need is monotonic sequence numbering per stream, which is very different.
If you also need some sort of estimation of global order (that's as good as it'll get), use timestamps or a Lamport clock for causal consistency.
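A Lamport clock itself is tiny; something like this (a generic sketch):

```python
class LamportClock:
    """Logical clock: gives a causally consistent ordering, not a true global order."""

    def __init__(self):
        self.time = 0

    def tick(self):
        # Call before a local event, e.g. before appending an event to its stream.
        self.time += 1
        return self.time

    def merge(self, received_time):
        # Call when observing an event/message stamped by another node.
        self.time = max(self.time, received_time) + 1
        return self.time
```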



Sebastian Stehle

Jul 13, 2017, 12:49:24 PM
to DDD/CQRS
How would you solve my problem? I guess you could create a materialized projection for each query and use a Lamport clock to calculate a position for each event within the stream, either by duplicating the event or by a link. I guess this is what EventStore does.

Nils Kilden-Pedersen

Jul 13, 2017, 12:54:08 PM
to ddd...@googlegroups.com
On Thu, Jul 13, 2017 at 11:49 AM, Sebastian Stehle <mail2...@gmail.com> wrote:
How would you solve my problem?

Can you rephrase it more succinctly? Specifically, why do you think you need monotonic global ordering?

Sebastian Stehle

Jul 13, 2017, 1:10:06 PM
to DDD/CQRS
I tried to do that in another post.

I am looking for a solution so that each event is handled by each event handler at least once. Each handler saves the position of the last received event and polls for all events after this position. This position does not need to be a global sequence number.

My current solution for MongoDB/CosmosDB associates a globally unique number with each event (it could also be the ticks). But even if Event2 gets a higher number than Event1, that does not mean Event1 is available to the query before Event2. If you don't serialize the inserts to MongoDB you get this problem.

Another idea is to insert the events without a global sequence number and assign one in a background process. I guess it should be possible to number thousands of events per second with bulk updates, which is good enough for me.
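The background numbering could be a bulk update, something like this (a pymongo sketch; the field names and the ordering criterion are assumptions):

```python
from pymongo import MongoClient, UpdateOne

events = MongoClient()["eventstore"]["events"]  # hypothetical database/collection

def assign_global_sequence():
    """Give unsequenced events the next global sequence numbers in a single bulk write."""
    last = events.find_one({"globalSequence": {"$exists": True}}, sort=[("globalSequence", -1)])
    next_seq = last["globalSequence"] + 1 if last else 1

    updates = []
    for doc in events.find({"globalSequence": {"$exists": False}}).sort("clientTimestamp", 1):
        updates.append(UpdateOne({"_id": doc["_id"]}, {"$set": {"globalSequence": next_seq}}))
        next_seq += 1
    if updates:
        events.bulk_write(updates, ordered=True)
```

Only one instance of such a sequencer may run at a time, otherwise the assigned numbers would collide.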

Nils Kilden-Pedersen

Jul 13, 2017, 1:31:04 PM
to ddd...@googlegroups.com
On Thu, Jul 13, 2017 at 12:10 PM, Sebastian Stehle <mail2...@gmail.com> wrote:
I tried to do that in another post.

I am looking for a solution so that each event is handled by each event handler at least once.

In my experience, the best you can do, while remaining scalable, is to handle this probabilistically. 

So, if using timestamps, have some time-skew window that you reprocess, based on where you think you are. Whether that is one second, one minute, one day, or one week depends on your volume and tolerance for losing an event.
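As a generic sketch (the window size and the helper names here are made up):

```python
from datetime import timedelta

SKEW_WINDOW = timedelta(seconds=10)  # how far behind the last position we re-read
recently_handled = set()             # ids of events already handled inside the window

def poll(last_seen_timestamp, load_events_since, handle):
    """Re-read slightly before the last position and deduplicate, instead of trusting strict order."""
    newest = last_seen_timestamp
    for event in load_events_since(last_seen_timestamp - SKEW_WINDOW):
        if event.id in recently_handled:
            continue  # already processed on an earlier pass
        handle(event)
        recently_handled.add(event.id)
        newest = max(newest, event.timestamp)
    # A real implementation would also prune ids older than the window from recently_handled.
    return newest
```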
 

tom.w...@gmail.com

Jul 13, 2017, 7:55:31 PM
to DDD/CQRS
MongoDB offers some features that help here (make sure you use 3.4+ to benefit from some of the huge reliability improvements and new features, such as Raft-style consensus for replica sets, etc.). I think using them you can implement a fast and reliable event store (a sketch follows the list):

* Event collection documents have fields (at least) aggregateId, sequence, timestamp, events
* You have a unique index on [aggregateId, sequence] - this ensures consistency
* Event documents actually contain an event batch (1+ events) in the events field - this allows you to atomically insert multiple events for an aggregate (a very common requirement)
* The document should include a field named 'timestamp' or similar, which you set to the empty Timestamp. This causes MongoDB to populate it automatically at commit time (https://docs.mongodb.com/manual/reference/bson-types/#timestamps). You are guaranteed that this will be monotonically increasing in an order matching the oplog (so the real order that writes happened in and become visible in), even with primary failovers, etc. I would suggest doing some high concurrency testing to confirm this, but I'm pretty sure it's the case, since it's how the oplog itself works (so the time stamp handling has to be perfect, or replica sets wouldn't work reliably, and they do now: https://jepsen.io/analyses/mongodb-3-4-0-rc3). The timestamp should also be useful as a global sequence number, across all streams (and dbs) that share a mongo replica set.
* You should always use READ_COMMITTED (3.4+?) and WRITE_MAJORITY to ensure you only see real events (never rolled back if primary goes down), and you know a write has succeeded and won't be rolled back (from single member failures, that is)
* Readers can use the timestamp field if they want to read events from a certain point and keep track of that point, or can use aggregateId sorted by sequence for reading a specific aggregate
* You can use oplog tailing (only looking at inserts to event collections) rather than polling to unblock waiting readers => low latency
* If you reach huge scale, and need to shard individual collections, it will be trickier, since timestamps are only monotonic and unique per shard. Readers will need to track the shard key->timestamp map to avoid missing events. Alternatively, you could have a single writer whose job it is to update documents with a global sequence number, as it reads inserts off the oplogs of all the shards at once.
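Here is a minimal sketch of the commit path described above (pymongo, assuming MongoDB 3.4+; I'm using read/write concern "majority" as the closest match to READ_COMMITTED / WRITE_MAJORITY, and the field names are just illustrative):

```python
from bson.timestamp import Timestamp
from pymongo import ASCENDING, MongoClient
from pymongo.errors import DuplicateKeyError
from pymongo.read_concern import ReadConcern
from pymongo.write_concern import WriteConcern

db = MongoClient("mongodb://localhost/?replicaSet=rs0")["eventstore"]
events = db.get_collection(
    "events",
    write_concern=WriteConcern(w="majority"),
    read_concern=ReadConcern("majority"),
)
# The unique index on [aggregateId, sequence] is what gives per-aggregate consistency.
events.create_index([("aggregateId", ASCENDING), ("sequence", ASCENDING)], unique=True)

def commit(aggregate_id, expected_sequence, batch):
    """Insert one commit document; the empty timestamp is filled in by the server at commit time."""
    try:
        events.insert_one({
            "aggregateId": aggregate_id,
            "sequence": expected_sequence + 1,
            "timestamp": Timestamp(0, 0),  # replaced by the server with the current timestamp
            "events": batch,
        })
    except DuplicateKeyError:
        raise RuntimeError("concurrent write detected for this aggregate")
```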

If you ever migrate to another event store, no problem - you can replay the events in order into the new store, using whatever approach it has to global sequencing. You can read across all the event streams in parallel (in timestamp order) if you want a global ordering.

Kind regards,
Tom

Sebastian Stehle

Jul 14, 2017, 2:52:48 AM
to DDD/CQRS
Hi Tom,

In my first implementation I did almost exactly this (I just have to check READ_COMMITTED). I used a notifier (Rx, or Redis Pub/Sub) to inform the event handlers about new events and keep the latency low. It worked very well, but not with CosmosDB, because there is no timestamp there.
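The notifier part is simple; with Redis Pub/Sub it is essentially just this (a redis-py sketch; the channel name is made up):

```python
import redis

r = redis.Redis()

def notify_new_events():
    # Writer side: called after the events have been stored successfully.
    r.publish("events:new", "1")

def wait_for_events():
    # Handler side: block until notified, then let the caller poll the store from its last position.
    pubsub = r.pubsub()
    pubsub.subscribe("events:new")
    for message in pubsub.listen():
        if message["type"] == "message":
            yield
```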

I tried another solution, where I insert the events with a client-calculated timestamp. I created a sequencer (as a background job or a separate process) to calculate the global sequence numbers in a separate thread. That works well too. I will either keep this implementation or maintain a separate implementation for CosmosDB. In a cloud environment or Kubernetes it should be very stable, because you can just restart the sequencer within seconds. The sequencer could also ensure consistency for other implementations if your database does not support secondary indices.

Ramin

Jul 14, 2017, 5:19:41 AM
to DDD/CQRS
> The sequencer could also ensure consistency for other implementations if your database does not support secondary indices.

How so? Consistency can only be ensured eventually, because the events already happened. And that would not be the job of a process that simply puts events in order, no?

Sebastian Stehle

Jul 14, 2017, 6:27:14 AM
to DDD/CQRS
It could find conflicts, but not ensure consistency. You are right. But it's probably not a good idea. I would just use EventStore if I could ;)

João Bragança

Jul 14, 2017, 6:40:20 AM
to ddd...@googlegroups.com
Don't ever use unique global sequence numbers. They don't scale and, as you've found out, aren't reliable.

Both GetEventStore and SqlStreamStore use global sequence ids to guarantee ordering within a single partition. In fact, NEventStore did not do this for a long time, which caused major headaches.

Ramin

Jul 14, 2017, 9:52:17 AM
to DDD/CQRS
I don't want to be pedantic, but to get the terms right, a subscriber/event handler would identify conflicts - and possibly correct them by issuing correcting commands. The sequencer and EventStore have entirely different responsibilities.