Newbie Question - Event Chaser and Batching


TonyC

unread,
Mar 13, 2017, 7:02:48 AM3/13/17
to DDD/CQRS
Hi there, I am new to Event Sourcing & DDD, and have done a lot of background research and watched Greg Young's videos on the subject, as well as his videos on the EventStore streaming database itself (which is awesome, BTW!)

In the business case of an online retailer receiving orders, I can get a lot of events. For the sake of argument, let's say I get an InventoryAdjusted event which has a GUID and an amount, as shown here:

{
  "id": "3EEEC9B0-D176-4F2C-A791-42C57191C1BE",
  "adjustment": "-2"
}

If I am chasing the events to keep a SQL Server read model up to date, I envisage the sequence of events as:
  • Determine the version of the last event I processed
  • Load the stream of events greater than that version
  • For each event create the aggregate and load all events to get current state
    • Apply the event that I am currently processing
    • Update the read model by calling the stored procedure spAdjustInventory 
  • Process the next event
This all seems fine, but it's incredibly wasteful for two reasons:
  1. I have to load all events for the aggregate each time I process a single event (OK perhaps I snapshot every 50 events or so)
  2. If I encounter the same aggregate 5 events later, and then again 3 events after, I have to repeat step 1 above 3 times when I could have just done it once and applied 3 events
My question is: this doesn't seem right to me, and the DBA will throw a hissy fit as I bash SQL Server every 250 ms calling the same procedure. Ideally, what I really want to do is the following:
  • Get all new events since I last processed them
  • Group them into batches for each unique instance of an aggregate
  • Apply all outstanding events in one go
  • Call the stored procedure once with the adjusted inventory value
Is there a pattern for doing this specifically with EventStore? I was thinking perhaps a projection could create new streams per aggregate from the single events stream, and maybe I subscribe to them? What I don't want to do is keep a list of all aggregates, poll them for changes, and update the database if they exist, as this would mean checking every aggregate, since I won't know which ones have new events. If I have thousands of products, even with threading that is gonna be SLOW! Maybe I am being dumb, but my fuzzy little head is gonna explode soon thinking about it, and I'm sure you don't want that on your conscience. ;-)
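To make it concrete, here is a rough Python sketch of the batching I have in mind. All the names here (load_events_since, apply_sp) are made-up placeholders for the EventStore read and the stored-procedure call:

```python
from collections import defaultdict

def process_new_events(load_events_since, apply_sp, last_version):
    """Group new events per aggregate and hit the database once per aggregate."""
    events = load_events_since(last_version)  # placeholder for the EventStore read

    # Group events by aggregate id so each aggregate is touched only once.
    batches = defaultdict(list)
    for event in events:
        batches[event["id"]].append(event)

    # Apply the net adjustment with a single stored-procedure call per aggregate.
    for aggregate_id, batch in batches.items():
        total = sum(int(e["adjustment"]) for e in batch)
        apply_sp(aggregate_id, total)  # e.g. one spAdjustInventory call

    return max(e["version"] for e in events) if events else last_version
```

So three events touching the same product become one database call, which is exactly the throttling I'm after.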

Rickard Öberg

unread,
Mar 13, 2017, 9:47:09 AM3/13/17
to ddd...@googlegroups.com
Hey there Tony,

Yeah, the process you describe sounds a bit funky. Here's what we do.

On startup of the read model chaser, read the last written event id from the db.
Subscribe to EventStore from that point.
Get events up to a max batch size (4096 in my case).
Start a transaction in the db.
For each event in the batch:
  Convert the event to a db statement (Neo4j in our case)
  Apply the statement with the event data as parameters
Write the new last written event id.
Commit the transaction.
Rinse, repeat.
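A minimal Python sketch of that loop, where the db object and the batch source are stand-ins, not a real EventStore or Neo4j client:

```python
BATCH_SIZE = 4096  # max events per batch / transaction

def to_statement(event):
    # Translate an event into a (statement, params) pair for the read store.
    return ("apply_event", {"id": event["id"], "data": event["data"]})

def run_chaser(db, read_batches):
    """Catch-up loop: start from the last checkpoint, one commit per batch."""
    last_id = db.last_event_id()                 # where we left off
    for batch in read_batches(last_id, BATCH_SIZE):
        with db.transaction() as tx:             # one transaction per batch
            for event in batch:
                tx.execute(*to_statement(event))
                last_id = event["seq"]
            tx.set_last_event_id(last_id)        # checkpoint inside the same tx
    return last_id
```

Because the checkpoint commits in the same transaction as the statements, a crash never leaves the read model and the checkpoint disagreeing.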

In your case it sounds like you may want to do some squashing of
events before applying them, but otherwise that's about it. There's no
"loading of aggregate" or somesuch, that ONLY happens on the command
handling side. This is a pure "get events into db" situation.

Makes sense?

regards, Rickard
> --
> You received this message because you are subscribed to the Google Groups
> "DDD/CQRS" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to dddcqrs+u...@googlegroups.com.
> Visit this group at https://groups.google.com/group/dddcqrs.
> For more options, visit https://groups.google.com/d/optout.

TonyC

unread,
Mar 13, 2017, 12:06:23 PM3/13/17
to DDD/CQRS
Hi Rickard, thanks for the reply. You are right in that I do indeed want to squash some events; in fact, you could say I want another view in between the EventStore and the SQL Server. Now that I have thought about it some more, I think Redis would be an ideal choice here, as it's in-memory and has atomic operations to mutate state.

I can then subscribe to the event feed and perform mutations on the in-memory representation, incrementing an EventsProcessed count and an EventLastUpdated stamp on each aggregate as I churn through the events. In a separate thread I can watch these flags, and when they reach a certain threshold, say 50 events, or 1 minute, or some combination strategy, I can call the stored procedure to update the SQL Server database. Now I have completely separated the dependency between the SQL Server view and the Event Store by placing a process in the middle to do my batching for me. I'm sure this is probably a Gang of Four pattern of some description.

Any thoughts on this? I don't want to over-engineer for the sake of it BUT I need a way of throttling the usage against the SQL Server. :-)
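Something like this is what I'm picturing, as a plain-Python sketch. The dict stands in for Redis (in real Redis the mutations would be atomic increments such as HINCRBY), and call_stored_proc stands in for the spAdjustInventory call:

```python
import time

FLUSH_EVENT_COUNT = 50    # flush after this many buffered events...
FLUSH_AGE_SECONDS = 60    # ...or when the oldest buffered event is this old

class InventoryBuffer:
    """Buffers adjustments per aggregate between EventStore and SQL Server."""

    def __init__(self, call_stored_proc, now=time.time):
        self.call_stored_proc = call_stored_proc  # stand-in for spAdjustInventory
        self.now = now
        self.pending = {}  # aggregate id -> (net adjustment, count, first seen)

    def apply(self, aggregate_id, adjustment):
        total, count, first = self.pending.get(aggregate_id, (0, 0, self.now()))
        self.pending[aggregate_id] = (total + adjustment, count + 1, first)

    def flush_due(self):
        """Push batches that hit the count or age threshold to SQL Server."""
        for agg_id, (total, count, first) in list(self.pending.items()):
            if count >= FLUSH_EVENT_COUNT or self.now() - first >= FLUSH_AGE_SECONDS:
                self.call_stored_proc(agg_id, total)  # one call per aggregate
                del self.pending[agg_id]
```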

andho

unread,
Mar 13, 2017, 11:43:48 PM3/13/17
to DDD/CQRS
Hi,

The first thing I noticed is that your event is not idempotent. You should not create events with - or + values but with the actual adjusted value. For example, if your inventory count was 6 and you adjust by -2, your event should say InventoryAdjusted with a new inventoryCount of 4.

When your events are idempotent, you just need update logic in your projections. In this case, it will be:

    SET inventoryCount = event.inventoryCount.

Instead of:

    SET inventoryCount = inventoryCount + event.adjustment

As long as you maintain the order of events, this will end up with the correct final state of the model. It also removes the burden of having to do the projections inside a db transaction, but that's up to you. Most importantly, you shouldn't have to load back the Write Model aggregate to build the Read Model.

Regards

Danil Suits

unread,
Mar 14, 2017, 9:00:56 AM3/14/17
to DDD/CQRS
First thing I noticed is that your event is not idempotent

They never are -- events are just data; it's the handling of the event that needs to be idempotent.

currentState.apply(event).apply(event) === currentState.apply(event)
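One sketch of what that property can look like in code: wrap a delta handler with a version check so re-applying the same event is a no-op. Names and shapes here are illustrative, not from any particular framework:

```python
def make_idempotent_apply(apply_delta):
    """Wrap a non-idempotent delta handler so re-applying an event is a no-op."""
    def apply(state, event):
        # state carries the version of the last event it absorbed
        if event["version"] <= state["version"]:
            return state                      # already applied: do nothing
        new_state = apply_delta(state, event)
        new_state["version"] = event["version"]
        return new_state
    return apply
```

Applying the same event twice now yields the same state as applying it once, even though the event itself carries a delta.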


You should not create events with - or + values but the actual adjusted value

You can do it that way, of course, but that sounds to me a lot more like storing snapshots than storing events.  Maybe I've been misunderstanding the literature.

Rickard Öberg

unread,
Mar 14, 2017, 9:08:14 AM3/14/17
to ddd...@googlegroups.com
I would agree with Danil: as long as your subscriber is idempotent, doing delta events is fine.

Sent from my iPad

andho

unread,
Mar 14, 2017, 11:16:53 PM3/14/17
to DDD/CQRS
Events and Commands are messages, and they can be idempotent. See http://blog.jonathanoliver.com/idempotency-patterns/.

But yes, not all messages are idempotent.

So if the handling of the event is idempotent, that solves the problem. The easiest way to ensure idempotency in projections is to maintain the order of the events. This can be done using an aggregate version, which you save into the Read Model: you only apply an event when the aggregate version in the event matches the expected aggregate version in the Read Model. If the versions don't match, delay the processing of the event.

You can also do de-duplication at the message-infrastructure level, but I think it's easier to handle in the projection handler.

https://lostechies.com/jimmybogard/2013/06/03/un-reliability-in-messaging-idempotency-and-de-duplication/
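A small sketch of that version check in a projection handler. This is illustrative only: the read model is a dict, and "delay" just signals the caller to re-queue the event:

```python
def project(read_model, event):
    """Apply an event only if it is the next expected version for its aggregate.

    Returns 'applied', 'duplicate' (already seen, safe to drop),
    or 'delay' (out of order, retry later).
    """
    row = read_model.setdefault(event["id"], {"version": 0, "count": 0})
    expected = row["version"] + 1
    if event["version"] < expected:
        return "duplicate"          # already applied: ack and drop
    if event["version"] > expected:
        return "delay"              # a gap: re-queue for later
    row["count"] += event["adjustment"]
    row["version"] = event["version"]
    return "applied"
```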

Ben Kloosterman

unread,
Mar 15, 2017, 6:09:41 AM3/15/17
to ddd...@googlegroups.com

This all seems fine but seems incredibly wasteful for two reasons
  1. I have to load all events for the aggregate each time I process a single event (OK perhaps I snapshot every 50 events or so)
Use an identity map cache... cases which update a very large number of aggregates are often (but not always) an indicator of a poor, CRUD-like design.
 
  2. If I encounter the same aggregate 5 events later, and then again 3 events after, I have to repeat step 1 above 3 times when I could have just done it once and applied 3 events
It's not an issue if it's a map.
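An identity map here is just a keyed cache, so each aggregate's state gets rebuilt at most once per run. A minimal sketch, where load_aggregate is a placeholder for the expensive stream replay:

```python
class IdentityMap:
    """Cache aggregates by id so repeated events for the same aggregate
    reuse the in-memory instance instead of reloading its event stream."""

    def __init__(self, load_aggregate):
        self.load_aggregate = load_aggregate  # expensive: replays the stream
        self.cache = {}

    def get(self, aggregate_id):
        if aggregate_id not in self.cache:
            self.cache[aggregate_id] = self.load_aggregate(aggregate_id)
        return self.cache[aggregate_id]
```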


Note that I normally don't bring in the aggregate when building the read model; I get the new events from an event store / NoSQL data source. I make sure the events have everything that is needed, or I read the read model for relations. In this case the command would be -2, but the event would be something like "changed stock from 1002 to 1000" (there are cases when this will not be good enough). As long as you process in sequence, this will be simpler.
 
My question is this doesn't seem right to me, and the DBA will throw a hissy fit as I bash the SQL Server every 250 ms calling the same procedure. Ideally what I really want to do is the following
  • Get all new events since I last processed them
  • Group them into batches for each unique instance of an aggregate
  • Apply all outstanding events in one go
  • Call the stored procedure once with the adjusted inventory value
Is there a pattern for doing this specifically with EventStore? I was thinking perhaps a projection could create new streams per aggregate from the single events stream, and maybe I subscribe to them?

Yep.
 
What I don't want to do is keep a list of all aggregates and poll them for changes and update the database if they exist, as this woudl mean checking every aggregate as I won't know which ones have new events. If I have 1000's of products event with threading that is gonna be SLOW!. Maybe I am being dumb but my fuzzy little head is gonna explode soon thinking about it, and I'm sure you don't want that on your conscience. ;-)

That's right, and that would be bad. Note that in most cases you're I/O-limited, so threading is more of a hindrance (out-of-order processing) than a benefit.

Ben

TonyC

unread,
Apr 4, 2017, 11:35:11 AM4/4/17
to DDD/CQRS
Thanks for all your help :-)

João Bragança

unread,
Apr 4, 2017, 4:31:48 PM4/4/17
to ddd...@googlegroups.com
With this strategy, how do you deal with stream migration? If for whatever reason you need to move a stream, transform a stream, etc., that position becomes worthless.



Rickard Oberg

unread,
Apr 4, 2017, 4:41:11 PM4/4/17
to ddd...@googlegroups.com
I do a blue/green deployment and rebuild the database from scratch. Did this today in our prod environment as it happens.

/Rickard