Receiving multiple update records on the same PK using CDC debezium connector


Prince Parekh

Mar 3, 2025, 12:14:42 AM
to debezium
Hi Team,

For the CDC-enabled tables we are receiving multiple update records for the same PK and are not able to tell which one is the latest. This is happening for multiple tables, and there are millions of records in each table.
We need a way for each record to carry a distinct timestamp so we can identify the latest record and avoid duplicates for a PK.

Below are the JSON configuration parameters we are using in the connector:

        "value.converter.enhanced.avro.schema.support": "true",
        "value.converter.schemas.enable": "true",
        "config.action.reload": "restart",
        "config.action.reload.interval.ms":"6000",
        "snapshot.mode":"initial",
        "snapshot.fetch.size": "750000",
        "snapshot.max.threads":"10",
        "group.id": "jde_cdc_decimal",
        "decimal.handling.mode": "string",
        "producer.override.batch.size": "200000",
        "producer.override.linger.ms": "10",
        "producer.override.delivery.timeout.ms": "600000",
        "producer.override.request.timeout.ms": "300000",
        "producer.override.compression.type": "lz4",
        "transforms": "flatten",
        "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
        "transforms.flatten.delimiter": "_",
        "database.lock.timeout.ms": "60000",
        "errors.retry.delay.max.ms": "60000",
        "poll.interval.ms": "90000",
        "errors.retry.timeout": "20000",
        "errors.retry.max.delay.ms": "500"

Example:
I have highlighted the TS column, which is exactly the same in both records.

RECORD 1:
  },
  "source_version": "2.5.4.Final",
  "source_connector": "sqlserver",
  "source_name": "TABLE NAME",
  "source_ts_ms": 1740739122590,
  "source_snapshot": {
    "string": "false"
  },
  "source_db": "SCHEMA_NAME",
  "source_sequence": null,
  "source_schema": "PROD",
  "source_table": "TABLENAME",
  "source_change_lsn": {
    "string": "0072e0af:000c5e40:009d"
  },
  "source_commit_lsn": {
    "string": "0072e0af:000c5fa8:000a"
  },
  "source_event_serial_no": {
    "long": 2
  },
  "op": "u",
  "ts_ms": {
    "long": 1740739126149
  },
  "transaction_id": null,
  "transaction_total_order": null,
  "transaction_data_collection_order": null
}

RECORD 2:

  "source_version": "2.5.4.Final",
  "source_connector": "sqlserver",
  "source_name": "TABLE NAME",
  "source_ts_ms": 1740739122590,
  "source_snapshot": {
    "string": "false"
  },
  "source_db": "SCHEMA_NAME",
  "source_sequence": null,
  "source_schema": "PROD",
  "source_table": " TABLE NAME  ",
  "source_change_lsn": {
    "string": "0072e0af:000c5f30:0012"
  },
  "source_commit_lsn": {
    "string": "0072e0af:000c5fa8:000a"
  },
  "source_event_serial_no": {
    "long": 2
  },
  "op": "u",
  "ts_ms": {
    "long": 1740739126149
  },
  "transaction_id": null,
  "transaction_total_order": null,
  "transaction_data_collection_order": null
}

jiri.p...@gmail.com

Mar 3, 2025, 1:40:01 AM
to debezium
Hi,

timestamps are never reliable for any connector/database. As the changes have different commit and change LSNs, you can choose the one with the highest value.
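Since SQL Server LSNs are fixed-width, lowercase hex segments, plain string comparison matches their numeric order. A minimal sketch of picking the later of the two records above (field names follow the flattened events shown earlier; the `latest` helper is illustrative, not part of Debezium):

```python
# The fixed-width hex segments mean plain string comparison equals numeric LSN order.
record1 = {"source_commit_lsn": "0072e0af:000c5fa8:000a",
           "source_change_lsn": "0072e0af:000c5e40:009d"}
record2 = {"source_commit_lsn": "0072e0af:000c5fa8:000a",
           "source_change_lsn": "0072e0af:000c5f30:0012"}

def latest(a, b):
    """Return the later of two Debezium SQL Server events by (commit LSN, change LSN)."""
    key_a = (a["source_commit_lsn"], a["source_change_lsn"])
    key_b = (b["source_commit_lsn"], b["source_change_lsn"])
    return a if key_a >= key_b else b

print(latest(record1, record2) is record2)  # True: record2 has the higher change LSN
```

For events from the same transaction the commit LSNs tie, so the change LSN decides, as it does for the two records above.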

Jiri

Prince Parekh

Mar 3, 2025, 2:12:22 AM
to debezium

Hi Jiri,
Thanks for the quick response.

Is there any way/parameter in the Debezium SQL Server CDC connector to avoid duplicates and just fetch the latest records from the CDC table at the source itself? This would avoid the same records being pulled multiple times.

Attached is a real example of what's happening.

Out of the 4 records, only 2 are actual updates; the other two are exact duplicate updates which were captured.
sample_cdc.png

jiri.p...@gmail.com

Mar 3, 2025, 2:44:33 AM
to debezium
Hi,


Jiri

Prince Parekh

Mar 20, 2025, 8:44:49 AM
to debezium
Hi Team,

We are still receiving multiple updates for a combination of primary key and transaction metadata, and we are not able to identify which record is the latest update, as every other value appears to be the same.

The parameters used are:

        "config.action.reload": "restart",
        "config.action.reload.interval.ms":"6000",
        "snapshot.mode":"initial",
        "snapshot.fetch.size": "100000",
        "snapshot.max.threads":"8",
        "group.id": "jde__cdc_decimal",
        "decimal.handling.mode": "string",
        "producer.override.batch.size": "2000",
        "producer.override.linger.ms": "10",

        "producer.override.compression.type": "lz4",
        "transforms": "flatten",
        "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
        "transforms.flatten.delimiter": "_",
        "database.lock.timeout.ms": "60000",
        "errors.retry.delay.max.ms": "60000",
        "poll.interval.ms": "90000",
        "errors.retry.timeout": "20000",
        "errors.retry.max.delay.ms": "500",
        "producer.override.enable.idempotence": "true",
        "producer.override.acks": "all",
        "producer.override.retries": "2147483647",
        "producer.override.max.in.flight.requests.per.connection": "1",
        "provide.transaction.metadata": "true"
record_cont..png
records.png

Chris Cranford

Mar 20, 2025, 12:03:14 PM
to debe...@googlegroups.com
Hi,

Based on the configuration you shared, you did not add either of the two transformations that Jiri suggested before the `Flatten` transformation. The `transforms` property should be supplied like this:

    "transforms": "changes,filter,flatten",
    "transforms.changes.type": "io.debezium.transforms.ExtractChangedRecordState",
    "transforms.filter.type": "io.debezium.transforms.Filter",
    "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",

You need to make sure to add all the supplemental configuration for all 3 transformations; I have omitted it here for brevity.
See the documentation links Jiri shared earlier for details.
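Filled out, the chain might look like the sketch below. The header name and the filter condition are assumptions for illustration; note that the `Filter` SMT also requires the debezium-scripting module and a JSR-223 engine such as Groovy on the Connect worker's classpath:

```json
"transforms": "changes,filter,flatten",
"transforms.changes.type": "io.debezium.transforms.ExtractChangedRecordState",
"transforms.changes.header.changed.name": "ChangedFields",
"transforms.filter.type": "io.debezium.transforms.Filter",
"transforms.filter.language": "jsr223.groovy",
"transforms.filter.condition": "<Groovy expression, e.g. keep only events whose ChangedFields header is non-empty>",
"transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter": "_"
```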

If that doesn't work for you, then please share an updated configuration.

Thanks,
-cc
To view this discussion visit https://groups.google.com/d/msgid/debezium/f4907861-8f20-41f2-840d-e803c4868d9en%40googlegroups.com.

Prince Parekh

Mar 21, 2025, 2:10:57 AM
to debezium
Hi Chris,

Thanks for the quick response. I can try that, but I think it's okay if we don't have the intermediate update records.

Is there any way to skip some of the updates on the SQL Server side and grab only the latest record, which would avoid such records being pulled?
We tried increasing the polling interval, but that did not work for us.

jiri.p...@gmail.com

Mar 21, 2025, 2:54:00 AM
to debezium
Hi,

there is no way to avoid it. This is a basic CDC feature: to capture all changes. You'd need a Kafka Streams app that would consolidate all changes per transaction.
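The consolidation Jiri describes could be done in Kafka Streams or any downstream consumer. As a language-neutral sketch of the logic only (plain Python with hypothetical field names, not the Kafka Streams API), keeping the last event per PK from an LSN-ordered batch:

```python
def consolidate(events):
    """Keep only the last change event per primary key.

    Events are assumed to arrive already ordered by (commit LSN, change LSN),
    as Debezium emits them per partition; later events overwrite earlier ones.
    """
    latest = {}
    for event in events:
        latest[event["pk"]] = event
    return list(latest.values())  # dicts preserve insertion order in Python 3.7+

events = [
    {"pk": 1, "change_lsn": "0072e0af:000c5e40:009d"},
    {"pk": 1, "change_lsn": "0072e0af:000c5f30:0012"},
    {"pk": 2, "change_lsn": "0072e0af:000c5f30:0020"},
]
deduped = consolidate(events)
print(len(deduped))  # 2: one consolidated event per PK
```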

Jiri

Prince Parekh

Mar 21, 2025, 12:03:59 PM
to debezium

Can compaction work in this case?
 "cleanup.policy": "compact"

Chris Cranford

Mar 21, 2025, 1:10:23 PM
to debe...@googlegroups.com
Hi -

The technical answer is it depends, but the short answer is no.

When compaction runs, it only operates on closed log segments, i.e. segments that are no longer active and being written to by Kafka. For these closed segments, if multiple events exist for the same PK, the compaction process retains only the last event for that PK within the closed (non-active) segments.

If there are no events in the active log segment for that PK, then post-compaction there would be only one event for the PK. But if there are any events in the active log segment for the same PK, multiple events for that PK will continue to exist, because compaction ignores active log segments.

So no, you cannot reliably depend on compaction to guarantee you only have one event in the topic for a given PK.
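If compaction is still useful for reducing (not eliminating) duplicates, the segment-roll and dirty-ratio settings control how soon it can run. A sketch with a hypothetical topic name, where the caveat above still applies since the active segment is never compacted:

```shell
# Roll segments after 10 minutes and compact once 10% of the log is dirty,
# so closed segments become eligible for compaction sooner.
kafka-configs.sh --bootstrap-server localhost:9092 --alter \
  --entity-type topics --entity-name server1.PROD.TABLENAME \
  --add-config cleanup.policy=compact,segment.ms=600000,min.cleanable.dirty.ratio=0.1
```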

Hope that helps.
-cc

Prince Parekh

Apr 29, 2025, 11:35:46 AM
to debezium
Hello Chris,
I was exploring and found that source_change_lsn and source_commit_lsn are the columns we can rely on, as a tiebreaker, to determine the latest record for a given primary key, since the update timestamps we receive only have HH:MM:SS granularity.

Is my understanding correct that we can rely on these two columns to fetch the latest updates?

Chris Cranford

Apr 29, 2025, 1:06:53 PM
to debe...@googlegroups.com
Hi -

Based on my understanding from this article [1], yes: all changes within the same transaction will have the same commit_lsn (start_lsn), while each change within that transaction will have a unique change_lsn (seqval), which represents the sequence of the change within the transaction. Jiri can correct me if I am mistaken in my assumption.

Thanks,
-cc

[1]: https://learn.microsoft.com/en-us/sql/relational-databases/system-functions/cdc-fn-cdc-get-all-changes-capture-instance-transact-sql?view=sql-server-ver16
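Putting that together, a consumer can recover a total order per PK by sorting on (commit LSN, change LSN, event_serial_no), where the last tiebreaker distinguishes events that share a change LSN. A sketch using the flattened field names from the events earlier in the thread (illustrative only, not a Debezium API):

```python
def lsn_tuple(lsn):
    # SQL Server LSNs are colon-separated fixed-width hex; parse each part to int.
    return tuple(int(part, 16) for part in lsn.split(":"))

def event_order_key(event):
    # Total order for one PK: commit LSN, then change LSN, then event_serial_no.
    return (lsn_tuple(event["source_commit_lsn"]),
            lsn_tuple(event["source_change_lsn"]),
            event["source_event_serial_no"])

events = [
    {"source_commit_lsn": "0072e0af:000c5fa8:000a",
     "source_change_lsn": "0072e0af:000c5f30:0012",
     "source_event_serial_no": 2},
    {"source_commit_lsn": "0072e0af:000c5fa8:000a",
     "source_change_lsn": "0072e0af:000c5e40:009d",
     "source_event_serial_no": 2},
]
events.sort(key=event_order_key)
print(events[-1]["source_change_lsn"])  # the latest update's change LSN
```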