Hi Team,
For the CDC-enabled tables, we are receiving multiple update records for a single PK and are not able to tell which one is the latest record. This is happening for multiple tables.
There are millions of records in each table.
We need to find a way for each record to have a different timestamp (ts), so that we can identify the latest record and avoid duplicates for a PK.
Below are the .json file parameters that we are using in the connector.
"value.converter.enhanced.avro.schema.support": "true",
"value.converter.schemas.enable": "true",
"config.action.reload": "restart",
"config.action.reload.interval.ms":"6000",
"snapshot.mode":"initial",
"snapshot.fetch.size": "750000",
"snapshot.max.threads":"10",
"group.id": "jde_cdc_decimal",
"decimal.handling.mode": "string",
"producer.override.batch.size": "200000",
"producer.override.linger.ms": "10",
"producer.override.delivery.timeout.ms": "600000",
"producer.override.request.timeout.ms": "300000",
"producer.override.compression.type": "lz4",
"transforms": "flatten",
"transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter": "_",
"database.lock.timeout.ms": "60000",
"errors.retry.delay.max.ms": "60000",
"poll.interval.ms": "90000",
"errors.retry.timeout": "20000",
"errors.retry.max.delay.ms": "500"
Example:
I have highlighted the TS column, which is exactly the same in both records.
RECORD 1:
},
"source_version": "2.5.4.Final",
"source_connector": "sqlserver",
"source_name": "TABLE NAME",
"source_ts_ms": 1740739122590,
"source_snapshot": {
"string": "false"
},
"source_db": "SCHEMA_NAME",
"source_sequence": null,
"source_schema": "PROD",
"source_table": "TABLENAME",
"source_change_lsn": {
"string": "0072e0af:000c5e40:009d"
},
"source_commit_lsn": {
"string": "0072e0af:000c5fa8:000a"
},
"source_event_serial_no": {
"long": 2
},
"op": "u",
"ts_ms": {
"long": 1740739126149
},
"transaction_id": null,
"transaction_total_order": null,
"transaction_data_collection_order": null
}
RECORD 2:
"source_version": "2.5.4.Final",
"source_connector": "sqlserver",
"source_name": "TABLE NAME",
"source_ts_ms": 1740739122590,
"source_snapshot": {
"string": "false"
},
"source_db": "SCHEMA_NAME",
"source_sequence": null,
"source_schema": "PROD",
"source_table": "TABLE NAME",
"source_change_lsn": {
"string": "0072e0af:000c5f30:0012"
},
"source_commit_lsn": {
"string": "0072e0af:000c5fa8:000a"
},
"source_event_serial_no": {
"long": 2
},
"op": "u",
"ts_ms": {
"long": 1740739126149
},
"transaction_id": null,
"transaction_total_order": null,
"transaction_data_collection_order": null
}