Handling duplicates in the database history topic

372 views
Skip to first unread message

Sergei Morozov

unread,
Jun 21, 2022, 5:13:33 PM6/21/22
to debezium
Hi Community,

My team stumbled upon an issue where the internal Debezium schema is corrupted due to the database history topic containing duplicates.

Here's an example of the messages in the topic:
{"f":"mysql-bin-changelog.276589","p":88563795,"d":"CREATE TABLE tbl_cleaned LIKE tbl"}
{"f":"mysql-bin-changelog.276599","p":7066,"d":"RENAME TABLE tbl TO tbl_backup"}
{"f":"mysql-bin-changelog.276599","p":7066,"d":"RENAME TABLE tbl_cleaned TO tbl"}
{"f":"mysql-bin-changelog.276599","p":14390,"d":"DROP TABLE `db_redacted`.`tbl_backup`"}
{"f":"mysql-bin-changelog.276599","p":7066,"d":"RENAME TABLE tbl TO tbl_backup"} // duplicate
{"f":"mysql-bin-changelog.276599","p":7066,"d":"RENAME TABLE tbl_cleaned TO tbl"} // duplicate
{"f":"mysql-bin-changelog.276599","p":14390,"d":"DROP TABLE `db_redacted`.`tbl_backup`"} // duplicate
{"f":"mysql-bin-changelog.281865","p":40305,"d":"DROP TABLE IF EXISTS `db_redacted`.`tbl_cleaned`"}

The keys in the above documents translate to the following in the actual message schema:
{
  "position": {
    "file": "...", // f
    "pos": ...     // p
  },
  "ddl": "...",    // d
}

If there were no duplicates, the statement on line 3 would create table "tbl", so all the CDC events would be interpreted properly. But what happens instead is:
  1. The duplicate statement on line 5 renames "tbl" to "tbl_backup", so after it "tbl" doesn't exist in the schema.
  2. The duplicate statement on line 6 attempts to rename "tbl_cleaned" to "tbl" but "tbl_cleaned" was dropped by the statement on line 3, so nothing happens.
As a result, table "tbl" doesn't exist in the internal schema.

I have a few questions in this regard:
  1. Should Debezium be capable of handling such situations by design? It looks like it should since in my understanding the messages are produced into the database history topic at least once but not exactly once.
  2. How could one deduplicate the statements? As you can see, some of the statements have the same binlog filename and position as they likely originate from a single statement like "RENAME TABLE tbl TO tbl_backup, tbl_cleaned TO tbl". So far, it looks like we need to consider not only the filename and position but also the statement itself for deduplication but at the same time, Debezium might produce some additional sequence numbers for the statements from the same event. This would allow for better deduplication.
  3. Should the attempt to rename a non-existing table fail? If it was the case, it might be easier to detect the root cause of the issues like this.
Regards,
Sergei

Chris Cranford

unread,
Jun 22, 2022, 9:09:13 AM6/22/22
to debe...@googlegroups.com, Sergei Morozov
Hi Sergei -

Is it possible that connector was stopped, offsets manually adjusted and the connector restarted?  That most definitely would be one reason as to why the schema history topic would appear as it does.  Since we set the offsets to where the client should read from at the start of the streaming process, I don't see a reason why there would be duplicate events like this emitted during the streaming phase AFAICT, so manual offset manipulation seems like the obvious reason.  Perhaps Jiri can spot something as I believe he's more familiar with the MySQL code base.

Specifically to question (3), I think the question is what information is in the in-memory relational model for all use cases here.  More to the point, what information is there for the source table of the rename when you only capture DDL for tables you are interested in rather than all tables.  Would there be situations where this could create a false positive failure use case if we actually failed if the call to "Tables#renameTable" with a null return value threw a failure.

CC
--
You received this message because you are subscribed to the Google Groups "debezium" group.
To unsubscribe from this group and stop receiving emails from it, send an email to debezium+u...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/debezium/d7d724f6-0774-4d9d-896e-6ded3ea1239cn%40googlegroups.com.

Sergei Morozov

unread,
Jun 22, 2022, 2:41:57 PM6/22/22
to debezium
Hi Chris,


> Is it possible that connector was stopped, offsets manually adjusted and the connector restarted? That most definitely would be one reason as to why the schema history topic would appear as it does. Since we set the offsets to where the client should read from at the start of the streaming process, I don't see a reason why there would be duplicate events like this emitted during the streaming phase AFAICT, so manual offset manipulation seems like the obvious reason.

To my knowledge, the connector didn't get its offsets manually adjusted, but as the analysis of the schema history shows, we've had ~300 cases of the duplicates like in the example over the course of the past few months. They just didn't lead to a corrupted in-memory schema up until now.

We were able to induce another series of the duplicates just yesterday under the following circumstances:
  1. We had the Kafka Connect worker's memory limit set tight enough to suffice for the regular streaming rate.
  2. The connector was stopped for a few days while we were investigating the issue.
  3. We had the database history topic cleaned up from duplicates and recreated.
  4. After the connector had been restarted, it successfully recovered history, started consuming the backlog at a higher than normal rate, and failed with an OOM exception.
At this point, we observed that the binlog position committed to the connector offsets was slightly less than the binlog position of the last message in the database history. As far as I understand the logic of recovering schema history, the connector will read the topic up until the committed offset, ignore the remainder, and then start streaming. As a result, it will consume DDLs after the committed offset from the binlog and produce them to the history topic again (those will be duplicates).

In my understanding, Debezium produces messages to the database history topic synchronously with updating the internal schema, but Kafka Connect produces offsets asynchronously. So the position in the connector offset is eventually consistent with the one in the database history but this event of consistency may never occur if the connector crashes.

> I think the question is what information is in the in-memory relational model for all use cases here.  More to the point, what information is there for the source table of the rename when you only capture DDL for tables you are interested in rather than all tables.  Would there be situations where this could create a false positive failure use case if we actually failed if the call to "Tables#renameTable" with a null return value threw a failure.

These are fair questions but I cannot answer them at the moment. We can table them for a later time and focus on addressing the primary issue for now.

Regards,
Sergei

jiri.p...@gmail.com

unread,
Jun 29, 2022, 12:16:42 PM6/29/22
to debezium
Hi,

can we record the highest postition we have seen and processed and if a smaller one arrives then we'd just skip such messages?

J.

Sergei Morozov

unread,
Jun 29, 2022, 12:49:31 PM6/29/22
to debezium
Yeah, I believe that should work. We'll give it a try.

Thanks!

Gunnar Morling

unread,
Jun 30, 2022, 4:10:04 AM6/30/22
to debezium
> can we record the highest postition we have seen and processed

You mean during history recovery, right? How would that work though in the light of those (different) events which share the same source offset, as in Sergei's original example? Adding an ever increasing sequence value when recording history events may be a way out?

--Gunnar

Sergei Morozov

unread,
Jun 30, 2022, 3:10:46 PM6/30/22
to debezium
> You mean during history recovery, right?

This is my understanding.

> How would that work though in the light of those (different) events which share the same source offset, as in Sergei's original example? Adding an ever increasing sequence value when recording history events may be a way out?

Yeah, this is necessary for processing multiple schema changes originating from a single binlog event.

By an ever-increasing value, do you mean a globally unique value or scoped to a given binlog event? I believe the latter is sufficient and would be easier to implement. It would be similar to the "row" property of the CDC events.
Reply all
Reply to author
Forward
0 new messages