Deleting records if the destination table is partitioned


Vladislav P

Oct 22, 2025, 3:32:37 AM
to debezium
I am transferring data from Oracle to PostgreSQL. Deleting records works for regular tables; that has all been verified. But now there are partitioned tables, and deletes on them do not work. Two fields are specified for the PK, because on a partitioned table the partition key column must be part of the primary key. The connectors:

### source
POST http://{{host}}:{{port}}/connectors/
Content-Type: application/json

{
"name": "source-connector",
"config": {
"connector.class": "io.debezium.connector.oracle.OracleConnector",
"tasks.max": "1",
"database.hostname": "{{databaseHost}}",
"database.port": "{{databasePort}}",
"database.user": "{{databaseUser}}",
"database.password": "{{databasePassword}}",
"database.dbname": "{{databaseName}}",
"table.include.list": "WMS.TBL_SH_ARTTASK",
"column.include.list": "WMS\\.TBL_SH_ARTTASK\\.(ID_SHARTTASK|ID_SHTASK|ID_OPART|LASTDATE)",
"topic.prefix": "{{topicPrefix}}",
"database.server.name": "{{topicPrefix}}",
"schema.history.internal.kafka.topic": "dbz_oracle_wpms_history",
"schema.history.internal.kafka.bootstrap.servers": "{{kafkaBootstrapServers}}",
"log.mining.strategy": "hybrid",
"log.mining.query.filter.mode": "in",

"message.key.columns": "WMS.TBL_SH_ARTTASK:ID_SHARTTASK,LASTDATE;"
}
}  
### sink
POST http://{{host}}:{{port}}/connectors/
Content-Type: application/json

{
"name": "sink",
"config": {
"connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
"tasks.max": "5",
"connection.url": "{{targetDbUrl}}",
"connection.username": "{{targetDbUsername}}",
"connection.password": "{{targetDbPassword}}",
"topics.regex": "{{topicPrefix}}.WMS.TBL_SH_ARTTASK",
"table.name.format": "${source.schema}.${source.table}",

"delete.enabled": "true",
"primary.key.mode": "record_key",
"primary.key.fields": "ID_SHARTTASK,LASTDATE",
"insert.mode": "upsert"
}
}

When a record is deleted, the logs show:
2025-10-22 06:32:49,895 WARN || Ignore this record because it seems to be a tombstone that doesn't have source field, then cannot resolve table name in topic 'dbserver5.WMS.TBL_SH_ARTTASK', partition '0', offset '268458' [io.debezium.connector.jdbc.naming.DefaultTableNamingStrategy]
2025-10-22 06:32:49,895 WARN || Ignored to write record from topic 'dbserver5.WMS.TBL_SH_ARTTASK' partition '0' offset '268458'. No resolvable table name [io.debezium.connector.jdbc.JdbcChangeEventSink]
2025-10-22 06:32:53,953 INFO || WorkerSourceTask{id=wpms-rcdb-source-connector-read-avro5-0} Committing offsets for 1 acknowledged messages [org.apache.kafka.connect.runtime.WorkerSourceTask] 

But deleting in regular tables works.

Kafka message:
KEY: {"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"ID_SHARTTASK"},{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","version":1,"field":"LASTDATE"}],"optional":false,"name":"dbserver5.WMS.TBL_SH_ARTTASK.Key"},"payload":{"ID_SHARTTASK":1767405703,"LASTDATE":1758106329000}} VALUE: empty
HEADERS: {}
  

Chris Cranford

Oct 22, 2025, 3:38:31 AM
to debe...@googlegroups.com
Hi, what version of Oracle and JDBC sink connectors are you using?

Vladislav P

Oct 22, 2025, 3:46:13 AM
to debezium
Hi.
Oracle:  Oracle Database 19c Enterprise Edition Release 19.0.0.0.0 - Production
JDBC sink:  io.debezium.connector.jdbc.JdbcSinkConnector
The partitioned tables are in the Postgres database.
On Wednesday, October 22, 2025, at 11:38:31 UTC+4, Chris Cranford wrote:

Chris Cranford

Oct 22, 2025, 5:39:46 AM
to debe...@googlegroups.com
Hi, so let me make sure I understand

    - On the Oracle side you have a table with a single PK column called ID_SHARTTASK
    - On the PostgreSQL side you have a partitioned table with a composite PK based on ID_SHARTTASK and LASTDATE.

If that's correct, then you should set your primary key mappings as

    "primary.key.mode": "record_value",
    "primary.key.fields": "ID_SHARTTASK,LASTDATE"

Since LASTDATE is not a primary key column on the Oracle side, you cannot use "record_key"; that field isn't in the key, it's only a field in the value.

If I have misunderstood, can you please describe exactly how the PK is defined in Oracle and PostgreSQL.

Thanks,
-cc

Vladislav P

Oct 22, 2025, 5:55:01 AM
to debezium
Yes, you absolutely understood me correctly.
But if I set "primary.key.mode": "record_value", then I can't set "delete.enabled": "true", and I do want the records to be deleted.


I thought that setting "message.key.columns": "WMS.TBL_SH_ARTTASK:ID_SHARTTASK,LASTDATE;" in the source connector adds those columns to the key and allows us to use them when deleting a record via "primary.key.mode": "record_key".
Then how do I delete a record from a partitioned PostgreSQL table when it has been deleted from Oracle?

On Wednesday, October 22, 2025, at 13:39:46 UTC+4, Chris Cranford wrote:

Chris Cranford

Oct 22, 2025, 6:16:08 AM
to debe...@googlegroups.com
Hi,

That's a good point, and that's likely something we should revisit. I don't remember exactly why `record_key` is mandatory and why `record_value` couldn't also be used. I'll raise a Jira for that.

In the meantime, I wonder if you could flatten the event structure using the `ExtractNewRecordState` transform and then apply the standard Kafka transform `ValueToKey` to raise the `LASTDATE` field into the key. In that case, you should be able to set `primary.key.mode` to `record_key` and you can drop `primary.key.fields` entirely. That should work the same way as I had described, but it also gives you the ability to support deletes as a workaround.
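Roughly, the relevant sink properties would look like this (an untested sketch, reusing the column names from your configs):

"transforms": "unwrap,insertKey",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.insertKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.insertKey.fields": "ID_SHARTTASK,LASTDATE",
"primary.key.mode": "record_key",
"delete.enabled": "true"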

Thanks,
-cc

Vladislav P

Oct 22, 2025, 7:40:58 AM
to debezium
About "ExtractNewRecordState" and "ValueToKey". I just tried to do it. But I get errors because of "ValueToKey" - "Field does not exist: ID_SHARTTASK".

"transforms": "unwrap,insertKey",
 "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
 "transforms.unwrap.drop.tombstones": "false",
"transforms.insertKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.insertKey.fields": "ID_SHARTTASK,LASTDATE".

 
After the "ExtractNewRecordState" in kafka, the columns were in the payload field: { ID_SHARTTASK:.......}.

Isn't it enough that the fields already end up in the Kafka message key? When I specify "message.key.columns": "WMS.TBL_SH_ARTTASK:ID_SHARTTASK,LASTDATE;" in the source connector, these columns do appear in the key.

Kafka message:
KEY: {"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"ID_SHARTTASK"},{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","version":1,"field":"LASTDATE"}],"optional":false,"name":"dbserver5.WMS.TBL_SH_ARTTASK.Key"},"payload":{"ID_SHARTTASK":1767405703,"LASTDATE":1758106329000}} VALUE: empty
HEADERS: {}

I thought that the message.key.columns field was designed to specify multiple columns for the primary key. But then why doesn't the deletion work?

On Wednesday, October 22, 2025, at 14:16:08 UTC+4, Chris Cranford wrote:

Chris Cranford

Oct 23, 2025, 1:07:03 AM
to debe...@googlegroups.com
Hi -

You're right, I was too quick and didn't consider the fact that OOTB the `ExtractNewRecordState` on deletes either:

    - Drops the original tombstone and converts the delete event into a tombstone (default behavior)
    - Rewrites the delete event by adding the before state and `__deleted: true` to the value when `delete.tombstone.handling.mode` is set to `rewrite` (sketched below).
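For reference, the second option corresponds to roughly the following SMT settings (a minimal sketch, not a complete configuration):

"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.delete.tombstone.handling.mode": "rewrite"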

Would you be able to set `message.key.columns` in the source connector configuration and specify that this table's key columns are `ID_SHARTTASK,LASTDATE`?

With `message.key.columns` set, the sink could just use `primary.key.mode` set to `record_key` and it would just work. If you are unable to use `message.key.columns`, then I'm afraid because Kafka's KIP-821 is still unimplemented, that a custom transformation would be necessary to rewrite the event's key.
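To be concrete, the `message.key.columns` route would look roughly like this (a sketch reusing the values you already have):

### source
"message.key.columns": "WMS.TBL_SH_ARTTASK:ID_SHARTTASK,LASTDATE"

### sink
"primary.key.mode": "record_key",
"delete.enabled": "true",
"insert.mode": "upsert"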

My apologies for the confusion.
-cc

Vladislav P

Oct 23, 2025, 7:42:34 AM
to debezium
Hi.

I managed to set up the deletion of records from the partitioned PostgreSQL table.

Apparently, this is some kind of error in the sink connector. I am using the image debezium/connect:3.0.0.Final.
The problem was in pk.mode and pk.fields.

That's exactly what I wrote earlier (it doesn't work):
{
"name": "shells",
"config": {
"connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"connection.url": "{{targetDbUrl}}",

"connection.username": "{{targetDbUsername}}",
"connection.password": "{{targetDbPassword}}",
"topics.regex": "{{topicPrefix}}.WMS.TBL_SH_ARTTASK",
"table.name.format": "${source.schema}.${source.table}",

"delete.enabled": "true",
"primary.key.mode": "record key",
"primary key fields": "ID_SHARTTASK,LASTDATE",
"insert.mode": "update"
}
}

This is how I write it now (it works):
{
"name": "recipient",
"config": {

"connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
"tasks.max": "5",
"connection.url": "{{targetDbUrl}}",
"connection.username": "{{targetDbUsername}}",
"connection.password": "{{targetDbPassword}}",
"topics.regex": "{{topicPrefix}}.WMS.TBL_SH_ARTTASK",
"table.name.format": "${source.schema}.${source.table}",

"delete.enabled": "true",
"primary key mode": "record_key",
"primary key fields": "ID_SHARTTASK,LASTDATE",
"insert.mode": "insert",
"pk.mode": "record_key",
"pk.fields": "ID_SHARTTASK,LASTDATE"
}
}
No changes in the source connector; I use "message.key.columns" for the composite PK.

I hope this helps someone. Thank you all!

On Thursday, October 23, 2025, at 09:07:03 UTC+4, Chris Cranford wrote:

Chris Cranford

Oct 24, 2025, 2:20 AM
to debe...@googlegroups.com
Hi, 

I'm afraid that while it is working, it's not working the way you might expect it to.
  1. The field "primary.key.mode" should have dots/periods between the words, as in your first configuration. In your second configuration, since you've omitted the dots/periods, the connector defaults to primary key mode "none".
  2. The first configuration gave you an error because you specified the primary key mode as "record key" (without an underscore); it should have been "record_key".
  3. The field "primary.key.fields", like the primary key mode, should have dots/periods between the words; in your first configuration the dots/periods are missing.
  4. When using "message.key.columns" on the source with "primary.key.mode" set as "record_key", you can omit setting "primary.key.fields" in your case.
  5. In the last configuration, "pk.mode" and "pk.fields" are not valid configuration properties for Debezium; those are for Confluent's JDBC sink.
  6. Lastly, be sure that your "insert.mode" is set correctly. In cases where you want events from the source to be inserted if they don't yet exist and updated if they do, you should use "upsert" as the value. A corrected sketch putting these points together follows below.
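Putting these together, the sink configuration would look roughly like this (a sketch based on the values from your original post, not a verified drop-in config):

{
"name": "sink",
"config": {
"connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
"tasks.max": "5",
"connection.url": "{{targetDbUrl}}",
"connection.username": "{{targetDbUsername}}",
"connection.password": "{{targetDbPassword}}",
"topics.regex": "{{topicPrefix}}.WMS.TBL_SH_ARTTASK",
"table.name.format": "${source.schema}.${source.table}",
"delete.enabled": "true",
"primary.key.mode": "record_key",
"insert.mode": "upsert"
}
}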

I'd suggest reviewing the documentation [1] along with my comments above to make sure that the connector is working as you might intend for it to regarding primary keys.

Let us know if you have any other questions.
Chris

[1]: https://debezium.io/documentation/reference/stable/connectors/jdbc.html#jdbc-connector-properties