Replaying the event store


Jimmy Bogard

unread,
May 9, 2011, 8:47:54 AM5/9/11
to ddd...@googlegroups.com
Hello all,

One of the stated benefits of an event store with event sourcing is the idea that I can replay events to get back the state of a single aggregate. When we tried to apply this to the entire event store across all aggregates, this approach pretty much fell down. For an application with a few million events, replaying the entire event store takes about 10 hours, and is only going to grow. The reason we want to do this is to back-fill reporting data for historical purposes.

We've looked at a few options here. First, scaling out the processing side to have multiple writers. This would work, but in an internal IT organization, no one really wants to procure machines, virtual or otherwise, for a day or so just to back-fill some data. Cloud's not an option here, either.

We also looked at snapshotting, but right now our aggregates only contain the state to accept/reject the command messages coming in. That makes up only about 10% of the state contained on the actual view models.

The bottleneck is in the denormalizers, which use MSMQ (non-transactionally) and a service bus. To get around this, a couple of times we've just used SQL to update the view model DB directly. That went much, much faster of course, a few seconds, but it subverted the entire process and carried the danger that the view model could become permanently desynchronized from the aggregate.

Anyone else tackle this sort of problem, or is there a deeper flaw we're missing?

Thanks,

Jimmy

Rinat Abdullin

unread,
May 9, 2011, 9:33:05 AM5/9/11
to ddd...@googlegroups.com
Hi Jimmy,

We haven't hit millions of events in our system at the moment (our largest store is approaching 200k events); however, replaying them takes less than a minute.

Denormalizers are coded against a simple view writer interface tailored for atomic updates (similar to concurrent dictionary in spirit). In production they are working against the blob storage, while in dev (or view rebuild system) I use memory/files. Single-threaded rebuild against file system for 7-8 denormalizers (same as the ones used in production) with 3 inline event upgraders takes under a minute. Afterwards, I can just atomically overwrite the current state of view models in the cloud (using multiple upload threads). All this (including event sync) is nearly frictionless and takes one click. View writer design and a few dev guidelines force all denormalizer implementations to be portable and atomic.


Obviously this is much easier to do with NoSQL storage (that's the reason I've completely got rid of SQL view models), but you can use something similar with SQL as well. 
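The "view writer" idea above can be sketched roughly as follows. This is a minimal illustration, not Lokad's actual code: the `ViewWriter` contract, the class names, and the JSON-file layout are all assumptions; the point is that denormalizers code against one atomic read/update/write interface, so the same handlers can run against memory, files, or blob storage.

```python
from abc import ABC, abstractmethod
import json
import os


class ViewWriter(ABC):
    """Atomic read/update/write contract for view models
    (similar to a concurrent dictionary in spirit)."""

    @abstractmethod
    def update(self, key, update_fn, default):
        """Read the view for `key` (or `default`), apply `update_fn`, write back."""

    @abstractmethod
    def get(self, key):
        """Return the current view for `key`, or None."""


class MemoryViewWriter(ViewWriter):
    """In-memory implementation used for dev and fast rebuilds."""

    def __init__(self):
        self._views = {}

    def update(self, key, update_fn, default):
        view = self._views.get(key, default)
        self._views[key] = update_fn(view)

    def get(self, key):
        return self._views.get(key)


class FileViewWriter(ViewWriter):
    """Same contract against the file system; production would use blob storage."""

    def __init__(self, root):
        self.root = root
        os.makedirs(root, exist_ok=True)

    def _path(self, key):
        return os.path.join(self.root, f"{key}.json")

    def update(self, key, update_fn, default):
        view = self.get(key)
        if view is None:
            view = default
        with open(self._path(key), "w") as f:
            json.dump(update_fn(view), f)

    def get(self, key):
        try:
            with open(self._path(key)) as f:
                return json.load(f)
        except FileNotFoundError:
            return None
```

Because every denormalizer only ever sees the interface, rebuilding views is just a matter of pointing the same handlers at a fresh writer and replaying the stream, then swapping the result in atomically.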

All the best,
Rinat

Rinat Abdullin | Technology Leader at Lokad.com | Writer at Abdullin.com | Contacts

Greg Young

unread,
May 9, 2011, 9:38:20 AM5/9/11
to ddd...@googlegroups.com
Generally when rebuilding a projection you only rebuild the one
projection. Does your projection use *all* of the events? Generally I
only replay the events a given projection actually listens to (I don't
replay all projections ... only the ones that would change).

RabbitMQ, as an example, can easily do 10,000 messages/sec, so with
some quick math 2,000,000 events would end up taking a little over
three minutes. It could be that your handlers are a bit slow (which
happens at times). I generally look at handlers for opportunities for
batching (automatically), or, in the case of some that are really just
slow, I consider it not such a big deal because the rebuilding of
projections is asynchronous anyway. On one system we had about
200,000,000 events in the store and a replay would often end up
needing 20-50M events. These often took overnight, but since it's an
async process it was never a big deal (the system did not have to go
down, etc., in order to do it).
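The "replay only what a projection listens to" idea can be sketched like this. A minimal illustration, with hypothetical event types and a hypothetical `handles` set; the point is that each projection declares its event types up front, so a rebuild can filter the stream instead of pushing every event through every handler.

```python
# A projection declares which event types it listens to, so a rebuild
# can skip everything else instead of replaying the whole store.
class AccountBalances:
    handles = {"AccountCreated", "MoneyDeposited"}

    def __init__(self):
        self.balances = {}

    def apply(self, event):
        if event["type"] == "AccountCreated":
            self.balances[event["account"]] = 0
        elif event["type"] == "MoneyDeposited":
            self.balances[event["account"]] += event["amount"]


def rebuild(projection, events):
    """Replay only the events the projection actually listens to.

    Returns the number of events actually applied, which is usually a
    small fraction of the full store.
    """
    replayed = 0
    for event in events:
        if event["type"] in projection.handles:
            projection.apply(event)
            replayed += 1
    return replayed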

HTH

Greg

--
Grammar and syntax errors have been included to make sure I have your
attention

Jimmy Bogard

unread,
May 9, 2011, 10:43:22 AM5/9/11
to ddd...@googlegroups.com
Whenever I read stats like that on queuing technologies, it makes me wonder whether the real issue is a design problem in the denormalizers themselves.

So no, it wouldn't use all the events; that would significantly reduce the processing time. Right now all of the denormalizer events hit one queue, so it would be a bit difficult to selectively process particular events.

Do you strategically design your denormalizers for this type of situation, to be able to selectively process specific projections? i.e., a denormalizer per projection or something similar?

Greg Young

unread,
May 9, 2011, 10:47:32 AM5/9/11
to ddd...@googlegroups.com
Jimmy, grab me off-list and I can explain further. I normally do not
use the same pipeline for replaying as I do for processing the live
stream.

Jonathan Oliver

unread,
May 9, 2011, 11:29:11 AM5/9/11
to DDD/CQRS
+1 for what Greg said. Don't use the same pipeline. For example, our
regular/production pipeline has the idea of a transaction around a
single message received from a queue endpoint. When building a
particular view from stored events, we have everything replay in
memory and then have a single transaction around it which makes
everything significantly faster because it's no longer writing to disk
after each message.
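The single-transaction replay described above can be sketched with SQLite standing in for the view database. This is an illustration only (the `balances` table and `Deposited` event type are made up); the point is that all events are applied inside one transaction with one commit at the end, rather than a disk write per message.

```python
import sqlite3


def rebuild_balances(events, db_path=":memory:"):
    """Replay events into a view table inside ONE transaction:
    nothing is flushed per message, only a single commit at the end."""
    conn = sqlite3.connect(db_path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS balances (id TEXT PRIMARY KEY, total INTEGER)"
    )
    try:
        for e in events:
            if e["type"] == "Deposited":
                # Upsert pattern: ensure the row exists, then accumulate.
                conn.execute(
                    "INSERT OR IGNORE INTO balances VALUES (?, 0)", (e["id"],)
                )
                conn.execute(
                    "UPDATE balances SET total = total + ? WHERE id = ?",
                    (e["amount"], e["id"]),
                )
        conn.commit()  # single commit for the whole replay
    except Exception:
        conn.rollback()  # a failed rebuild leaves the view untouched
        raise
    return conn
```

The same shape applies with any RDBMS: the per-message transaction belongs to the live pipeline, not the rebuild pipeline.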

Žilvinas Šaltys

unread,
May 9, 2011, 11:54:16 AM5/9/11
to ddd...@googlegroups.com
So, for example, if you have to rebuild one view from, say, 1 million inserts and 10 million updates, your event handler
would build an in-memory representation of it and, once all events have been replayed, write it to disk?

Jonathan Oliver

unread,
May 9, 2011, 4:31:50 PM5/9/11
to DDD/CQRS
Exactly. Only commit the transaction (if you're using RDBMS for your
views) once all events have been replayed.


ashic

unread,
May 10, 2011, 4:14:42 AM5/10/11
to DDD/CQRS
Hi Jimmy,
Here's our scenario:

We're having to import data from the most horrible pseudo-normalized,
organically grown piece of shite database, and as we are growing
concepts aggregate by aggregate (and "discovering" business rules all
the time), the import was run every so often (a few times a week during
dev). It wasn't in the range of millions of events, but there were
several thousand. For reasons I'd prefer not to say, the read model
had to be SQL Server. And due to "not having to write SQL" (coz that's
scary, perhaps) an ORM was chosen. We used EF 4. Some sort of
consistency was required: though eventual, all read models were to be
updated together or fail together. Initially, we were committing in
every denormalizer on every event and wrapping them all in a
transaction. This made the importer run for about an hour in debug
mode (reading from the old db was a cause of significant performance
loss). We then changed the bus to do one commit for each event. So,
instead of an event causing 70 commits in a transaction, we just did
one commit. This reduced running time to a few minutes. It could
obviously be optimized further, but we didn't want to deal with EF
issues when an event comes in that depends on the results of a
previous event whose changes haven't been committed yet. [And yes,
we're not catering for events coming in out of order, as our domain is
fine with a single sequential stream.]

Anyway, the point I'm making is that something as trivial as db.Save()
in a denormalizer can kill performance (depending on what db tech
you're using).
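The "one commit per event instead of one per denormalizer" change can be sketched in a few lines. A hypothetical dispatcher, not the actual bus code; the `session` stands in for the EF context, and the handlers are assumed to mutate it without committing.

```python
def handle_event(event, denormalizers, session):
    """Run every denormalizer for one event against a shared unit of work,
    then commit once, instead of a commit inside each denormalizer."""
    for denormalize in denormalizers:
        denormalize(session, event)  # handlers mutate the session, never commit
    session.commit()                 # one commit per event, not one per handler
```

Moving the commit out of the handlers and into the dispatcher is what turned 70 commits per event into one.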

Ales Vojacek

unread,
May 10, 2011, 6:05:00 AM5/10/11
to ddd...@googlegroups.com
Hi all,
we are trying to develop a new piece of our system (which is CRUD).
This extension of our CRUD software provides some kind of views into
our system via web pages. Because we are the developers of that CRUD
software, we can easily hook into the events we're interested in (say,
account created, account state changed, money received/sent, ...).

I hope that we can use some kind of service bus/messaging to populate
these views (a SQL database). Our requirement is a solution where
messages are not lost before they are applied to the views, and which
will not significantly change the performance of our core application.

Are there some points we have to take care about? I mean, can we just
use ActiveMQ to create messages in the CRUD app and receive these
messages in a new application/service that populates the views, or
would it be better to develop the extension in terms of CQRS,
consuming our messages from the CRUD app?

From my point of view, full CQRS is complicated for this solution and
does not add any value here, but more eyes see more, so it would be
helpful to read your comments.

Thank you for your help and time.

A>


Jimmy Bogard

unread,
May 10, 2011, 8:32:53 AM5/10/11
to ddd...@googlegroups.com
That's a really elegant solution, I like it. We're also doing wonky things like allowing external events to get pushed out, then clearing those queues once the event store has been replayed.

@yreynhout

unread,
May 10, 2011, 8:54:13 AM5/10/11
to DDD/CQRS
- I presume nobody else is querying the system at this time.
- When you say in memory, do you really mean "in memory"? If so, do
you introduce batching to make it fit the memory space available, or
do you never hit that limit?


@yreynhout

unread,
May 10, 2011, 8:59:19 AM5/10/11
to DDD/CQRS
Have you analysed your event stream and looked into folding/filtering
events? E.g., if you do three rename operations, do you need all three
of them, or is the last one sufficient? Of course, this depends highly
on what logic your denormalizers have inside them.

Greg Young

unread,
May 10, 2011, 9:03:07 AM5/10/11
to ddd...@googlegroups.com

Many talk about this, but it introduces huge amounts of complexity for what are often very slight perf gains. I might consider it in a strange edge case, but never in a 'normal' case.

Jonathan Oliver

unread,
May 10, 2011, 11:47:50 AM5/10/11
to DDD/CQRS
I suppose it would depend upon how many read model objects need
updating. But because it's completely async it can be done on another
machine entirely, e.g. a cloud-based server with lots of RAM. I
suppose you could implement batching in blocks of 10,000 or
something. Anything is faster than writing to disk after each
message.

Rinat Abdullin

unread,
May 10, 2011, 1:11:57 PM5/10/11
to ddd...@googlegroups.com
Re the last one: actually, as an experiment a while ago I compared in-memory and file-based view repopulation from the event stream (the "cloud views" model). Single thread. Performance was similar. Probably in my case, in-memory upgrades and non-optimized message dispatch were a bigger bottleneck than IO.

So it all depends on the case.

Rinat

João Bragança

unread,
May 10, 2011, 2:14:40 PM5/10/11
to ddd...@googlegroups.com
"In memory" here being analogous to the NHibernate ISession. Speaking of which, if you use NHibernate to handle the read side, you get batching for free: every n events, start up a new ISession.
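The session-per-batch idea can be sketched generically. This is an illustration, not NHibernate code: `session_factory` and `handle` are hypothetical placeholders for the ORM session and the denormalizer, and the point is simply that the unit of work is committed and replaced every `batch_size` events so it never grows unbounded.

```python
def replay_in_batches(events, handle, session_factory, batch_size=1000):
    """Replay with a fresh session every `batch_size` events.

    Committing and reopening the session periodically keeps the
    identity map / change tracker small (the ISession idea), while
    still amortizing the disk writes across many events.
    """
    session = session_factory()
    count = 0
    for event in events:
        handle(session, event)  # denormalizer mutates the session only
        count += 1
        if count % batch_size == 0:
            session.commit()    # flush the batch in one write
            session.close()
            session = session_factory()  # start a clean unit of work
    session.commit()            # flush the final partial batch
    session.close()
```

The batch size is a tuning knob: larger batches mean fewer commits but more memory held per unit of work.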

Aleš Vojáček

unread,
May 12, 2011, 6:24:35 AM5/12/11
to ddd...@googlegroups.com
Hi all,
I tried to search this group and the Internet for a solution to our
problem. I believe the answer is what is called a "denormalizer" here,
and I hope it can be used in our scenario for populating views on the
reporting website of our CRUD application.

I have some questions about that.

If I use messaging like ActiveMQ to send events to a denormalizer, are
there good practices for dealing with the situation where ActiveMQ (or
another message queue) is not accessible? I want to hook into our
existing system and send events to the denormalizer, which is doable,
but the main system's performance and functionality have to keep
working when the denormalizer or ActiveMQ is not accessible (high
load, or some network problems); on the other hand, the views have to
stay consistent. When ActiveMQ and the denormalizer come back online,
they have to replay all events which were not sent into the queue.

I can imagine some scenarios, but it would be really nice to hear
comments from people who have experience with something similar.

The solution I can imagine is something like this:
1. Hook into the CRUD app on receiving/sending money.
2. IF ActiveMQ is accessible -> send an event containing all the data
needed to change the views in the read model. It is important here to
attach all the needed data to the event, so that the views can be
changed without querying our CRUD database, because events can be
pulled out of time order, or there can be holes in the queue. Another
issue is that the CRUD DB records could already be in a newer state by
the time I consume the events in the denormalizer.
3. IF ActiveMQ is not accessible, use some storage to store the
message locally, or use the CRUD DB to store these events so they can
be sent in the future. This is for keeping the read model in a
consistent state.

But I have some doubts about 2 and 3. It seems I could remove 2: if I
create a denormalizer that consumes records from a table in the CRUD
DB containing the messages for populating the read model, that table
can easily become a bottleneck because of the traffic on it. On the
other hand, if I use only 2, the CRUD system will depend on ActiveMQ.

I know that this group is about DDD/CQRS, and it is not possible to
move the whole app to CQRS, but we want to move some pieces of the
system, and hopefully in the future we can leave CRUD behind us.

Sorry for my bad English; I hope someone can understand what I mean.

A.

Jimit

unread,
May 16, 2011, 9:42:53 AM5/16/11
to ddd...@googlegroups.com
@Rinat: NoSQL for event storage or for persistent view models? How exactly did you find it easier?

Rinat Abdullin

unread,
May 16, 2011, 5:34:24 PM5/16/11
to ddd...@googlegroups.com
@Jimit.

Both. NoSQL for the persistent view models simplifies maintenance and rapid upgrades a lot. No need to bother with schemas - just update the view handlers and click "Rebuild Views". It works even better for tiny dev teams and cloud environments.

As for event storage - I don't have per-AR event streams, but rather a domain-level event store. Still, NoSQL is a no-brainer in my scenarios (200,000 events so far and counting).

Rinat

Aleš Vojáček

unread,
May 16, 2011, 6:17:16 PM5/16/11
to ddd...@googlegroups.com
Hi all,
since there has been no response: if I asked something wrong, or if my
English is not good enough, please tell me, and I will create a code
example or try to explain in more detail.
If this question is out of scope for this group, I will try to find an answer somewhere else.
Thank you all.
Thank you all.

Rinat Abdullin

unread,
May 16, 2011, 6:40:23 PM5/16/11
to ddd...@googlegroups.com
Ales,

If messaging system connectivity is a problem (which is rarely an issue), you can save events into a local SQL table (in the same SQL transaction).

Then have a secondary process monitor the table and dispatch new events to the service bus every so often (e.g. once per second). If the bus fails, just back off and keep retrying until it succeeds. Saving events to the SQL table will be the only slowdown here.

Then you can have denormalizers (essentially event handlers responsible for updating persistent read models) listen to these events. If you need to rebuild read models (aka views), just grab all events from the table and replay them.

These read models could just as well be located in another DB (or even on multiple servers), significantly offloading stress from the write DB.

That should be enough for you to get started. There are multiple ways to improve performance from here if needed (e.g. move read models to a NoSQL DB, flat files, or an in-memory cache).
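The secondary dispatcher process described here can be sketched as a single pass over the outbox table. A hedged illustration: `fetch_undispatched`, `publish`, and `mark_dispatched` are hypothetical callbacks standing in for the SQL query, the bus send, and the "dispatched" flag update.

```python
import time


def dispatch_outbox(fetch_undispatched, publish, mark_dispatched,
                    backoff_seconds=1.0):
    """One pass of the dispatcher: read new events from the outbox table
    and push them to the bus, backing off and retrying while the bus is
    down. Events are marked dispatched only after a successful publish,
    so nothing is ever lost (at-least-once delivery)."""
    for event in fetch_undispatched():
        while True:
            try:
                publish(event)
                mark_dispatched(event)
                break
            except ConnectionError:
                # Bus unavailable: wait and retry the same event,
                # preserving order within the stream.
                time.sleep(backoff_seconds)
```

Run on a timer (e.g. once per second), this gives the write side a purely local SQL dependency while the bus failure mode reduces to delayed, not lost, events.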

All the best,
/ Rinat

Rinat Abdullin | Technology Leader at Lokad.com | Writer at Abdullin.com | Contacts


Aleš Vojáček

unread,
May 16, 2011, 7:50:27 PM5/16/11
to ddd...@googlegroups.com
Yup, nice,
and really, thank you.
I had something like that in mind.
Saving to a SQL table is great for me: even if the messaging system is
not available (which is possible, since no other piece of the system
uses it), it is fair to say that the SQL database has to be
accessible, because if it is not, our app will not work anyway.
One thing I do not like about this solution is that it depends on time
(I can see a bottleneck here), because retrying on a fixed interval
can become expensive over a long outage as the number of undelivered
"events" grows. Maybe the solution is a service which pulls messages
from the SQL table and runs only while messaging is up.

I really thank you for your post, because it means I'm not totally lost.

I will try a prototype of that "extension" to our system, and
hopefully I will earn some more room in our business app to do more
DDD/CQRS.

Thank you a lot :-)
A.

Rinat Abdullin

unread,
May 16, 2011, 8:05:43 PM5/16/11
to ddd...@googlegroups.com
Ales,

1. Just use some compact serializer and you should be fine even if there are hundreds of thousands of messages in the table. Besides, append-only tables do not cause a lot of stress on the DB. I suggest benchmarking first before making any assumptions about performance.

2. Messaging is an essential requirement for this kind of solution. If the messaging server is allowed to be down most of the time, then something has to change in the project (either discard the entire event-centric approach or make messaging more reliable).

Rinat

Ales Vojacek

unread,
May 17, 2011, 4:57:11 AM5/17/11
to ddd...@googlegroups.com
Thank you again.
On 17.5.2011 2:05, Rinat Abdullin wrote:

> 1. Just use some compact serializer and you should be fine even if there are hundreds of thousands of messages in the table. Besides, append-only tables do not cause a lot of stress on the DB. I suggest benchmarking first before making any assumptions about performance.

I will do some benchmarking. This table (or another table in the messaging subsystem) will be needed for replaying certain messages when a new view is introduced. And I'm thinking about reports on the number of processed/waiting/replaying/... messages, so there will be some need for that kind of information.

> 2. Messaging is an essential requirement for this kind of solution. If the messaging server is allowed to be down most of the time, then something has to change in the project (either discard the entire event-centric approach or make messaging more reliable).

The messaging server will not be down most of the time, but even if that happens, the point is to keep our app available regardless of problems on the view/messaging side.
This side will at first serve only some nontrivial reports (which are slow on our existing DB structure - a lot of joins and non-trivial WHEREs) and some daily/monthly reports to external systems.

Thank you a lot, Rinat. When I have some kind of prototype, maybe there will be more concrete problems.

bodrin

unread,
May 18, 2011, 2:53:09 AM5/18/11
to ddd...@googlegroups.com
Hi,

Just to add that ActiveMQ can be configured (or at least it could in the past :) to use almost any SQL datastore, so that you can even have both JDBC/JPA and JMS transactions mapped to a single native DB transaction. I have not tried it, but if you are interested, check this one:
http://www.javaworld.com/javaworld/jw-01-2009/jw-01-spring-transactions.html?page=3#strp
... the "Shared Transaction Resource pattern".

cheers,
bodrin

Ales Vojacek

unread,
May 18, 2011, 4:10:54 AM5/18/11
to ddd...@googlegroups.com
Thank you, I will look at that.
A.