ES replay with message queues?


Cj de Vos

Jan 20, 2017, 9:14:07 AM
to DDD/CQRS
In the scenario of rebuilding a projection: is it common to send replay events over a message queue like RabbitMQ?

Greg Young

Jan 20, 2017, 9:16:41 AM
to ddd...@googlegroups.com
It is a very bad idea.

On Fri, Jan 20, 2017 at 2:14 PM, Cj de Vos <cdevos....@gmail.com> wrote:
> In the scenario of rebuilding a projection: is it common to send replay
> events over a message queue like RabbitMq?



--
Studying for the Turing test

Cj de Vos

Jan 20, 2017, 9:20:01 AM
to DDD/CQRS
Thought so. Thanks.

Greg Young

Jan 20, 2017, 9:24:10 AM
to ddd...@googlegroups.com
I talk about why in more detail in my polyglot data talk. Basically
you will need a control channel and things start getting weird.

On Fri, Jan 20, 2017 at 2:20 PM, Cj de Vos <cdevos....@gmail.com> wrote:
> Thought so. Thanks.

Cj de Vos

Jan 20, 2017, 9:26:02 AM
to DDD/CQRS
This one? https://www.youtube.com/watch?v=hv2dKtPq0ME

Greg Young

Jan 20, 2017, 9:30:55 AM
to ddd...@googlegroups.com
yeah it should be in there towards the last 1/3 or so

On Fri, Jan 20, 2017 at 2:26 PM, Cj de Vos <cdevos....@gmail.com> wrote:
> This one? https://www.youtube.com/watch?v=hv2dKtPq0ME

Cj de Vos

Jan 22, 2017, 6:47:07 AM
to DDD/CQRS
Thanks for this.

Despite having read articles, given it a lot of thought and played around with some concepts, the information provided in this video helped me understand it much better.

edal...@gmail.com

Jan 27, 2017, 3:51:10 AM
to DDD/CQRS
Wow, I am really glad I came here tonight. I had understood this really, really wrong. Today we were discussing this at work and we came up with the RabbitMQ idea. Now I know it was a major mistake. This post just saved my life.

By the way, it is at minute 25 of Greg's Polyglot Data talk, if you want to skip the fundamental explanations and jump into the good stuff.

Now let me get this straight: this pub/sub style of broadcasting events is not just for the replay, but for the original event propagation as well. Right?

In other words, when a new subscriber arrives, it tells me from which event it wants to start and then I start sending it events until it catches up with the current stream of events. If the client goes down and then comes back up, it continues from the last event it got.

I am curious which specific technologies are a good fit for implementing this type of subscription mechanism. Can anyone please point me in the right direction?

Alexey Zimarev

Jan 27, 2017, 4:55:21 AM
to DDD/CQRS
Well, Greg's EventStore does just that :) See catch-up subscriptions.

However, for event broadcast in event-driven architecture for cross-BC communication it is quite okay to combine both approaches. For example, we use catch-up subscription combined with the message bus (MassTransit over RabbitMQ). We only have one of those per MC but our framework allows us to have as many as we want.

edal...@gmail.com

Jan 27, 2017, 10:30:57 AM
to DDD/CQRS
Just for reference to others finding this thread later on, I assume you mean this documentation.

So, please, let's discuss here the merits of pull vs push event propagation.

Do the following ideas make sense?

Push

  1. I can have a push event notification mechanism for the current stream of events (e.g. a message broker like RabbitMQ or Kafka).
  2. Every client will have a persistent queue into which I publish the current stream of events.
  3. If a client goes down and comes back up later on, it simply catches up by consuming the queued messages.

The problem is that I cannot use this same channel to replay events from the event store. So, what if I have a new client starting from scratch or from some past state X?

Pull

  1. When a client connects, I don't care if it's a new client or an old one: the client requests a subscription to the stream of events after X.
  2. I create a stream of all events from X up to the current tail of events and publish it into the client's subscription channel. (Greg's EventStore example uses AtomPub for this.)

In his talk, Greg mentions an alternative way to do it, which he does not endorse: I could create a non-persistent publishing queue upon client request. When a client comes up, it sends a request saying "can you make a queue of events starting at X?", and then I make the queue, publish all events after X and append the current tail of events after that.
However, Greg seems to imply this is a bad idea, and I am not sure I got exactly why. Off the top of my head, I think it is because the client wouldn't be allowed to control when or how many events it wants to receive, or for how long, or from which point in time. In other words, there is no control over the subscription. Also, I haven't figured out what should happen once I have published all events from X into the queue. By the time I finish publishing them, more events may already have been added to the store, so I would also need to keep polling the event store for events after X that I have not yet sent to this particular client, which implies I would need to keep track of this somehow on the publishing side.

With the pull mechanism I no longer care about this, because we put that burden on the client.

while (there are more events or subscription is not cancelled)
   pull next event
   handle event
end while

If I build this synchronously, "pull next event" would be a blocking operation that, once we reach the end of the stream, waits there until the next event is appended to the event stream.
If I build this asynchronously (i.e. non-blocking I/O), I would register a callback that lets me know when there is a new event in the stream ready to be handled.
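To make the pull loop above concrete, here is a minimal Python sketch of the synchronous variant. It is only an illustration under stated assumptions: `InMemoryLog`, `read_after` and `pull_until_caught_up` are hypothetical names, the "event store" is just a list in memory, and instead of blocking at the end of the stream the function returns so that the caller can decide how to wait.

```python
from typing import Callable, List


class InMemoryLog:
    """Hypothetical stand-in for an event store: an append-only list."""

    def __init__(self) -> None:
        self._events: List[str] = []

    def append(self, event: str) -> None:
        self._events.append(event)

    def read_after(self, position: int, max_count: int) -> List[str]:
        # `position` is the number of events the caller has already handled.
        return self._events[position:position + max_count]


def pull_until_caught_up(log: InMemoryLog, position: int,
                         handle: Callable[[str], None],
                         page_size: int = 2) -> int:
    """Pull pages after `position` until none are left; return the new position."""
    while True:
        page = log.read_after(position, page_size)
        if not page:
            return position  # caught up; a real subscriber would wait and retry
        for event in page:
            handle(event)
            position += 1  # the client, not the publisher, tracks the checkpoint


log = InMemoryLog()
for name in ["a", "b", "c"]:
    log.append(name)

seen: List[str] = []
checkpoint = pull_until_caught_up(log, 0, seen.append)
log.append("d")  # arrives while the client is "down"
checkpoint = pull_until_caught_up(log, checkpoint, seen.append)
```

A new client and a returning client run exactly the same code; the only difference is the position they start from.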

Now, on to the specifics: besides AtomPub, as suggested by Greg, what else could I use to implement such a communication mechanism?

I haven't followed Greg's tutorial all the way yet, so I don't know if AtomPub does some sort of long polling here, or if the HTTP connection closes once that particular stream of events has been published and the client is then supposed to pull again from that point on, potentially getting an empty stream if no new events have arrived.
Would it be stupid to consider implementing this feature using things like HTTP long polling, WebSockets or HTTP streams?

Am I on the right track here or have I already got lost?
Greg Young

Jan 27, 2017, 10:36:28 AM
to ddd...@googlegroups.com
kafka is a pull model ¯\_(ツ)_/¯

Kyle Cordes

Jan 27, 2017, 11:02:48 AM
to ddd...@googlegroups.com
On January 27, 2017 at 9:30:59 AM, edal...@gmail.com (edal...@gmail.com) wrote:
> Pull
>
>   1. When a client connects, I don't care if it's a new client or an old one, the client requests a subscription to the stream of events after X.
>   2. I create a stream of all events from X up to the current tail of events and publish it into the client's subscription channel. (Greg's EventStore example uses AtomPub for this matter).


Having fought it several different ways, I strongly recommend doing the above.



> I haven't followed Greg's tutorial all the way yet, so I don't know if AtomPub does some sort of long polling here, or if the HTTP connection closes once I have finished publishing that particular stream of events and then the client is supposed to pull again from that point on, potentially getting an empty stream if no new events have arrived.
> Would it be stupid to consider implementing this feature using things like HTTP long polling, WebSockets or HTTP streams?


Long polling will work great. So will SSE or WebSockets; all of those are very suitable if you want to stay close to the web technology stack. You’ll need some complexity in the client and server code, and well-chosen libraries to implement these things effectively. Make sure to handle error cases carefully; those are especially difficult to cover in your test suite. You’re probably going to want auto reconnect and auto recovery from things that go wrong. A complex CQRS/ES-driven system may end up with lots of subscribers, and it’s best not to have to rely on an automated or human system to notice that one is stuck and go unstick it. Better to make them automatically recover from anything that goes wrong and keep going.

We ended up doing something a bit more brutal than long polling, SSE, or WebSockets. Probably unnecessarily clever, but we wanted the happy path to be extremely fast so that we could run through test cases very quickly. We “cheated”: we took advantage of the fact that all of the relevant machines would be on the same network segment, which is achievable, with a bit of network magic, even when they happen to be geographically distributed.

Each consumer performs a fetch/poll loop roughly as you wrote. Each consumer tracks its own high watermark, so restarting the consumer from the beginning consists of updating one bit of data in the consumer to tell it to start at an earlier point (like the beginning). The polling looks again every N seconds or minutes to see if more data is available. But the part of the polling loop that waits can be woken up by an external signal. Read on:

We used a UDP broadcast hint mechanism that would alert all running consumers that new data is available. A consumer that doesn’t care how closely it keeps up to date doesn’t even bother to listen for this, because all of the semantics are right based on just the polling. A consumer that does want to stay quickly up to date simply listens for this hint and fetches the new data upon receiving it. The UDP broadcast hint only reduces the wait time to approximately almost 0, it does not change the semantics or ordering or high water mark mechanism.

Although this UDP broadcast may sound completely ridiculous in today’s era of much higher-level abstractions (and I certainly agree that in a sense it is), it also produces approximately the best possible results with the least code and the least runtime cost. We used JGroups so that we would not need to dirty our hands with actual low-level network code. Reading and understanding the JGroups documentation took the most time; it has numerous capabilities not needed for the simple case. The actual implementation is very short.
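For readers who want to see the shape of "polling with a hinted wake-up", here is a minimal Python sketch. It is not the JGroups-based implementation described above: a `threading.Event` stands in for the UDP broadcast, a plain list stands in for the event log, and `HintedPoller` and its methods are hypothetical names. The point it tries to show is that the hint only shortens the wait, while the data is always read in log order from the consumer's own high watermark.

```python
import threading
import time
from typing import List


class HintedPoller:
    """Polls a shared list; a hint only shortens the wait between polls."""

    def __init__(self, log: List[str], poll_interval: float) -> None:
        self.log = log
        self.poll_interval = poll_interval
        self.high_watermark = 0            # owned by the consumer
        self.handled: List[str] = []
        self._hint = threading.Event()     # stands in for the UDP broadcast
        self._stop = threading.Event()

    def hint(self) -> None:
        self._hint.set()

    def stop(self) -> None:
        self._stop.set()
        self._hint.set()                   # wake the loop so it can exit

    def poll_once(self) -> None:
        # Always read in log order, starting at the watermark.
        new_events = self.log[self.high_watermark:]
        self.handled.extend(new_events)
        self.high_watermark += len(new_events)

    def run(self) -> None:
        while not self._stop.is_set():
            self.poll_once()
            # Sleep until the next poll is due, or until a hint arrives.
            self._hint.wait(timeout=self.poll_interval)
            self._hint.clear()


log = ["e1"]
poller = HintedPoller(log, poll_interval=60.0)
worker = threading.Thread(target=poller.run, daemon=True)
worker.start()

log.append("e2")
poller.hint()  # without this, "e2" could wait up to 60 seconds

deadline = time.monotonic() + 5.0
while len(poller.handled) < 2 and time.monotonic() < deadline:
    time.sleep(0.01)
poller.stop()
worker.join(timeout=5.0)
```

If the hint is lost, nothing breaks: the consumer picks up the new events at its next scheduled poll.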

We have actually worked on separating our implementation of the above from the problem domain code so that we can publish it as open source. We just haven’t got there yet. I’m a little bit hesitant to publish yet another event store; I certainly heed the words that have been said on this mailing list about the techniques and understanding around CQRS+ES being more important than an implementation library.

But if you happen to be working in Java and don’t think the UDP hint idea is crazy, contact me and I will see if we can find something we could send you.


--
Kyle Cordes

Greg Young

Jan 27, 2017, 11:10:13 AM
to ddd...@googlegroups.com
"The UDP broadcast hint only reduces the wait time to approximately
almost 0, it does not change the semantics or ordering or high water
mark mechanism."

Guess you are using some kind of UDP that provides ordering
assurances? I have yet to see that one. You can build ordering
assurances on top of UDP (message sequence numbers as example) but
they are certainly not 0 cost.
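
As a rough illustration of what "ordering assurances on top of UDP" with message sequence numbers involves, here is a small Python sketch. The function name and the packet shape are assumptions made for the example. It also hints at the cost: early packets have to be held in memory, and a lost packet stalls delivery until something (not shown here) retransmits it.

```python
from typing import Dict, Iterable, List, Tuple


def deliver_in_order(packets: Iterable[Tuple[int, str]],
                     first_seq: int = 1) -> List[str]:
    """Release payloads in sequence order, holding back anything that arrives early."""
    expected = first_seq
    held: Dict[int, str] = {}
    delivered: List[str] = []
    for seq, payload in packets:
        if seq < expected or seq in held:
            continue  # duplicate of something already delivered or already held
        held[seq] = payload
        # Release the longest contiguous run starting at `expected`.
        while expected in held:
            delivered.append(held.pop(expected))
            expected += 1
    return delivered


# Packets arrive reordered and duplicated, as they may over UDP.
arrivals = [(2, "b"), (1, "a"), (1, "a"), (4, "d"), (3, "c")]
ordered = deliver_in_order(arrivals)
```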

Kyle Cordes

Jan 27, 2017, 11:12:39 AM
to ddd...@googlegroups.com
On January 27, 2017 at 10:10:11 AM, Greg Young (gregor...@gmail.com) wrote:
> "The UDP broadcast hint only reduces the wait time to approximately
> almost 0, it does not change the semantics or ordering or high water
> mark mechanism."
>
> Guess you are using some kind of UDP that provides ordering
> assurances? I have yet to see that one. You can build ordering
> assurances on top of UDP (message sequence numbers as example) but
> they are certainly not 0 cost.


Nope, nothing like that. The UDP arrival has no effect on the semantics. It has nothing to do with the ordering. All it does is wake up a sleeping subscriber so that the subscriber does not have to wait N seconds or minutes to poll again. Upon being woken, the subscriber simply polls exactly like it would otherwise. The subscriber doesn’t care about the order the UDP packets arrive, it does not use the order of the UDP messages to determine the order of processing events. It always simply processes the data in the normal order from the data feed.

It is brutal and simple.

--
Kyle Cordes

Greg Young

Jan 27, 2017, 11:18:21 AM
to ddd...@googlegroups.com
ES already supports a similar push model. It does it over TCP though
instead of UDP, as UDP is often not routable, especially in the cloud.

Kyle Cordes

Jan 27, 2017, 11:28:10 AM
to ddd...@googlegroups.com
On January 27, 2017 at 10:18:19 AM, Greg Young (gregor...@gmail.com) wrote:
> ES already supports a similar push model. It does it over TCP though
> instead of UDP, as UDP is often not routable, especially in the cloud.



Yes, this is why I tell people they should go look at and probably use ES, and buy some commercial support while they are at it  :-)

For the project I was working on a couple of years ago where I built the thing I described, we had decided to go a different direction for various reasons. I’m really happy with our implementation. You can think of ours as a library easily embedded into an existing application, a smaller thing than a standalone server product like ES. Even if we publish it, it is a small simple thing compared to the production proven ES at https://geteventstore.com/

It’s possible to configure our thing to work over TCP by editing a JGroups configuration file, without editing any code. Understanding and editing that file is probably 10x more difficult than obtaining and configuring geteventstore, though. JGroups is an amazing piece of technology: robust, excellent functionality hidden behind a configuration mechanism that only a networking theory and practice wonk could love.

--
Kyle Cordes

edal...@gmail.com

Jan 27, 2017, 12:29:07 PM
to DDD/CQRS
Thanks for the detailed answer, Kyle. This is super interesting. We have a problem in the company I work for that seems to be a good fit for CQRS/ES, but we are first trying to fully grasp the implications of implementing it, and we want to catalog all the gotchas before we do anything. So at this point I'm just trying to understand how this is supposed to work, and getting examples like yours on how to wake up subscribers is very interesting.

I'm trying to understand the need of the UDP solution that you used to wake up a waiting subscriber. 

I think it might be related to a couple of questions I had:

What can I do on the publishing side to decide when to publish more events into the subscription stream? Obviously, when a client requests events from X, I first read the events from the event store from X onwards, but eventually I would like to continue publishing real-time events in that stream. Is there a clever way to do the transition? How do I join the real-time stream?
Also, I suppose I will need a way to limit how many events to send in order not to overload the client (somewhat like the reactive streams backpressure mechanism, or pagination in the AtomPub example from Greg).

Above all, if the client is making a temporal query and does not want to reach the end of the stream of events, I assume I will also need a limit which, if not set, is infinite.

I was thinking that I could probably do it by initially recording what was the last event sent to this subscriber, and then check if there are more events after that:

//upon subscription
set last-event to the subscriber's last known event
set page to the size of the subscriber's buffer/page size

while subscription is not cancelled
    get next page of events after last-event from the event-store
    if events-found is an empty set
        wait during some time t
    else
        publish all events-found to the subscriber
        set last-event to the last published event
    end if
end while

And if the client is interested in a temporal query, then we are not interested in keeping the subscription once we reach the limit the client seeks. We can close the subscription once we reach there.

//upon subscription
set last-event to the subscriber's last known event
set page to the size of the subscriber's buffer/page size
set limit-event to the last event the subscriber is interested in

while subscription is not cancelled
    if last-event >= limit-event
        unsubscribe
        exit while
    end if
    get next page of events after last-event from the event-store
    if events-found is an empty set
        wait during some time t
    else
        publish all events-found to the subscriber where event <= limit-event
        set last-event to the last published event
    end if
end while
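
The bounded loop above can be sketched in Python roughly as follows. This is only an illustration: `read_pages` is a hypothetical helper, events are identified by 1-based sequence numbers, the log is an in-memory sequence, and the sketch reads a snapshot rather than waiting for new events. `limit=None` plays the role of "no limit", i.e. read to the current end of the log.

```python
from typing import Iterator, List, Optional, Sequence


def read_pages(events: Sequence[str], after: int, page_size: int,
               limit: Optional[int] = None) -> Iterator[List[str]]:
    """Yield pages of events whose sequence numbers lie in (after, limit].

    Sequence numbers are 1-based, so events[0] has sequence number 1.
    """
    position = after
    end = len(events) if limit is None else min(limit, len(events))
    while position < end:
        # Never read past `end`, so a temporal query stops exactly at the limit.
        page = list(events[position:min(position + page_size, end)])
        yield page
        position += len(page)


history = ["e1", "e2", "e3", "e4", "e5", "e6", "e7"]

# Temporal query: everything after event 2, up to and including event 5.
bounded = list(read_pages(history, after=2, page_size=2, limit=5))

# Ordinary catch-up: everything after event 5, no limit.
unbounded = list(read_pages(history, after=5, page_size=3))
```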

        
Kyle, my understanding is that your wait/notify UDP mechanism is there to force an early wake-up of the waiting thread. Am I on the right track here?

Is this a valid solution? Are there other valid ways, possibly better ways, to do it?


Greg Young

Jan 27, 2017, 12:44:59 PM
to ddd...@googlegroups.com
A common optimization is to also support pushed results that are live.
To do this, maintain a buffer of size b (circular buffer). As you read
based on pages, check if that event is in the buffer. If so, then switch
to the live subscription. This logic is implemented in
CatchUpSubscription in ES.
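
A rough Python sketch of that idea, for illustration only: it is one reading of the description above, not the actual CatchUpSubscription code in ES. The function name, the `(sequence number, payload)` event shape and the single-threaded setup are assumptions; in a real system the live buffer would be filled concurrently while pages are being read.

```python
from collections import deque
from typing import Deque, List, Tuple

Event = Tuple[int, str]  # (sequence number, payload)


def catch_up_then_go_live(store: List[Event], live_buffer: Deque[Event],
                          checkpoint: int,
                          page_size: int) -> Tuple[List[Event], bool]:
    """Read pages after `checkpoint` until the live buffer overlaps what was read.

    Returns the events handled, in order, and whether the switch to live happened.
    """
    handled: List[Event] = []
    position = checkpoint
    while True:
        # The buffer covers everything after `position` once its oldest
        # entry is at or before the next event we need.
        if live_buffer and live_buffer[0][0] <= position + 1:
            handled.extend(e for e in live_buffer if e[0] > position)
            return handled, True
        page = [e for e in store if e[0] > position][:page_size]
        if not page:
            return handled, False  # caught up, but nothing live to join yet
        handled.extend(page)
        position = page[-1][0]


store = [(n, f"event-{n}") for n in range(1, 11)]
# Circular buffer of size b = 3 holding the most recently pushed live events.
live: Deque[Event] = deque(store[-3:], maxlen=3)

handled, went_live = catch_up_then_go_live(store, live, checkpoint=0, page_size=4)
```

Here the subscriber reads events 1 to 4, then 5 to 8, notices that event 8 is in the buffer, takes 9 and 10 from the buffer and carries on from the live feed.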

Cheers,

Greg

edal...@gmail.com

Jan 27, 2017, 1:25:22 PM
to DDD/CQRS
That's brilliant, Greg!

That's what I was after. This is super interesting.

So, we have an interesting problem in the warehouse. When an order arrives, we are supposed to pick the items for that order and put them onto the conveyor that takes them to a packing station. Pickers are located at strategic points in the warehouse; they go and look for the items and place them in totes that are later conveyed to the packing stations.
Today an algorithm runs in a cron job and reads from the database to determine the state of the world and rebuild the pick list, which is basically the stream of items a picker is supposed to pick. The pick list stream is constantly changing as new orders arrive and as pickers move around the warehouse. For example, you would like to optimize the distance walked to pick a given product, and as pickers move between the aisles you would like to avoid sending them all the way back to the first aisle when they start a new pick stream.

Today our process is an embarrassment of engineering: on days of high load it is a source of database contention and takes a lot of time to rebuild the picking list.

We were considering CQRS/ES as an alternative solution to our problem. Basically, the changes in the status of shipments are our source of events. As events arrive we are supposed to redefine the state of the world, i.e. recalculate the picking list. Also, as pickers pick items from their stream, we may need to recalculate their world. We haven't run any numbers yet, but given the size of the pick list, I reckon that we could even keep it in memory and, using the polyglot pattern Greg describes in his talk, perhaps keep a persisted version just as a safety net.

And that's why all this replay use case is so important for us, because if we keep a memory image only, I would feel more confident if we can make sure to rebuild it without much hassle. As a matter of fact, we have to deal with the legacy database, and so initially we'd probably be forced to just do CQRS and eventually reach the nirvana of ES if we do this right.
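
To make the "memory image rebuilt by replay" idea concrete, here is a toy Python sketch. The event names (`ItemOrdered`, `ItemPicked`), their fields and the shape of the projection are invented for the example and are certainly simpler than a real pick list; the point is only that the in-memory state is a fold over the event history, so it can be thrown away and rebuilt at any time.

```python
from typing import Dict, List, Set, Tuple

Event = Tuple[str, str, str]  # (event type, order id, item)


def rebuild_pick_list(events: List[Event]) -> Dict[str, Set[str]]:
    """Fold the full event history into {order id: items still to pick}."""
    pending: Dict[str, Set[str]] = {}
    for kind, order_id, item in events:
        if kind == "ItemOrdered":
            pending.setdefault(order_id, set()).add(item)
        elif kind == "ItemPicked":
            items = pending.get(order_id, set())
            items.discard(item)
            if not items:
                pending.pop(order_id, None)  # nothing left to pick for this order
    return pending


history: List[Event] = [
    ("ItemOrdered", "o1", "soap"),
    ("ItemOrdered", "o1", "towel"),
    ("ItemOrdered", "o2", "mug"),
    ("ItemPicked", "o1", "soap"),
    ("ItemPicked", "o2", "mug"),
]

pick_list = rebuild_pick_list(history)
```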

To your experienced eyes, does this sound like something CQRS/ES would help to implement?

Greg Young

Jan 27, 2017, 1:28:29 PM
to ddd...@googlegroups.com
A common pattern to add here is to have a projection to a legacy
system. You don't need to move everything all at once.

Kyle Cordes

Jan 27, 2017, 1:35:34 PM
to ddd...@googlegroups.com
On January 27, 2017 at 11:29:09 AM, edal...@gmail.com (edal...@gmail.com) wrote:
> I'm trying to understand the need of the UDP solution that you used to wake up a waiting subscriber.


In the use we have made of it, it is in a sense not necessary at all. It was simply the best combination of the least code for the most rapid functionality, making data flow through the system asynchronously and immediately instead of more slowly with polling. You could actually block the UDP packets and the system would continue to function, but it would function more slowly, because subscription consumers would wake according to their polling delay rather than immediately when new events are available. A much nicer failure mode than stopping completely, by the way.


> I think it might be related to a couple of questions I had:
>
> What can I do on the publishing side to decide when to publish more events into the subscription stream?

If you are doing DDD+CQRS+ES, the traditional answer is that at the end of an aggregate processing a command, it should emit a batch of zero..N events into the event system of record. However, there are also many useful cases for CQRS+ES without much DDD, in which case you emit the events after whatever has happened has happened.


> Obviously, when a client requests events from X, I first read the events from the event store from X onwards, but eventually I would like to continue publishing real-time events in that stream. Is there a clever way to do the transition? How do I join the real-time stream?

In the solution we implemented, and I think it is similar for others, there is no real explicit transition here. It is more or less completely seamless because a subscriber is simply running in a loop consuming more events. There is no separate real-time versus old stream. There is only a single log-structured list of events.

By the way, someone else who worked with me wrote about a bunch of this in much detail. There are five blog posts in the link below, I think you might enjoy several of them very much.

https://blog.oasisdigital.com/category/cqrs/



> Also, I suppose I will need a way to limit how many events to send in order not to overload the client (somewhat like the reactive streams backpressure mechanism, or pagination in the AtomPub example from Greg).

Actually, I think with both what I have done and what Greg has done, something like pagination does the job. Subscribers pull (and poll) data; it is not sent to them. So there is no buffer overload concern, nor any back pressure. Subscribers just process the data as fast as they are able.




> Above all, if the client is making a temporal query and does not want to reach the end of the stream of events, I assume I will also need a limit which, if not set, is infinite.

Yes, right, since the subscriber is simply consuming chunks of data linearly from a very long list, it’s easy to tell it to stop at item N.

The loops you wrote look somewhat complicated. The actual implementation we have done has some complexity certainly. But it is a different kind of complexity, it is around getting the error handling and batching right and efficient. But in the implementation we have, there is no concept of transition between catching up and running current data. I think that is also the case if you write a subscriber using Greg’s ES.


> Kyle, my understanding is that your wait/notify UDP mechanism is there to force an early wake-up of the waiting thread. Am I on the right track here?

Yes, that is correct. That’s all it does. Subscribers that are asleep waiting for the next polling interval get woken up so they go grab the newly published events immediately. If they happen to be listening. If they happen to be running. But you could certainly stop one and start it later, and it would catch up from wherever it happened to be. Only the subscriber keeps track of a high watermark.


        
> Is this a valid solution? Are there other valid ways, possibly better ways, to do it?

I’m sure it is possible to make a system like this work, where you somehow manage a transition between old and current explicitly, and where you have a notion of having to push the information to the subscribers. We actually went down this path early on in our efforts. We found it surprisingly difficult to make it work that way. Eventually the “polling with hinted wake-up” proved much better, for us at least.

If I had to give just one piece of advice to someone starting with event sourcing, it would be this: when you heard of all this stuff, you were probably thinking of some sort of message-oriented middleware publish/subscribe system. Resist the urge. Set that down and lock it in the closet. Then go pick up an event store with an idiomatic way to implement pull/poll/etc. subscribers/projections, like Greg’s.

Kyle Cordes

Jan 27, 2017, 1:37:20 PM
to ddd...@googlegroups.com


On January 27, 2017 at 11:44:57 AM, Greg Young (gregor...@gmail.com) wrote:
> A common optimization is to also support pushed results that are live.
> To do this maintain a buffer of size b (circular buffer) as you read
> based on pages check if that event is in buffer. If so then switch to
> live subscription. This logic is implemented in CatchUpSubscription in
> ES.

Oddly enough we also did this in our UDP implementation. It pre-populates the subscriber local cache with payload for the event. It does this in a way that still preserves the strict linear loop processing events in order, without regard to how the UDP messages arrive. I elided this from my previous description because it is simply a network bandwidth optimization, does not change the semantics. In retrospect, networks are so ridiculously fast that it might have been a waste of effort.



--
Kyle Cordes

Danil Suits

Jan 27, 2017, 6:16:59 PM
to DDD/CQRS
Do you wake disinterested subscribers?  or are you using the information sent via UDP to specify which subscribers should wake?


Kyle Cordes

Jan 27, 2017, 6:24:18 PM
to ddd...@googlegroups.com
On January 27, 2017 at 5:17:01 PM, Danil Suits (danil...@gmail.com) wrote:
> Do you wake disinterested subscribers? Or are you using the information sent via UDP to specify which subscribers should wake?

Yes, something similar to this. Not quite as specific as “which subscribers to wake”, but the packets carry enough payload that a subscriber could decide whether to wake based on the contents. In the system we implemented, there is no central thing managing a list of current subscribers; there is just a peer-to-peer multicast channel in which each peer announces “I successfully published an event to the store” so that others can wake up if they want to. It would be possible to implement more elaborate filtering than we did, of course, but in the sort of obvious and straightforward implementation of a CQRS projection, the projections (subscribers) typically want the whole event stream, even if they only actually update themselves based on some of the events.




--
Kyle Cordes
