Consistent aggregates


Jochen Mader

May 18, 2018, 3:24:15 AM
to debezium
First of all: thanks for a truly impressive project.
I've been playing around with Debezium for a while, mainly following this blog post about replicating aggregates (http://debezium.io/blog/2018/03/08/creating-ddd-aggregates-with-debezium-and-kafka-streams/).
After exploring some data-warehouse use cases, I'd like to move on to more business-critical ones.
The problem is that I currently can't see how to get a fully consistent aggregate.
The article above shows how to build an eventually consistent aggregate, which isn't enough for our use case.
We need to know when an aggregate spanning multiple tables in the source database is consistent on the replication side.
I don't see how to identify the moment at which I have all parts of a transaction and can produce a complete aggregate.

What kind of approaches would you suggest to get there?

Jiri Pechanec

May 18, 2018, 4:05:45 AM
to debezium
Hi,

I think there is a way to produce a strongly consistent aggregate. For MySQL and PostgreSQL the transaction id is part of the offset associated with each record. So you can use aggregation based on session windows, where the session key is the transaction id. With reasonably chosen timeouts you should receive windowed events belonging to the same transaction.
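[Editor's note: the session-window idea above could be sketched roughly as follows. This is a plain-Java sketch of the grouping logic rather than an actual Kafka Streams topology; the event shape (txId, table, timestamp) and the gap timeout are illustrative assumptions, not Debezium API.]

```java
import java.util.*;

// Sketch: group change events by transaction id, closing a "session"
// when no further event for that tx arrives within a gap timeout.
// Event shape and timeout are illustrative assumptions, not Debezium API.
public class TxSessionGrouper {
    public record TxEvent(long txId, String table, long timestampMs) {}

    private final long gapMs;
    private final Map<Long, List<TxEvent>> open = new HashMap<>();
    private final Map<Long, Long> lastSeen = new HashMap<>();

    public TxSessionGrouper(long gapMs) { this.gapMs = gapMs; }

    // Feed one event; returns any sessions whose gap expired by this point.
    public List<List<TxEvent>> onEvent(TxEvent e) {
        List<List<TxEvent>> closed = flushExpired(e.timestampMs());
        open.computeIfAbsent(e.txId(), k -> new ArrayList<>()).add(e);
        lastSeen.put(e.txId(), e.timestampMs());
        return closed;
    }

    // Close every session whose last event is older than the gap.
    public List<List<TxEvent>> flushExpired(long nowMs) {
        List<List<TxEvent>> closed = new ArrayList<>();
        Iterator<Map.Entry<Long, Long>> it = lastSeen.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> entry = it.next();
            if (nowMs - entry.getValue() >= gapMs) {
                closed.add(open.remove(entry.getKey()));
                it.remove();
            }
        }
        return closed;
    }

    public static void main(String[] args) {
        TxSessionGrouper g = new TxSessionGrouper(1000);
        g.onEvent(new TxEvent(42, "orders", 0));
        g.onEvent(new TxEvent(42, "order_lines", 100));
        // 2s later a new tx arrives: tx 42's session has expired and is emitted.
        List<List<TxEvent>> closed = g.onEvent(new TxEvent(43, "orders", 2100));
        System.out.println(closed.size() + " session(s) closed, first has "
                + closed.get(0).size() + " events");
    }
}
```

As the rest of the thread points out, the weak point is the timeout: a session only closes when enough (event) time passes, so consumer lag can split or delay a transaction's window.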

J.

Jochen Mader

May 18, 2018, 4:24:53 AM
to debezium
That's what I have been playing around with.
The problem is that there are a couple of situations where consumers can run into considerable drift:
- network issues
- topic leader changes

In these situations timeouts break down.
From my understanding I can run into multiple situations:
Happy path: the update is missing one table and fails due to referential integrity.
Bad path: the update is missing one table, but the write doesn't fail, resulting in corrupted data.

Is there a way to have a topic that sends some sort of transaction summary, containing the list of updated tables?

Jiri Pechanec

May 18, 2018, 4:49:37 AM
to debezium
There might be an ultimate solution, but it would probably need the Processor API. You would create your own processor that receives Debezium events. The processor stores events in a key-value store until the transaction id changes. When the tx id changes, it takes all events from the store, makes a batch event out of them, sends it downstream, and again starts storing incoming events.
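[Editor's note: a minimal plain-Java sketch of that buffering logic. The in-memory buffer stands in for a Kafka Streams state store, and the event/tx-id types are assumptions for illustration.]

```java
import java.util.*;

// Sketch of the processor described above: buffer events until the
// transaction id changes, then emit the buffered events as one batch.
// Plain Java stands in for the Kafka Streams Processor API + state store.
public class TxBatcher {
    private final List<String> buffer = new ArrayList<>();
    private Long currentTxId = null;

    // Returns the completed batch when a new tx id is seen, else empty.
    public Optional<List<String>> onEvent(long txId, String event) {
        Optional<List<String>> batch = Optional.empty();
        if (currentTxId != null && txId != currentTxId) {
            batch = Optional.of(new ArrayList<>(buffer));
            buffer.clear();
        }
        currentTxId = txId;
        buffer.add(event);
        return batch;
    }

    public static void main(String[] args) {
        TxBatcher b = new TxBatcher();
        b.onEvent(100, "INSERT orders");
        b.onEvent(100, "INSERT order_lines");
        // Tx id changes from 100 to 101: the previous tx is emitted as a batch.
        Optional<List<String>> batch = b.onEvent(101, "UPDATE customers");
        System.out.println("batch of " + batch.get().size());
    }
}
```

Note the caveat discussed later in the thread: this only works if all events of one transaction arrive on a single, ordered stream, and the last transaction stays buffered until a new one begins.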

J.

Jochen Mader

May 18, 2018, 5:04:15 AM
to debe...@googlegroups.com
I checked the Postgres implementation.

It would be possible to gather this information there.
The problem is:
an LSN can contain multiple transactions, and a transaction can span multiple LSNs.
I know that Postgres itself has the BEGIN_TX and END_TX information.
As far as I saw, this is currently not made available in the Java code.
If we could get that information in there, it would be possible to build a summary object and publish it on a separate topic.

I am not deep enough into Debezium right now; do you know if the BEGIN_TX/END_TX information is available somewhere in there?

Thanks,
Jochen

--
You received this message because you are subscribed to the Google Groups "debezium" group.
To unsubscribe from this group and stop receiving emails from it, send an email to debezium+unsubscribe@googlegroups.com.
To post to this group, send email to debe...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/debezium/ab2e90ad-7144-4d82-a7d3-1c9efdd6aa0c%40googlegroups.com.

For more options, visit https://groups.google.com/d/optout.



--
Jochen Mader | Principal IT Consultant

codecentric AG | Elsenheimerstr. 55a | 80687 München | Deutschland
tel: +49 89 215486633 | fax: +49 89 215486699 | mobil: +49 152 51862390
www.codecentric.de | blog.codecentric.de | www.meettheexperts.de

Sitz der Gesellschaft: Düsseldorf | HRB 63043 | Amtsgericht Düsseldorf
Vorstand: Michael Hochgürtel . Rainer Vehns
Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen Schütz

Jochen Mader

May 18, 2018, 5:54:38 AM
to debe...@googlegroups.com
Ok, I just found the source of the WAL information in StreamingWal2JsonMessageDecoder.
It looks as if everything is there, and I just learned quite a bit about its structure.
All the information is there (at least for Postgres) to build what I suggested :)

Gunnar Morling

May 23, 2018, 9:08:06 AM
to debezium
Hey Jochen,

Welcome to the mailing list, and thanks a lot for the nice words. Creating aggregated events is indeed one of the most interesting topics, and it's high up on our agenda (next to adding more connectors) to facilitate the creation of such aggregates in a consistent fashion.

> Ok, I just found the source of WAL-information in StreamingWal2JsonMessageDecoder.
> looks like as if everything is there and I just learned quite a bit about its structure.
> All information is there (at least for Postgres) to build up what I suggested :)

At a high level, for the PG connector it should work to interpret a change of the TX id in the "source" block from one value to another as a committed TX. I.e. in a "processor" you'd collect all events until you see one with a different TX id, at which point you'd create whatever aggregate you need based on the events received until then.

The problem, though, is that ordering isn't guaranteed across topics, so you couldn't be fully sure that no further messages belonging to the same TX will arrive on other topics. I.e. this would be a safe approach only if all messages belonging to the tables of one aggregate are sent to a single topic. And then of course there's the case where no new TX is spawned for quite some time (or none of the tables you're capturing are altered), in which case you'd wait for a long time.

I'm sympathetic towards emitting "TX messages" (TX started, TX committed) to a dedicated topic and it probably should be the first step we take towards making aggregations simpler, though the ordering problem described above still remains.

Regarding your last reply, did you mean this is something you're going to implement in your own code, or something you suggest should be implemented in Debezium?

In any case, thanks a lot for bringing up this important topic, I hope we can support such use cases (which are very common) better going forward.

Cheers,

--Gunnar

Jochen Mader

May 30, 2018, 5:29:42 AM
to debe...@googlegroups.com
I am so sorry, I completely forgot about this thread (swamped by work ...).

I think a TX message containing info about the executed operations would already help a lot, especially since things lose their order.
If I know that transaction X contains:
- 2 UPDATEs
- 1 INSERT
- 3 DELETEs
then I can collect those out of an unordered stream.
I can detect when I have reached a consistent state and put the result into a different store.

We might even get away with an even less specific message:
transaction X has 6 elements.

So instead of TX-started/TX-stopped messages I'd like to have a single TX message containing this information.
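[Editor's note: the completeness check proposed above could look roughly like this. It's a sketch; the summary message with a per-transaction element count is a proposal from this thread, not an existing Debezium feature.]

```java
import java.util.*;

// Sketch: given a TX summary ("transaction X has N elements"), collect
// change events from an unordered stream and report when the tx is complete.
// The summary-message format is a proposal from this thread, not Debezium API.
public class TxCompletenessTracker {
    private final Map<Long, Integer> expected = new HashMap<>();   // txId -> element count
    private final Map<Long, List<String>> received = new HashMap<>();

    public void onSummary(long txId, int elementCount) {
        expected.put(txId, elementCount);
    }

    // Returns true once all events of the transaction have arrived.
    public boolean onEvent(long txId, String event) {
        received.computeIfAbsent(txId, k -> new ArrayList<>()).add(event);
        return isComplete(txId);
    }

    public boolean isComplete(long txId) {
        Integer want = expected.get(txId);
        return want != null && received.getOrDefault(txId, List.of()).size() >= want;
    }

    public static void main(String[] args) {
        TxCompletenessTracker t = new TxCompletenessTracker();
        t.onSummary(7, 3);                 // "transaction 7 has 3 elements"
        t.onEvent(7, "DELETE a");          // events may arrive in any order
        t.onEvent(7, "UPDATE b");
        System.out.println("complete after 3rd event: " + t.onEvent(7, "INSERT c"));
    }
}
```

Because only counts are compared, this tolerates the cross-topic reordering discussed earlier: it doesn't matter which event arrives last, only that all of them eventually do.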

I'd love to contribute; maybe I'll get some time after the next Vert.x release to work on this. I can't promise, though: family + work + open source are hard to coordinate ;)


