How do you deduplicate events on the read side?

Jack Snow

Nov 17, 2015, 5:42:26 AM
to DDD/CQRS
I am interested in what you guys are doing to deduplicate events on the read side. All events will have a GUID. Unfortunately, not all events can be idempotent.

I am planning to use Kafka as my message bus. I can read an event from Kafka, process it to update the read side, and then update the read offset in Kafka. However, if the consumer crashes after updating the read side but before updating the offset in Kafka, the event will be read again when we bring the process back up.

Before processing an event, I am considering checking the read-side database for the event's GUID. If it isn't there, I update the read side and add the event's GUID to a table in the same database. This could be MySQL, for example, and all of it would happen in one transaction.
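A minimal sketch of this proposal, using Python's built-in `sqlite3` as a stand-in for MySQL (an assumption). The table names `processed_events` and `account_balance` and the deposit event shape are hypothetical. The GUID check, the non-idempotent read-model update, and the insert of the GUID all happen inside one transaction:

```python
import sqlite3


def open_read_db(path=":memory:"):
    """Create the read-side schema (hypothetical tables)."""
    conn = sqlite3.connect(path)
    conn.executescript(
        """
        CREATE TABLE IF NOT EXISTS processed_events (event_id TEXT PRIMARY KEY);
        CREATE TABLE IF NOT EXISTS account_balance (
            account_id TEXT PRIMARY KEY,
            balance    INTEGER NOT NULL
        );
        """
    )
    return conn


def handle_deposit(conn, event):
    """Apply a non-idempotent deposit at most once per event GUID.

    Returns True if the event was applied, False if it was a duplicate.
    """
    with conn:  # commits on normal exit, rolls back if an exception escapes
        already_seen = conn.execute(
            "SELECT 1 FROM processed_events WHERE event_id = ?", (event["id"],)
        ).fetchone()
        if already_seen:
            return False
        # The non-idempotent part: adding to the current balance.
        conn.execute(
            "INSERT OR IGNORE INTO account_balance (account_id, balance) VALUES (?, 0)",
            (event["account_id"],),
        )
        conn.execute(
            "UPDATE account_balance SET balance = balance + ? WHERE account_id = ?",
            (event["amount"], event["account_id"]),
        )
        # Record the GUID in the same transaction as the read-model change.
        conn.execute(
            "INSERT INTO processed_events (event_id) VALUES (?)", (event["id"],)
        )
        return True


conn = open_read_db()
deposit = {"id": "guid-1", "account_id": "acc-1", "amount": 50}
handle_deposit(conn, deposit)  # applied
handle_deposit(conn, deposit)  # redelivered after a crash: ignored
```

The primary key on `processed_events` also acts as a backstop: if two consumers ever raced on the same event, the second insert would fail and its whole transaction would roll back.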

Is it valid/acceptable to have a log of processed events in each bounded context? Is this something that is important in production systems from your experiences?

Are there any better solutions than the above?

Kris Leech

Nov 17, 2015, 6:12:08 AM
to DDD/CQRS
On 17 Nov 2015, at 10:42, Jack Snow wrote:

> I am interested in what you guys are doing to deduplicate events on the read side. All events will have a GUID. Unfortunately, not all events can be idempotent.

In what circumstances might you get a duplicate event?

>
> I am planning to use Kafka as my message bus. I can read an event on kafka and process it to update the read side, then update the read offset in kafka. However, if the consumer crashes after updating the read side and before updating the offset in kafka, the event will be read again when we bring the process back up.

I am also looking at using Kafka as an Event Store and was pondering the same problem.

Having a UUID per event and having the consumer keep track of them seems like a good option to me.

As you suggest, the UUID would need to be stored in the same place as the read model so it can be wrapped in a transaction.

I think the same is usually true of the offset too.


--
Software Engineer

teamcoding.com
github.com/krisleech
linkedin.com/in/krisleech

Thomas Presthus

Nov 18, 2015, 4:50:34 AM
to DDD/CQRS
Hi,

Could you provide a little bit of insight on why some of your events can't be idempotent?

Thomas.

Greg Young

Nov 18, 2015, 5:33:42 AM
to ddd...@googlegroups.com
I can tell you how Event Store handles this.

On any given subscription there is an incrementing sequence (either a logical position or a stream-based position). If you write to your database and store your checkpoint atomically, you gain de-duplication for free (i.e. this simulates transactional messaging).
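A rough sketch of that idea, assuming a single subscription with a monotonically increasing position and again using Python's `sqlite3` in place of the real read-side database. The `checkpoint` and `order_stats` tables and the `"orders-view"` subscription name are hypothetical. Because the read-model change and the checkpoint commit together, a redelivered event is recognised by its position alone:

```python
import sqlite3


def open_db():
    """Read-side schema: one read-model table plus a checkpoint table (hypothetical)."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE checkpoint (
            subscription TEXT PRIMARY KEY,
            position     INTEGER NOT NULL
        );
        CREATE TABLE order_stats (orders_placed INTEGER NOT NULL);
        INSERT INTO order_stats (orders_placed) VALUES (0);
        """
    )
    return conn


def last_checkpoint(conn, subscription):
    """Position of the last event applied for this subscription, or -1 if none."""
    row = conn.execute(
        "SELECT position FROM checkpoint WHERE subscription = ?", (subscription,)
    ).fetchone()
    return row[0] if row else -1


def project(conn, subscription, position, event):
    """Apply one event and advance the checkpoint in a single transaction."""
    with conn:
        if position <= last_checkpoint(conn, subscription):
            return False  # already applied before the crash: skip
        if event["type"] == "OrderPlaced":
            # Non-idempotent update, made safe by the checkpoint comparison above.
            conn.execute("UPDATE order_stats SET orders_placed = orders_placed + 1")
        conn.execute(
            "INSERT OR REPLACE INTO checkpoint (subscription, position) VALUES (?, ?)",
            (subscription, position),
        )
        return True
```

On restart the subscriber would resume from `last_checkpoint(...) + 1`; unlike the GUID approach, no per-event ID table is needed.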

Cheers,

Greg

--
Studying for the Turing test

Hendry Luk

Nov 18, 2015, 5:30:17 PM
to ddd...@googlegroups.com
Considering that Kafka's offset is managed by the client, the chance of crashing with an outdated offset sounds pretty slim.
But anyway, if you're going to keep track of your event consumption yourself in MySQL, why do you need to record the individual events? Don't you only need to keep track of the offset? And instead of checking it for every event you consume, could you not check it only when your server comes back up after a crash, reconciling your recorded offset against Kafka's?
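A sketch of the "reconcile once at startup" variant, under the assumption that the read model and the last applied offset live in the same store and change together. The Kafka partition is simulated with a Python list, and `ReadStore`, `run_consumer` and `SimulatedCrash` are hypothetical names, not Kafka client APIs:

```python
from dataclasses import dataclass


class SimulatedCrash(Exception):
    """Mimics the process dying between the read-side write and the broker commit."""


@dataclass
class ReadStore:
    """Hypothetical read-side store holding the read model and the last applied offset.

    Both fields are updated together; in a real database this would be one transaction.
    """

    total: int = 0
    last_offset: int = -1

    def apply(self, offset, amount):
        self.total += amount
        self.last_offset = offset


def run_consumer(partition, store, crash_at=None):
    """Consume one simulated partition, resuming from the store's own offset.

    Reconciliation happens once, at startup: whatever the broker has recorded,
    consumption restarts right after the last offset the read side applied.
    """
    for offset in range(store.last_offset + 1, len(partition)):
        store.apply(offset, partition[offset])
        if offset == crash_at:
            raise SimulatedCrash(offset)
        # Committing the offset to the broker would happen here (omitted); the
        # read store, not the broker, is the source of truth for where to resume.


partition = [10, 20, 30]
store = ReadStore()
try:
    run_consumer(partition, store, crash_at=1)  # dies after applying offset 1
except SimulatedCrash:
    pass
run_consumer(partition, store)  # restart: resumes at offset 2, nothing re-applied
```

This relies on offsets within a single partition being strictly ordered; with several partitions you would store one offset per partition.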

Kirill Chilingarashvili

Nov 20, 2015, 3:56:27 AM
to DDD/CQRS
I have not implemented deduplication on the read side yet, but I am planning to do it the following way.
In my implementation, the event store (SQL Server infrastructure) marks every event with a global sequence number when it stores it.
I implemented projections behind an "infrastructure-independent" abstract interface, so all projections inherit from a base projection class, and whatever properties are on the base class are stored for every projection.
So on the base class I will add a property named "ProcessedSequence", which will be the checkpoint for that projection.
Projection processing is done in three steps:
1) fetch projection (if not cached)
2) modify projection based on event
3) save projection

Now, between steps 1 and 2, I will add a simple check: if the incoming event's sequence is equal to or lower than "ProcessedSequence" on the projection, the event was already processed and I can silently ignore it.
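The check described above can be sketched as a base class that carries the checkpoint. This is a hypothetical Python rendering (`Projection`, `processed_sequence` and `CartItemCount` are illustrative names), assuming global sequence numbers start at 1 and that fetching and saving the projection (steps 1 and 3) happen elsewhere, with the checkpoint saved together with the projection state:

```python
from dataclasses import dataclass


@dataclass
class Projection:
    """Base class: every projection persists its own checkpoint."""

    processed_sequence: int = 0  # assumes global sequence numbers start at 1

    def handle(self, sequence, event):
        """Check between fetch (step 1) and modify (step 2); returns True if applied."""
        if sequence <= self.processed_sequence:
            return False  # already processed: silently ignore
        self.apply(event)  # step 2
        self.processed_sequence = sequence
        return True  # the caller then saves state and checkpoint together (step 3)

    def apply(self, event):
        raise NotImplementedError


@dataclass
class CartItemCount(Projection):
    """Hypothetical projection with a non-idempotent counter."""

    item_count: int = 0

    def apply(self, event):
        if event["type"] == "ItemAdded":
            self.item_count += event["quantity"]


cart = CartItemCount()
cart.handle(1, {"type": "ItemAdded", "quantity": 2})
cart.handle(2, {"type": "ItemAdded", "quantity": 3})
cart.handle(2, {"type": "ItemAdded", "quantity": 3})  # duplicate: ignored
```

Because the comparison is `<=` rather than "equals last + 1", gaps in the global sequence (events a given projection never receives) cause no trouble.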

Simeon Dimov

Nov 24, 2015, 7:23:34 AM
to DDD/CQRS
All projections can and should be idempotent. A projection should only take the data from the event and store it as-is for the sake of the view model (possibly with a simple transformation such as timestamp.ToString()). If your projection can't be idempotent, it is very likely not a projection, because it is manipulating data based on the current system state. That probably means you have a longer-running process, such as a saga, and that you need another aggregate or even another bounded context. A good sign of a smelly projection is an "if" statement over data that is not part of the event. We tend to miss these things because they are hidden in supporting domains or different BCs that no one talks about.
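To illustrate the distinction, here are two hypothetical handlers for the same view. The first derives its result from the view's current state and double-counts on redelivery; the second only copies data from the event (assuming, for illustration, that the event carries an `orders_so_far` field computed upstream), so applying it twice is harmless:

```python
def apply_increment(view, event):
    """Not idempotent: reads the current view state and adds to it."""
    view["orders"] = view.get("orders", 0) + 1


def apply_copy(view, event):
    """Idempotent: stores data taken from the event as-is.

    Assumes (hypothetically) that the event already carries the running total.
    """
    view["orders"] = event["orders_so_far"]
    view["last_order_at"] = str(event["timestamp"])  # a simple ToString()-style transform


event = {"orders_so_far": 3, "timestamp": 1447757000}
counting_view, copying_view = {}, {}
for _ in range(2):  # the same event delivered twice
    apply_increment(counting_view, event)
    apply_copy(copying_view, event)
```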

@yreynhout

Nov 26, 2015, 6:28:14 PM
to DDD/CQRS
Sounds good to me. You could also use Kafka's read offset as the "last seen" checkpoint (which is what Greg suggests with Event Store).

Hendry Luk

Nov 26, 2015, 7:21:26 PM
to ddd...@googlegroups.com
Well, in the end "something" needs to do that computation. If you do it in a saga, an aggregator, a BC, whatever it is called, it will still have the same duplication problem.

This is not really a problem specific to CQRS in any way, but rather a common technical problem in Kafka (or any messaging/streaming system) in general. Like most messaging/streaming transports, Kafka guarantees at-least-once delivery, where duplicate deliveries do happen. Unsurprisingly, out-of-the-box solutions are as common as the problem itself.

Most event-processing frameworks do take care of deduplication for you. For Kafka specifically, there's Apache Flink (https://flink.apache.org/), which adds built-in exactly-once processing guarantees on top of Kafka.

And if you know me on this list, you know my aversion to hand-writing a full-blown "CQRS-style application" at serious scale, when you could pick up one of the many purpose-built data-processing frameworks to handle your data projections, transformations, and aggregations, and avoid re-solving exactly this sort of low-level event-processing complexity.
