Request for comments: DDD-3 incremental and ad-hoc snapshotting


jiri.p...@gmail.com

Apr 25, 2021, 11:55:31 PM
to debezium
Hi,


In this document we propose the implementation of two features:

1) Ad-hoc snapshotting, which would allow the user to trigger a snapshot of table(s) at any time during streaming (a sketch of a possible trigger follows below)
2) Incremental snapshotting, which would enable the user to take a snapshot of a table while streaming continues (this is an offspring of the DBLog implementation)
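For illustration only, a trigger via a signaling table might look like the sketch below; the table name, signal type and JSON payload are placeholders, not a finalized API:

    -- hypothetical signaling table the user writes to; the connector would pick the
    -- signal up from the binlog (all names and the payload format are illustrative)
    INSERT INTO debezium_signal (id, type, data)
    VALUES ('ad-hoc-1',
            'execute-snapshot',
            '{"data-collections": ["inventory.customers", "inventory.orders"]}');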

Please take a look at the proposal; any comments are welcome and will be incorporated either directly in the first cut or in follow-up steps.

Thanks in advance!

Jiri Pechanec

Philip Ittmann

Apr 26, 2021, 6:00:53 AM
to debe...@googlegroups.com
Hi Jiri,

Thank you for sharing this! I am really excited to see this functionality added to Debezium as large-table snapshotting is the only real pain point I've experienced running DBZ.

I've read through the design doc. For the open question of "How to avoid a never-ending incremental snapshot if there are inserts happening continuously?", I am curious whether this is an issue only when the rate of inserts is greater than the rate of snapshotting. And if it is, I wonder if there is a viable solution other than increasing the rate of the snapshotting behaviour?

For the second open question, "How to deal with different PK types? And composite PKs?", are there cases where a primary key/composite key wouldn't allow sorting deterministically?

Best wishes,
Philip


jiri.p...@gmail.com

Apr 27, 2021, 2:03:20 AM
to debezium
Hi,

For the former, I think the solution is pretty easy. At the time of the snapshot the largest PK is recorded and acts as the upper limit. It is/should be a safe operation, as any new data written to the table with a PK larger than that will be captured by the streaming code.
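As a sketch (table and column names are made up for illustration), each chunk query would then be bounded by the previous chunk's last PK and the recorded maximum:

    -- pk_max is captured once, when the incremental snapshot of the table starts;
    -- last_pk is the highest PK returned by the previous chunk
    SELECT *
    FROM inventory.customers
    WHERE id > :last_pk AND id <= :pk_max
    ORDER BY id
    LIMIT 1024;  -- chunk size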

For the latter, it is more an issue of how to store the last-queried PK values and reuse them in the next query. I am more concerned about NULL values as part of a composite PK.

J.

Gunnar Morling

Apr 27, 2021, 1:10:19 PM
to debe...@googlegroups.com
Hi Philip,

Thanks for your feedback!

On Mon, Apr 26, 2021 at 12:00 PM Philip Ittmann <philip....@gmail.com> wrote:
Hi Jiri,

Thank you for sharing this! I am really excited to see this functionality added to Debezium as large-table snapshotting is the only real pain point I've experienced running DBZ.

I've read through the design doc. For the open question of "How to avoid a never-ending incremental snapshot if there are inserts happening continuously?", I am curious whether this is an issue only when the rate of inserts is greater than the rate of snapshotting. And if it is, I wonder if there is a viable solution other than increasing the rate of the snapshotting behaviour?

Yes, this was my concern when adding that question, and I agree: there probably isn't much that can be done here, other than adding means of parallelizing snapshots as a next step, so as to increase the snapshotting rate.

@Jiri, in that light, monitoring snapshot progress (within a table) would be very useful to have. I suggest adding a section "Metrics Impact" to the DDD, listing this (and potentially others).

For the second open question, "How to deal with different PK types? And composite PKs?", are there cases where a primary key/composite key wouldn't allow sorting deterministically?

Apart from the null case Jiri mentioned, probably not. We do need to serialize all PK values into a generic form while tracking snapshot progress, though. This may be a bit of busy work to support all (relevant) PK column types.

@Jiri, I think what would be great for the DDD would be a description of these formats, i.e. where snapshot progress is stored (connector offsets, the signal table?) and what it looks like.
 

Best wishes,
Philip

Best,

--Gunnar
 


Gunnar Morling

Apr 28, 2021, 5:48:46 AM
to debezium
Hey Jiri,

As discussed, I'd suggest adding another section to the DDD: "Testing Strategy".

--Gunnar

Ekaterina Galieva

Apr 28, 2021, 2:15:31 PM
to debezium

Hi, 

I have a few questions/thoughts about the DDD-3:

1. How to handle schema changes during the incremental snapshot? 

Will it be possible to detect the schema change from the binlog stream and re-snapshot the schemas for the next chunk?

What if the schema change affects the PK? Will the incremental snapshot fail or restart?

2. How to scope the snapshot to all tables/list of tables/a range of primary keys/maybe even a specific query?

3. How can the snapshotting be paused and resumed without pausing the binlog stream from the connector?

4. Where will the incremental snapshotting progress be stored (Kafka topic, ZooKeeper or something else)?

5. What will be the format of the incremental snapshotting progress log?

6. Will the incremental snapshotting resume from where it stopped if the connector gets restarted?

7. DDD-3 says parallelization of snapshotting is a non-goal. Could parallelization be added later, or are there blockers for it in the current design? If there are no blockers, what needs to be done to make it simpler to add in the future (an id for each chunk, low and high watermarks containing the chunk id)?

8. Is it possible to send the low and high watermarks to the binlog stream without having to write to the database, so that Debezium could keep being connected to a read-only replica?

--Kate

jiri.p...@gmail.com

Apr 29, 2021, 12:02:39 AM
to debezium
Hi Kate,

All the questions are great, so please keep them coming; most of them are actionable. Please see my answers inline.

On Wednesday, April 28, 2021 at 8:15:31 PM UTC+2 katerina...@gmail.com wrote:

Hi, 

I have a few questions/thoughts about the DDD-3:

1. How to handle schema changes during the incremental snapshot? 

Will it be possible to detect the schema change from the binlog stream and re-snapshot the schemas for the next chunk?

For the first release we expect an unchanged schema, but this could be implemented in the next versions.

What if the schema change affects the PK? Will the incremental snapshot fail or restart?

That's again a follow-up; I suppose we should be able to restart the table snapshot from scratch.

2. How to scope the snapshot to all tables/list of tables/a range of primary keys/maybe even a specific query?

Currently you can enumerate the tables you want to snapshot. Snapshotting ALL tables would require more effort, as we would need to monitor whether a new table appears during the snapshot.
For a table range, not yet, but I think in the future we would allow the user to provide a fragment of a WHERE condition that would be applied to the window query. This would effectively allow taking a snapshot of a subset (see the sketch below).
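For illustration only (this is not part of the proposal yet), such a user-supplied condition might be appended to the regular chunk/window query like this:

    -- the condition in parentheses would come from the user;
    -- everything else is the regular chunk query (names are illustrative)
    SELECT *
    FROM inventory.customers
    WHERE id > :last_pk AND id <= :pk_max
      AND (customer_status = 'ACTIVE')  -- user-supplied WHERE fragment
    ORDER BY id
    LIMIT 1024;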

3. How can the snapshotting be paused and resumed without pausing the binlog stream from the connector?

This is not implemented yet, but it is pretty low-hanging fruit, as we can provide two new signals for that. This could be delivered even in 1.6.

4. Where will the incremental snapshotting progress be stored (Kafka topic, ZooKeeper or something else)?

The progress will be stored in Kafka Connect Offsets. 

5. What will be the format of the incremental snapshotting progress log?

I am not sure I understand the question. Do you mean application logging or storage logging? 

6. Will the incremental snapshotting resume from where it stopped if the connector gets restarted?

It will resume after the last completed window, so you need to expect duplicates.

7. DDD-3 says parallelization of snapshotting is a non-goal. Could parallelization be added later, or are there blockers for it in the current design? If there are no blockers, what needs to be done to make it simpler to add in the future (an id for each chunk, low and high watermarks containing the chunk id)?

We first need to gather some experience with this design and decide if and how the parallelization could be achieved.

8. Is it possible to send the low and high watermarks to the binlog stream without having to write to the database, so that Debezium could keep being connected to a read-only replica?

Unfortunately no :-(. We need to get the watermarks directly into the transaction log so that we get proper ordering.
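For illustration, this is roughly what writing the watermarks to the source database looks like, so that they appear in the binlog correctly ordered relative to the chunk (table and signal names are placeholders):

    -- low watermark: written right before the chunk is selected
    INSERT INTO debezium_signal (id, type)
    VALUES ('chunk-1-open', 'snapshot-window-open');

    SELECT * FROM inventory.customers
    WHERE id > :last_pk AND id <= :pk_max ORDER BY id LIMIT 1024;

    -- high watermark: written right after the chunk select completes
    INSERT INTO debezium_signal (id, type)
    VALUES ('chunk-1-close', 'snapshot-window-close');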

Ekaterina Galieva

Apr 29, 2021, 9:31:49 AM
to debezium
Hi Jiri,
thanks for the quick reply!

>> 2. How to scope the snapshot to all tables/list of tables

> Currently you can enumerate the tables you want to snapshot. Snapshotting ALL tables would require more effort, as we would need to monitor whether a new table appears during the snapshot.

Sorry, my question was confusing. What I meant by all tables is the list of tables the connector is reading binlogs from. For example, if the connector is configured to read binlogs from 10 tables, will I be able to incrementally snapshot only two of those tables while still getting the binlogs from the other 8 tables too?


>> 5. What will be the format of the incremental snapshotting progress log?

> I am not sure I understand the question. Do you mean application logging or storage logging? 

This was a follow-up to question 4: Where will the incremental snapshotting progress be stored (Kafka topic, ZooKeeper or something else)?

What will be the content of the incremental snapshotting progress message? How to make it flexible enough to support the incremental snapshotting parallelization in the future?

The format will be different from Kafka Connect offsets; will it break anything?

Ekaterina Galieva

Apr 29, 2021, 11:57:25 PM
to debezium
I annoyed Aaron with questions today about what query could be used as a replacement for writing to the watermark table.
It sounds like `SELECT GTID_SUBTRACT(@@GLOBAL.gtid_executed, @@GLOBAL.gtid_purged)` will work.
Watermarks then become of GtidSet type. For detecting the watermarks in the event processing loop, GtidSet needs a contains method, which returns true if a gtid is inside the GtidSet. And the event processing loop shouldn't skip watermark events.

This change removes unique identifiers from the low and high watermarks. I think the low and high watermarks aren't needed in the Snapshot Offset message: if the chunk processing succeeded, the watermarks are no longer needed; if processing failed in the middle of the chunk, the chunk and its watermarks are lost and the chunk will be selected again after processing resumes.
For parallelization the low and high watermarks don't need unique identifiers either, but it may be nice to generate an id for each chunk and wrap the chunk and its watermarks into a single object.

Sequential algorithm modification:
       (1) pause log event processing
       (2) GtidSet lwGtidSet := SELECT GTID_SUBTRACT(@@GLOBAL.gtid_executed, @@GLOBAL.gtid_purged)
       (3) chunk := select next chunk from table
       (4) GtidSet hwGtidSet := SELECT GTID_SUBTRACT(@@GLOBAL.gtid_executed, @@GLOBAL.gtid_purged)
       (5) resume log event processing
       inwindow := false
       // other steps of event processing loop
       while true do
           e := next event from changelog
           append e to outputbuffer
           if not inwindow then
               if not lwGtidSet.contains(e.gtid) then // reached the low watermark
                   inwindow := true
           else
               if hwGtidSet.contains(e.gtid) then // haven't reached the high watermark yet
                   if chunk contains e.key then
                       remove e.key from chunk
               else // reached the high watermark
                   for each row in chunk do
                       append row to outputbuffer
                   commit chunk's max table-PK
           // other steps of event processing loop

The same algorithm modified for two concurrent chunks:

       (1) pause log event processing
       chunks := HashMap [
           { chunkId := uuid() } ->
               {
                   (2) GtidSet lwGtidSet := SELECT GTID_SUBTRACT(@@GLOBAL.gtid_executed, @@GLOBAL.gtid_purged)
                   (3) chunk := select next chunk from table
                   (4) GtidSet hwGtidSet := SELECT GTID_SUBTRACT(@@GLOBAL.gtid_executed, @@GLOBAL.gtid_purged)
               },
           { chunkId := uuid() } ->
               {
                   (2) GtidSet lwGtidSet := SELECT GTID_SUBTRACT(@@GLOBAL.gtid_executed, @@GLOBAL.gtid_purged)
                   (3) chunk := select next chunk from table
                   (4) GtidSet hwGtidSet := SELECT GTID_SUBTRACT(@@GLOBAL.gtid_executed, @@GLOBAL.gtid_purged)
               }
       ]
       (5) resume log event processing
       inwindow := HashMap [chunkId -> false, chunkId -> false]
       // other steps of event processing loop
       while true do
           e := next event from changelog
           append e to outputbuffer
           for (chunkKey, chunkValue) in chunks
               if not inwindow[chunkKey] then
                   if not chunkValue.lwGtidSet.contains(e.gtid) then // reached the low watermark
                       inwindow[chunkKey] := true
               else
                   if chunkValue.hwGtidSet.contains(e.gtid) then // haven't reached the high watermark yet
                       if chunkValue.chunk contains e.key then
                           remove e.key from chunkValue.chunk
                   else // reached the high watermark
                       for each row in chunkValue.chunk do
                           append row to outputbuffer
                       commit chunkValue.chunk's max table-PK + range (depends on what it's parallelized by)
                       chunks.remove(chunkKey)
                       inwindow.remove(chunkKey)
           // other steps of event processing loop


If it's more convenient to use a MySQL table for the incremental snapshotting API implementation, could it use a separate MySQL connection?
In that case, on our side it could be configured to connect to a separate MySQL instance with write permissions.

-- Kate

jiri.p...@gmail.com

May 5, 2021, 7:57:17 AM
to debezium
Hi Kate,

Let's put aside the parallel version for now.

Looking at the proposed sequential one, I'd say it could definitely work.
Regarding the different MySQL instance, I don't think this would work; or is your intention to replicate the writeable instance into the DB instance from which the streaming is executed?

J.

Ekaterina Galieva

May 5, 2021, 9:08:13 AM
to debezium
Hi Jiri,

> Regarding the different MySQL instance, I don't think this would work; or is your intention to replicate the writeable instance into the DB instance from which the streaming is executed?

The separate MySQL connection question is about the API only. We discussed alternatives for how to schedule/pause an incremental snapshot or set a scope (a subset of tables, a query). One of the options was to use a MySQL table for it; an alternative is to use a Kafka topic. In our case a Kafka topic will work, and so will a MySQL table if it's in a different MySQL instance.

-- Kate

jiri.p...@gmail.com

May 6, 2021, 2:03:27 AM
to debezium
I see; not as of now, sorry. But thinking about it, it is still out-of-band, so a Kafka topic might be better for this purpose.

J.

Leonard Xu

May 26, 2021, 7:35:43 AM
to debezium
Hi Katerina,

After reading your sequential algorithm modification, I'm curious whether we could use the binlog position instead of the executed GTID set to mark the low and high watermarks. I found that Debezium uses the signal table to record the low/high watermarks; could you explain more?

Thanks,
Leonard

Ekaterina Galieva

May 26, 2021, 10:45:19 AM
to debezium
Hi Leonard,

Using the binlog position from the signal table events, as currently implemented, is perfectly fine. In my case the goal is to keep read-only permissions to MySQL for Debezium, so I'm working on the alternative option to get the high/low watermarks, as well as to start an incremental snapshot.

Leonard Xu

May 26, 2021, 9:27:10 PM
to debezium
Hi Katerina,

Thanks for your information, it's helpful to me. 

Maybe my description wasn't very clear. What I mean is: could we execute `SHOW MASTER STATUS` to get the low watermark before querying a chunk, and after the query finishes execute the same command again to get the high watermark? This is similar to your proposal, and it could cover the read-only permission case even when users haven't enabled GTID mode. I don't know whether there is any bad case for this approach; have you considered it? (See the sketch below.)
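For illustration, the two bracketing statements could look like the following (the Executed_Gtid_Set column of SHOW MASTER STATUS carries the same information as @@GLOBAL.gtid_executed when GTID mode is on; without GTIDs only the File/Position columns would be available as watermarks):

    -- low watermark: taken right before the chunk select
    SHOW MASTER STATUS;  -- columns: File, Position, ..., Executed_Gtid_Set

    SELECT * FROM inventory.customers
    WHERE id > :last_pk AND id <= :pk_max ORDER BY id LIMIT 1024;

    -- high watermark: taken right after the chunk select
    SHOW MASTER STATUS;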
 
Best,
Leonard

Ekaterina Galieva

May 26, 2021, 9:57:13 PM
to debezium
Hi Leonard,

I didn't know about the difference in GTID mode requirements between those two queries. In this case, we can use `Executed_Gtid_Set` from `SHOW MASTER STATUS` instead of @@GLOBAL.gtid_executed.

Leonard Xu

May 26, 2021, 10:11:51 PM
to debe...@googlegroups.com
Hi, Katerina

Thanks for your quick response, I got it now.

Best,
Leonard


jiri.p...@gmail.com

May 27, 2021, 11:07:59 AM
to debezium
Hi,

Is this something you plan to contribute to Debezium itself? If yes, could we prepare to discuss the design and the necessary refactoring in the current code?

Thanks

J.

Ekaterina Galieva

May 27, 2021, 11:14:38 AM
to debezium
Hi,

Yes, sure. I'm going to prepare a draft PR and we can iterate from there.

Leonard Xu

May 28, 2021, 2:56:15 AM
to debezium
Hi Jiri,

I'd like to contribute to the Debezium community too.

I'm active in the Flink community, and we developed Flink CDC (https://github.com/ververica/flink-cdc-connectors) based on Debezium.

Offering a lock-free, parallel-snapshot design which also supports checkpointing (pause and resume at the chunk level) in Flink CDC is on my schedule;
this is closely related to the topic discussed in the current thread, so there should be something we can contribute to the Debezium community.

Please let me know if there is anything I can help with.

Best,
Leonard


Gunnar Morling

May 28, 2021, 12:03:45 PM
to debe...@googlegroups.com
Excellent, thanks a lot, Kate; looking forward to this!

--Gunnar


Leonard Xu

Aug 11, 2021, 9:33:36 AM
to debezium
Hello, everyone

We released Flink CDC 2.0 today [1]. It brings a new MySQL CDC connector which offers parallel reading, lock-free snapshotting and checkpointing.
I tested it with a parallelism of 8 against the test table `customer`, which has 65,000,000 records from the TPC-DS data set.
It takes 13 minutes; compared with the old single-parallelism implementation, which takes 89 minutes, we gain about a 6.8x improvement.

The design of CDC 2.0 is inspired by the DBLog paper, Debezium DDD-3 and the discussion here; it also adopts the new Source design from Flink FLIP-27 [2] to manage the state and chunks.

I want to thank everyone involved in this thread, especially @Kate and @Jiri; thanks again. Flink CDC is an open-source project under the Apache License 2.0, welcome to try it.

Best,
Leonard