Deleting records if the destination table is partitioned


Vladislav P

3:32 AM
to debezium
I am transferring data from Oracle to Postgres. Deleting records works for regular tables; that has all been verified. But now there are partitioned tables, and deletes on them do not work. The PK has two fields specified, because in a partitioned table the primary key must include the partition column. Connectors:

### source
POST http://{{host}}:{{port}}/connectors/
Content-Type: application/json

{
  "name": "source-connector",
  "config": {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "tasks.max": "1",
    "database.hostname": "{{databaseHost}}",
    "database.port": "{{databasePort}}",
    "database.user": "{{databaseUser}}",
    "database.password": "{{databasePassword}}",
    "database.dbname": "{{databaseName}}",
    "table.include.list": "WMS.TBL_SH_ARTTASK",
    "column.include.list": "WMS\\.TBL_SH_ARTTASK\\.(ID_SHARTTASK|ID_SHTASK|ID_OPART|LASTDATE)",
    "topic.prefix": "{{topicPrefix}}",
    "database.server.name": "{{topicPrefix}}",
    "schema.history.internal.kafka.topic": "dbz_oracle_wpms_history",
    "schema.history.internal.kafka.bootstrap.servers": "{{kafkaBootstrapServers}}",
    "log.mining.strategy": "hybrid",
    "log.mining.query.filter.mode": "in",

    "message.key.columns": "WMS.TBL_SH_ARTTASK:ID_SHARTTASK,LASTDATE;"
  }
}
### sink
POST http://{{host}}:{{port}}/connectors/
Content-Type: application/json

{
  "name": "sink",
  "config": {
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "tasks.max": "5",
    "connection.url": "{{targetDbUrl}}",
    "connection.username": "{{targetDbUsername}}",
    "connection.password": "{{targetDbPassword}}",
    "topics.regex": "{{topicPrefix}}.WMS.TBL_SH_ARTTASK",
    "table.name.format": "${source.schema}.${source.table}",

    "delete.enabled": "true",
    "primary.key.mode": "record_key",
    "primary.key.fields": "ID_SHARTTASK,LASTDATE",
    "insert.mode": "upsert"
  }
}

When a record is deleted, the logs show:
2025-10-22 06:32:49,895 WARN || Ignore this record because it seems to be a tombstone that doesn't have source field, then cannot resolve table name in topic 'dbserver5.WMS.TBL_SH_ARTTASK', partition '0', offset '268458' [io.debezium.connector.jdbc.naming.DefaultTableNamingStrategy]
2025-10-22 06:32:49,895 WARN || Ignored to write record from topic 'dbserver5.WMS.TBL_SH_ARTTASK' partition '0' offset '268458'. No resolvable table name [io.debezium.connector.jdbc.JdbcChangeEventSink]
2025-10-22 06:32:53,953 INFO || WorkerSourceTask{id=wpms-rcdb-source-connector-read-avro5-0} Committing offsets for 1 acknowledged messages [org.apache.kafka.connect.runtime.WorkerSourceTask] 

But deletes on regular tables work.

Kafka message:
KEY: {"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"ID_SHARTTASK"},{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","version":1,"field":"LASTDATE"}],"optional":false,"name":"dbserver5.WMS.TBL_SH_ARTTASK.Key"},"payload":{"ID_SHARTTASK":1767405703,"LASTDATE":1758106329000}}
VALUE: empty
HEADERS: {}

Chris Cranford

3:38 AM
to debe...@googlegroups.com
Hi, what version of Oracle and JDBC sink connectors are you using?

Vladislav P

3:46 AM
to debezium
Hi.
Oracle:  Oracle Database 19c Enterprise Edition Release 19.0.0.0.0 - Production
JDBC sink:  io.debezium.connector.jdbc.JdbcSinkConnector
The partitioned tables are in the Postgres database.
On Wednesday, October 22, 2025 at 11:38:31 UTC+4, Chris Cranford wrote:

Chris Cranford

5:39 AM
to debe...@googlegroups.com
Hi, so let me make sure I understand:

    - On the Oracle side you have a table with a single PK column called ID_SHARTTASK.
    - On the PostgreSQL side you have a partitioned table with a composite PK based on ID_SHARTTASK and LASTDATE.

If that's correct, then you should set your primary key mappings as

    "primary.key.mode": "record_value",
    "primary.key.fields": "ID_SHARTTASK,LASTDATE"

Since LASTDATE is not a primary key column on the Oracle side, you cannot use "record_key": that field isn't in the key, it's only a field in the value.

If I have misunderstood, can you please describe exactly how the PK is defined in Oracle and PostgreSQL?
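Assembled into a full sink config, the suggested key mapping would look roughly like this (a sketch only; the connection placeholders, topic pattern, and table name format are copied from the configs earlier in the thread):

```json
{
  "name": "sink",
  "config": {
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "connection.url": "{{targetDbUrl}}",
    "connection.username": "{{targetDbUsername}}",
    "connection.password": "{{targetDbPassword}}",
    "topics.regex": "{{topicPrefix}}.WMS.TBL_SH_ARTTASK",
    "table.name.format": "${source.schema}.${source.table}",
    "insert.mode": "upsert",

    "primary.key.mode": "record_value",
    "primary.key.fields": "ID_SHARTTASK,LASTDATE"
  }
}
```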

Thanks,
-cc

Vladislav P

5:55 AM
to debezium
Yes, you understood me correctly.
But if you set "primary.key.mode": "record_value", then you cannot set "delete.enabled": "true", and I do want records to be deleted.

I thought that the source-connector setting "message.key.columns": "WMS.TBL_SH_ARTTASK:ID_SHARTTASK,LASTDATE;" adds these columns to the key and lets us use them for deletes via "primary.key.mode": "record_key".
So how do I delete a record from a partitioned Postgres table when it has been deleted in Oracle?

On Wednesday, October 22, 2025 at 13:39:46 UTC+4, Chris Cranford wrote:

Chris Cranford

6:16 AM
to debe...@googlegroups.com
Hi,

That's a good point, and that's likely something we should revisit. I don't remember exactly why `record_key` is mandatory and why `record_value` couldn't also be used. I'll raise a Jira for that.

In the meantime, I wonder if you could flatten the event structure using `ExtractNewRecordState` and then apply the standard Kafka transform `ValueToKey` to raise the `LASTDATE` field into the key. In that case, you should be able to set `primary.key.mode` to `record_key`, and you can drop `primary.key.fields` entirely. That should work the same way as I had described, but it also gives you the ability to support deletes as a workaround.
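As a concrete sketch, the workaround described above would add something like the following to the sink connector's "config" block (a fragment to merge in, not a complete connector definition; the transform aliases `unwrap` and `toKey` are arbitrary names, while the property keys follow the standard `ExtractNewRecordState` and `ValueToKey` transforms):

```json
{
  "transforms": "unwrap,toKey",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.drop.tombstones": "false",
  "transforms.toKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
  "transforms.toKey.fields": "ID_SHARTTASK,LASTDATE",

  "delete.enabled": "true",
  "primary.key.mode": "record_key"
}
```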

Thanks,
-cc

Vladislav P

7:40 AM
to debezium
Regarding "ExtractNewRecordState" and "ValueToKey": I just tried it, but I get an error from "ValueToKey" - "Field does not exist: ID_SHARTTASK".

"transforms": "unwrap,insertKey",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.insertKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.insertKey.fields": "ID_SHARTTASK,LASTDATE"

 
After "ExtractNewRecordState", the columns in the Kafka message were in the payload field: { ID_SHARTTASK: ... }.

Isn't it enough that the fields end up in the Kafka message key? When I specify "message.key.columns": "WMS.TBL_SH_ARTTASK:ID_SHARTTASK,LASTDATE;" in the source connector, these columns do appear in the key.

Kafka message:
KEY: {"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"ID_SHARTTASK"},{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","version":1,"field":"LASTDATE"}],"optional":false,"name":"dbserver5.WMS.TBL_SH_ARTTASK.Key"},"payload":{"ID_SHARTTASK":1767405703,"LASTDATE":1758106329000}}
VALUE: empty
HEADERS: {}

I thought the message.key.columns option was designed exactly for specifying multiple columns for the primary key. So why doesn't the delete work?

On Wednesday, October 22, 2025 at 14:16:08 UTC+4, Chris Cranford wrote: