__debezium.newkey

Vladislav P

Nov 6, 2025, 6:02:43 AM
to debezium
Hi.
Can you tell me how to solve the following problem?
We only read streaming data, using snapshot.mode: no_data (no initial snapshot).
After that, the sink connector transfers the data from Kafka to Postgres and we see the following error (the data is not consistent, and some of the records being deleted are not in the topic).

Kafka message:
KEY= {"ID": 3892747457  , "LASTDATE":{"long":  1762434042000  }}
VALUE = empty
HEADERS =  {"__debezium.context.connectorName":"oracle","__debezium.context.connectorLogicalName":"dbserver","__debezium.context.taskId":"0","__debezium.newkey":"{\"ID\":3892747457,\"LASTDATE\":1762434042000}"}

Log:
2025-11-06T10:20:23,913 WARN   ||  Ignored to write record from topic 'dbserver.SCHEMA1.TABLE1' partition '0' offset '7357'. No resolvable table name   [io.debezium.connector.jdbc.JdbcChangeEventSink]
2025-11-06T10:20:23,915 WARN   ||  Ignore this record because it isn't a Debezium record, then cannot resolve a collection name in topic 'dbserver.SCHEMA1.TABLE1', partition '0', offset '7360'   [io.debezium.sink.naming.DefaultCollectionNamingStrategy]
2025-11-06T10:20:23,915 WARN   ||  Ignored to write record from topic 'dbserver.SCHEMA1.TABLE1' partition '0' offset '7360'. No resolvable table name   [io.debezium.connector.jdbc.JdbcChangeEventSink]
2025-11-06T10:20:23,916 WARN   ||  Ignore this record because it isn't a Debezium record, then cannot resolve a collection name in topic 'dbserver.SCHEMA1.TABLE1', partition '0', offset '7363'   [io.debezium.sink.naming.DefaultCollectionNamingStrategy]
2025-11-06T10:20:23,916 WARN   ||  Ignored to write record from topic 'dbserver.SCHEMA1.TABLE1' partition '0' offset '7363'. No resolvable table name   [io.debezium.connector.jdbc.JdbcChangeEventSink]
2025-11-06T10:20:23,918 WARN   ||  Ignore this record because it isn't a Debezium record, then cannot resolve a collection name in topic 'dbserver.SCHEMA1.TABLE1', partition '0', offset '7374'   [io.debezium.sink.naming.DefaultCollectionNamingStrategy]
2025-11-06T10:20:23,918 WARN   ||  Ignored to write record from topic 'dbserver.SCHEMA1.TABLE1' partition '0' offset '7374'. No resolvable table name   [io.debezium.connector.jdbc.JdbcChangeEventSink]
2025-11-06T10:20:23,920 WARN   ||  Ignore this record because it isn't a Debezium record, then cannot resolve a collection name in topic 'dbserver.SCHEMA1.TABLE1', partition '0', offset '7377'   [io.debezium.sink.naming.DefaultCollectionNamingStrategy]
2025-11-06T10:20:23,920 WARN   ||  Ignored to write record from topic 'dbserver.SCHEMA1.TABLE1' partition '0' offset '7377'. No resolvable table name   [io.debezium.connector.jdbc.JdbcChangeEventSink]
2025-11-06T10:20:23,927 ERROR  ||  Failed to process record: No schema defined for value of map field: "__debezium.newkey"   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
org.apache.kafka.connect.errors.DataException: No schema defined for value of map field: "__debezium.newkey"
        at org.apache.kafka.connect.data.ConnectSchema.assertSchemaNotNull(ConnectSchema.java:280) ~[connect-api-4.1.0.jar:?]
        at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:269) ~[connect-api-4.1.0.jar:?]
        at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:217) ~[connect-api-4.1.0.jar:?]
        at org.apache.kafka.connect.data.Struct.put(Struct.java:216) ~[connect-api-4.1.0.jar:?]
        at org.apache.kafka.connect.data.Struct.put(Struct.java:203) ~[connect-api-4.1.0.jar:?]
        at io.debezium.bindings.kafka.KafkaDebeziumSinkRecord.lambda$getRecordHeaders$8(KafkaDebeziumSinkRecord.java:420) ~[debezium-sink-3.4.0.Alpha1.jar:3.4.0.Alpha1]
        at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[?:?]
        at io.debezium.bindings.kafka.KafkaDebeziumSinkRecord.getRecordHeaders(KafkaDebeziumSinkRecord.java:420) ~[debezium-sink-3.4.0.Alpha1.jar:3.4.0.Alpha1]
        at io.debezium.bindings.kafka.KafkaDebeziumSinkRecord.<init>(KafkaDebeziumSinkRecord.java:53) ~[debezium-sink-3.4.0.Alpha1.jar:3.4.0.Alpha1]
        at io.debezium.connector.jdbc.JdbcKafkaSinkRecord.<init>(JdbcKafkaSinkRecord.java:48) ~[debezium-connector-jdbc-3.4.0.Alpha1.jar:3.4.0.Alpha1]
        at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:90) ~[debezium-connector-jdbc-3.4.0.Alpha1.jar:3.4.0.Alpha1]
        at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:163) ~[debezium-connector-jdbc-3.4.0.Alpha1.jar:3.4.0.Alpha1]
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:629) ~[connect-runtime-4.1.0.jar:?]
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:360) ~[connect-runtime-4.1.0.jar:?]
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:262) ~[connect-runtime-4.1.0.jar:?]
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:226) ~[connect-runtime-4.1.0.jar:?]
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:243) ~[connect-runtime-4.1.0.jar:?]
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:298) ~[connect-runtime-4.1.0.jar:?]
        at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:254) ~[connect-runtime-4.1.0.jar:?]
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[?:?]
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[?:?]
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
        at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
2025-11-06T10:20:23,938 ERROR  ||  JDBC sink connector failure   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
org.apache.kafka.connect.errors.DataException: No schema defined for value of map field: "__debezium.newkey"
        at org.apache.kafka.connect.data.ConnectSchema.assertSchemaNotNull(ConnectSchema.java:280) ~[connect-api-4.1.0.jar:?]
        at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:269) ~[connect-api-4.1.0.jar:?]
        at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:217) ~[connect-api-4.1.0.jar:?]
        at org.apache.kafka.connect.data.Struct.put(Struct.java:216) ~[connect-api-4.1.0.jar:?]
        at org.apache.kafka.connect.data.Struct.put(Struct.java:203) ~[connect-api-4.1.0.jar:?]
        at io.debezium.bindings.kafka.KafkaDebeziumSinkRecord.lambda$getRecordHeaders$8(KafkaDebeziumSinkRecord.java:420) ~[debezium-sink-3.4.0.Alpha1.jar:3.4.0.Alpha1]
        at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[?:?]
        at io.debezium.bindings.kafka.KafkaDebeziumSinkRecord.getRecordHeaders(KafkaDebeziumSinkRecord.java:420) ~[debezium-sink-3.4.0.Alpha1.jar:3.4.0.Alpha1]
        at io.debezium.bindings.kafka.KafkaDebeziumSinkRecord.<init>(KafkaDebeziumSinkRecord.java:53) ~[debezium-sink-3.4.0.Alpha1.jar:3.4.0.Alpha1]
        at io.debezium.connector.jdbc.JdbcKafkaSinkRecord.<init>(JdbcKafkaSinkRecord.java:48) ~[debezium-connector-jdbc-3.4.0.Alpha1.jar:3.4.0.Alpha1]
        at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:90) ~[debezium-connector-jdbc-3.4.0.Alpha1.jar:3.4.0.Alpha1]
        at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:163) ~[debezium-connector-jdbc-3.4.0.Alpha1.jar:3.4.0.Alpha1]
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:629) ~[connect-runtime-4.1.0.jar:?]
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:360) ~[connect-runtime-4.1.0.jar:?]
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:262) ~[connect-runtime-4.1.0.jar:?]
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:226) ~[connect-runtime-4.1.0.jar:?]
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:243) ~[connect-runtime-4.1.0.jar:?]
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:298) ~[connect-runtime-4.1.0.jar:?]
        at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:254) ~[connect-runtime-4.1.0.jar:?]
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[?:?]
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[?:?]
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
        at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
2025-11-06T10:20:23,939 ERROR  ||  WorkerSinkTask{id=wpms-sink-connector-avro-4} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: JDBC sink connector failure   [org.apache.kafka.connect.runtime.WorkerSinkTask]
org.apache.kafka.connect.errors.ConnectException: JDBC sink connector failure[connect

Chris Cranford

Nov 6, 2025, 6:07:42 AM
to debe...@googlegroups.com
Hi -

Can you share the source and JDBC sink connector configurations? 
It would seem that perhaps schemas are not enabled at least for headers?

Thanks
-cc

Vladislav P

Nov 6, 2025, 6:29:13 AM
to debezium
source-connector:
{
  "name": "source-connector-avro",
  "config": {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "tasks.max": "1",
    "database.hostname": "{{sourceDatabaseHost}}",
    "database.port": "{{sourceDatabasePort}}",
    "database.user": "{{sourceDatabaseUser}}",
    "database.password": "{{sourceDatabasePassword}}",
    "database.dbname": "{{sourceDatabaseName}}",
    "table.include.list": "SCHEMA1.TABLE1",
    "column.include.list": "SCHEMA1\\.TABLE1\\.(ID|LASTDATE)",
    "topic.prefix": "{{topicPrefix}}",
    "database.server.name": "{{topicPrefix}}",
    "schema.history.internal.kafka.topic": "dbz_oracle_wpms_history",
    "schema.history.internal.kafka.bootstrap.servers": "{{kafkaBootstrapServers}}",
    "log.mining.strategy": "hybrid",
    "log.mining.query.filter.mode": "in",

    "message.key.columns": "SCHEMA1.TABLE1:ID,LASTDATE;",

    "key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
    "key.converter.apicurio.registry.url": "{apicurioRegistryUrl}}",
    "key.converter.apicurio.registry.auto-register": "true",
    "key.converter.apicurio.registry.find-latest": "true",
    "key.converter.schemas.enable": "false",
    "key.converter.apicurio.registry.headers.enabled": "false",
    "key.converter.apicurio.registry.as-confluent": "true",
    "key.converter.apicurio.use-id": "contentId",
    "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
    "value.converter.apicurio.registry.url": "{{apicurioRegistryUrl}}",
    "value.converter.apicurio.registry.auto-register": "true",
    "value.converter.apicurio.registry.find-latest": "true",
    "value.converter.schemas.enable": "false",
    "value.converter.apicurio.registry.headers.enabled": "false",
    "value.converter.apicurio.registry.as-confluent": "true",
    "value.converter.apicurio.use-id": "contentId",
    "schema.name.adjustment.mode": "avro",

    "topic.creation.enable": "true",
    "topic.creation.default.replication.factor": 1,
    "topic.creation.default.partitions": 1,
    "topic.creation.default.cleanup.policy": "delete",

    "snapshot.mode": "no_data",

    "heartbeat.interval.ms": "10000",
    "heartbeat.action.query": "MERGE INTO WPMS.DEBEZIUM_HEARTBEAT t USING (SELECT 1 id, CURRENT_TIMESTAMP ts FROM dual) s ON (t.id = s.id) WHEN MATCHED THEN UPDATE SET t.ts = s.ts WHEN NOT MATCHED THEN INSERT (id, ts) VALUES (s.id, s.ts)"
  }
}


sink-connector:
{
  "name": "sink-connector-avro",
  "config": {
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "tasks.max": "5",
    "connection.url": "{{targetDbUrl}}",
    "connection.username": "{{targetDbUsername}}",
    "connection.password": "{{targetDbPassword}}",
    "topics.regex": "{{topicPrefix}}\\.SCHEMA1\\.(TABLE1)",
    "table.name.format": "${source.schema}.${source.table}",

    "delete.enabled": "true",
    "primary.key.mode": "record_key",
    "primary.key.fields": "ID,LASTDATE",
    "insert.mode": "upsert",

    "key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
    "key.converter.apicurio.registry.url": "{{apicurioRegistryUrl}}",
    "key.converter.apicurio.registry.auto-register": "true",
    "key.converter.apicurio.registry.find-latest": "true",
    "key.converter.schemas.enable": "false",
    "key.converter.apicurio.registry.headers.enabled": "false",
    "key.converter.apicurio.registry.as-confluent": "true",
    "key.converter.apicurio.use-id": "contentId",
    "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
    "value.converter.apicurio.registry.url": "{{apicurioRegistryUrl}}",
    "value.converter.apicurio.registry.auto-register": "true",
    "value.converter.apicurio.registry.find-latest": "true",
    "value.converter.schemas.enable": "false",
    "value.converter.apicurio.registry.headers.enabled": "false",
    "value.converter.apicurio.registry.as-confluent": "true",
    "value.converter.apicurio.use-id": "contentId",
    "schema.name.adjustment.mode": "avro"
  }
}

On Thursday, November 6, 2025 at 15:07:42 UTC+4, Chris Cranford wrote:

Chris Cranford

Nov 6, 2025, 8:55:49 AM
to debe...@googlegroups.com
Hi -

So as I mentioned, the issue is that you have schemas disabled. Please do not disable schemas on the source or sink; the connector should then be able to consume any new events you place in the topic that have schemas. I'm afraid any existing events that do not have schemas aren't processable.

Thanks,
-cc

Vladislav P

Nov 6, 2025, 9:12:27 AM
to debezium
Can you tell me, please, which parameter is responsible for disabling the schema?
If we are talking about Avro serialization, how do I fix it?
On Thursday, November 6, 2025 at 17:55:49 UTC+4, Chris Cranford wrote:

Chris Cranford

Nov 6, 2025, 10:47:00 AM
to debe...@googlegroups.com
Hi 

See "key.converter.schemas.enable" and "value.converter.schemas.enable" which you have set to "false". Those should be set to "true", which is generally the default.

Thanks,
-cc

Vladislav P

Nov 6, 2025, 3:42:36 PM
to debezium

Hi, Chris. I've configured these settings, but it's not working. I've even removed the serialization entirely, but the error still persists, so it's not a serialization issue.

In my test environment, everything works with incremental snapshots. However, when I run the connector in PROD with snapshot.mode: "no_data", it throws this error and fails to migrate data from Kafka to Postgres. Moreover, this error only appears for 2 tables, while all the others are transmitted normally. More precisely, these tables are also transmitted to Postgres, but as soon as the connector reaches a message carrying __debezium.newkey, it immediately terminates. What should I do?


On Thursday, November 6, 2025 at 19:47:00 UTC+4, Chris Cranford wrote:

Vladislav P

Nov 11, 2025, 3:48:51 AM
to debezium
I would like to clarify: if you run the connector with "snapshot.mode": "no_data", the target is filled with data in an inconsistent state, and I think this is where the problems start. How can the sink resolve a delete operation for, say, PK 1 if that record has never been read before?

And where does the __debezium.newkey field come from? Does this happen when the PK is changed? When working with "snapshot.mode": "initial", I did not see this.

On Friday, November 7, 2025 at 00:42:36 UTC+4, Vladislav P wrote:

Chris Cranford

Nov 11, 2025, 10:03:07 PM
to debe...@googlegroups.com
Hi, 

On the JDBC sink side, if we receive a DELETE event for a row that does not exist at the target, it should effectively be treated as a no-op. In SQL, you can safely execute a DELETE on a table even if the row does not exist. 

As for __debezium.newkey, this header is produced when a user in the source system updates a column that participates in the primary key. In topics, the event's key is a crucial part of managing the event's lifecycle. So when you change a primary key column in a relational database, we need to reflect the fact that the old key in the topic is no longer valid and that a new key has emerged after the update. This is why, when you update a row and change its primary key, you observe the following:

1. A delete event is emitted, using the old primary key combination, where the event contains a header called __debezium.newkey whose value references the new primary key column value(s).
2. A create event is emitted, using the new primary key combination, where the event contains a header called __debezium.oldkey whose value references the old primary key column value(s).
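
To illustrate with purely hypothetical values: if a row's key changes from ID 1 to ID 2, you would see roughly the following pair (only the key and the relevant header shown):

Delete event (old key):
KEY = {"ID": 1}
HEADERS include "__debezium.newkey": "{\"ID\":2}"

Create event (new key):
KEY = {"ID": 2}
HEADERS include "__debezium.oldkey": "{\"ID\":1}"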

If you have removed all the key.converter.* and value.converter.* settings in the source, did you also make sure to delete the contents of the topic and start fresh before retesting?
It would be helpful if you could share your Kafka Connect worker configuration and your connector configurations, again after your changes.

Thanks,
-cc

Vladislav P

Nov 12, 2025, 10:54:53 AM
to debezium
{
  "name": "source-connector",

  "config": {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "tasks.max": "1",
    "database.hostname": "{{sourceDatabaseHost}}",
    "database.port": "{{sourceDatabasePort}}",
    "database.user": "{{sourceDatabaseUser}}",
    "database.password": "{{sourceDatabasePassword}}",
    "database.dbname": "{{sourceDatabaseName}}",
    "table.include.list": "SCHEMA1.DEBEZIUM_SIGNAL,...",
    "column.include.list": "SCHEMA1\\.DEBEZIUM_SIGNAL\\.(ID|TYPE|DATA),...",

    "topic.prefix": "{{topicPrefix}}",
    "database.server.name": "{{topicPrefix}}",
    "schema.history.internal.kafka.topic": "dbz_oracle_wpms_history",
    "schema.history.internal.kafka.bootstrap.servers": "{{kafkaBootstrapServers}}",
    "log.mining.strategy": "hybrid",
    "log.mining.query.filter.mode": "in",

    "message.key.columns": "...",

    "signal.enable.channels": "source",
    "signal.data.collection": "{{sourceDatabaseName}}.SCHEMA1.DEBEZIUM_SIGNAL",

    "incremental.snapshot.chunk.size": 50000,
    "incremental.snapshot.allow.schema.changes": "true",

    "topic.creation.enable": "true",
    "topic.creation.default.replication.factor": 1,
    "topic.creation.default.partitions": 1,
    "topic.creation.default.cleanup.policy": "delete",

    "snapshot.mode": "no_data",
    "log.mining.transaction.retention.ms": "10800000",
    "schema.history.internal.store.only.captured.tables.ddl": "true",
    "snapshot.database.errors.max.retries": 2,

    "heartbeat.interval.ms": "10000",
    "heartbeat.action.query": "MERGE INTO SCHEMA1.DEBEZIUM_HEARTBEAT t USING (SELECT 1 id, CURRENT_TIMESTAMP ts FROM dual) s ON (t.id = s.id) WHEN MATCHED THEN UPDATE SET t.ts = s.ts WHEN NOT MATCHED THEN INSERT (id, ts) VALUES (s.id, s.ts)"
  }
}
{
  "name": "sink-connector",

  "config": {
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "tasks.max": "5",
    "connection.url": "{{targetDbUrl}}",
    "connection.username": "{{targetDbUsername}}",
    "connection.password": "{{targetDbPassword}}",
    "topics.regex": "{{topicPrefix}}\\.SCHEMA1\\.(TABLE1|...)",

    "table.name.format": "${source.schema}.${source.table}",

    "delete.enabled": "true",
    "primary.key.mode": "record_key",
    "primary.key.fields": "ID,LASTDATE",
    "insert.mode": "upsert",

  }
}  
The docker-compose looks like this:

```
services:
  apache-kafka:
    image: dc-build-dev-team-core-test-docker.repo.corp.tander.ru/apache/kafka:4.1.0
    container_name: apache_kafka
    ports:
      - "9092:9092"
      - "19092:19092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT_HOST://localhost:9092,PLAINTEXT://apache-kafka:19092
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@apache-kafka:29093
      KAFKA_LISTENERS: CONTROLLER://:29093,PLAINTEXT_HOST://:9092,PLAINTEXT://:19092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_SHARE_COORDINATOR_STATE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_SHARE_COORDINATOR_STATE_TOPIC_MIN_ISR: 1
      KAFKA_LOG_DIRS: /var/lib/kafka/data
    volumes:
      - ./volumes/apache-kafka/var/lib/kafka/data:/var/lib/kafka/data
    user: root
    restart: unless-stopped

  kafbat-kafka-ui:
    image: dc-build-dev-team-core-test-docker.repo.corp.tander.ru/kafbat/kafka-ui:3616d36
    container_name: kafka_ui_kafbat
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: apache-kafka
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: apache-kafka:19092
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://apicurio:8080/apis/ccompat/v7
    depends_on:
      - apache-kafka
    restart: unless-stopped

  debezium-connect:
    image: quay-io.repo.corp.tander.ru/debezium/connect:3.4
    container_name: debezium_connect
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: apache-kafka:19092
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_statuses
    depends_on:
      - apache-kafka
    restart: unless-stopped
    volumes:
      - ./libs/groovy-5.0.1.jar:/kafka/connect/debezium-connector-oracle/groovy-5.0.1.jar
      - ./libs/groovy-json-5.0.1.jar:/kafka/connect/debezium-connector-oracle/groovy-json-5.0.1.jar
      - ./libs/groovy-jsr223-5.0.1.jar:/kafka/connect/debezium-connector-oracle/groovy-jsr223-5.0.1.jar
      - ./libs/debezium-scripting-3.0.8.Final.jar:/kafka/connect/debezium-connector-oracle/debezium-scripting-3.0.8.Final.jar

networks:
  default:
    name: test-wpms
```

I have now completely removed the Avro serialization section from the source and sink connectors, but the error did not disappear; it is exactly the same. Then I decided to delete this section:

```
"delete.enabled": "true",
"primary.key.mode": "record_key",
"primary.key.fields": "ID,LASTDATE",
"insert.mode": "upsert",
```

And errors appeared with "Error: duplicate key value violates unique constraint" on all partitioned tables.

There are no such problems in the test environment; everything works there. But the test environment has no streaming data, only a backup copy restored from the database.

On Wednesday, November 12, 2025 at 07:03:07 UTC+4, Chris Cranford wrote:

Chris Cranford

1:36 AM
to debe...@googlegroups.com
And to be clear, do all of the topics your sink is subscribed to have both ID and LASTDATE columns?

Vladislav P

2:14 AM
to debezium
Not all of them do.
Actually, I have several tables.
For example, the PK of "TABLE1" consists of an "ID" column.
There is a "TABLE2" whose PK consists of, for example, an "ID_T2" column and a "LASTDATE" column.

Then in "primary.key.fields" I specify "ID,ID_T2,LASTDATE".
On Thursday, November 13, 2025 at 10:36:15 UTC+4, Chris Cranford wrote:

Chris Cranford

2:30 AM
to debe...@googlegroups.com, Vladislav P
You shouldn't need `primary.key.fields` when using `record_key` unless the incoming record keys have more columns than you want to use as the key in the target.
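
For instance, a pared-down sink configuration along those lines could look like this (a sketch reusing the placeholders from your earlier config, with primary.key.fields simply omitted so each topic's own record key is used):

{
  "name": "sink-connector",
  "config": {
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "connection.url": "{{targetDbUrl}}",
    "connection.username": "{{targetDbUsername}}",
    "connection.password": "{{targetDbPassword}}",
    "topics.regex": "{{topicPrefix}}\\.SCHEMA1\\.(TABLE1|...)",
    "table.name.format": "${source.schema}.${source.table}",
    "insert.mode": "upsert",
    "delete.enabled": "true",
    "primary.key.mode": "record_key"
  }
}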

Vladislav P

4:15 AM
to debezium

I've deleted "primary.key.fields" now, but the error still appears.Avro serialization is disabled.

Could this be related to the following:

I have a table in Oracle with a primary key of ID and a standard LASTDATE field.

I'm specifying in the source connector that it should include these fields in the key. "message.key.columns": "SCHEMA1. TABLE1:ID,LASTDATE;".

Because I have partitioning in Postgre, and these columns are primary keys there.

But I wonder why everything works perfectly fine in the test environment with this configuration. In the production environment, I constantly get this error: "No schema defined for value of map field: "__debezium.newkey"". This error is also preceded by a lot of WARNINGs: "Ignored to write record from topic '..' partition '0' offset '...'. No resolvable table name".
On Thursday, November 13, 2025 at 11:30:43 UTC+4, Chris Cranford wrote:

Vladislav P

7:43 AM
to debezium
I'm currently studying the messages in Kafka and I see the following.

Record 1:
KEY:
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int64",
        "optional": false,
        "field": "ID_OPART"
      },
      {
        "type": "int64",
        "optional": true,
        "name": "io.debezium.time.Timestamp",
        "version": 1,
        "field": "LASTDATE"
      }
    ],
    "optional": false,
    "name": "dbserver.WMS.TBL_OP_ART.Key"
  },
  "payload": {
    "ID_OPART": 3904018674,
    "LASTDATE": 1763047420000
  }
}

VALUE:
"payload": {
"before": null,
"after": {
"ID_OPART": 3904018674,
"ID_OP": 244437033,
"ID_ART": 1177410,
"LASTDATE": 1763047420000
},
....
"op": "c",
}

HEADERS:
{
  "__debezium.context.connectorName": "oracle",
  "__debezium.oldkey": "{\"ID_OPART\":3904018674,\"LASTDATE\":1762957961000}",
  "__debezium.context.connectorLogicalName": "dbserver",
  "__debezium.context.taskId": "0"
}


Record 2:
KEY:
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"ID_OPART"},{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","version":1,"field":"LASTDATE"}],"optional":false,"name":"dbserver.WMS.TBL_OP_ART.Key"},"payload":{"ID_OPART":3904018674,"LASTDATE":1762957961000}}
VALUE: empty
HEADERS:
{"__debezium.context.connectorName":"oracle","__debezium.context.connectorLogicalName":"dbserver","__debezium.context.taskId":"0","__debezium.newkey":"{\"ID_OPART\":3904018674,\"LASTDATE\":1763047420000}"}

Record 3:
KEY:
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int64",
        "optional": false,
        "field": "ID_OPART"
      },
      {
        "type": "int64",
        "optional": true,
        "name": "io.debezium.time.Timestamp",
        "version": 1,
        "field": "LASTDATE"
      }
    ],
    "optional": false,
    "name": "dbserver.WMS.TBL_OP_ART.Key"
  },
  "payload": {
    "ID_OPART": 3904018674,
    "LASTDATE": 1762957961000
  }
}
VALUE:
"payload": {
"before": {
"ID_OPART": 3904018674,
"ID_OP": 244437033,
"ID_ART": 1177410,
"LASTDATE": 1762957961000
},
"after": null,
....,
"op": "d",
}
HEADERS:
{
  "__debezium.context.connectorName": "oracle",
  "__debezium.context.connectorLogicalName": "dbserver",
  "__debezium.context.taskId": "0",
  "__debezium.newkey": "{\"ID_OPART\":3904018674,\"LASTDATE\":1763047420000}"
}

Here you can see that the LASTDATE column has changed. Maybe that's why __debezium.newkey can't be mapped. But how do I fix it?
On Thursday, November 13, 2025 at 13:15:26 UTC+4, Vladislav P wrote:

jiri.p...@gmail.com

11:04 AM
to debezium
Hi,

this happens when you update the primary key (either the real one or one defined by you). This is documented in https://debezium.io/documentation/reference/3.3/connectors/oracle.html#oracle-update-events and works that way to allow compaction of old records in case of PK updates.

Jiri

Vladislav P

11:25 AM
to debezium
Hi, yes, I saw it, but it doesn't help to solve my problem in any way. I am currently using the "debezium/connect:3.3.1.Final" and "apache/kafka:4.1.0" images. Judging by the logs, the error occurs here: https://github.com/debezium/debezium/blob/5fdb070c4afeab0cfe6b60eb606bc36c887092ae/debezium-sink/src/main/java/io/debezium/bindings/kafka/KafkaDebeziumSinkRecord.java#L412-L422

On Thursday, November 13, 2025 at 20:04:27 UTC+4, jiri.p...@gmail.com wrote:
[Attachment: HeaderError.log]

jiri.p...@gmail.com

11:46 AM
to debezium
Could you try to configure

header.converter=org.apache.kafka.connect.json.JsonConverter
header.converter.schemas.enable=true

in Connect worker configuration?
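
If it's easier, the same two settings can typically also be applied per connector by adding them to the sink connector's config (a sketch, assuming your Connect version supports per-connector converter overrides):

"header.converter": "org.apache.kafka.connect.json.JsonConverter",
"header.converter.schemas.enable": "true"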

Jiri
