oracle to postgresql, struggle with STRUCT


milist ujang

Jul 6, 2025, 8:46:21 PM
to debe...@googlegroups.com
Hi all,

I'm testing Debezium 3.1.3. The goal is to replicate data from Oracle to PostgreSQL.

oracle:

SQL> desc test1
 Name                                      Null?    Type
 ----------------------------------------- -------- ----------------------------
 ID                                        NOT NULL NUMBER
 VALTXT                                             VARCHAR2(100)
 VALDT                                              DATE

 The ID column is the primary key.
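
For reference, the table corresponds to DDL roughly like the following (reconstructed from the DESCRIBE output above, so treat it as illustrative):

CREATE TABLE test1 (
  id     NUMBER        NOT NULL PRIMARY KEY,
  valtxt VARCHAR2(100),
  valdt  DATE
);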


connector config:

        "key.converter.schemas.enable": "true",
        "value.converter.schemas.enable": "true",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
   "decimal.handling.mode": "precise",

INSERT INTO test1 (ID,VALTXT,VALDT) VALUES ('Struct{ID=Struct{scale=0,value=[B@377ed42c}}','AAAAAAAABBBBBBBBAAAAAAAABBBBBBBBAAAAAAAABBBBBBBBAAAAAAAABBBBBBBBAAAAAAAABBBBBBBBAAAAAAAABBBBBBBBAAA',1751110504000) ON CONFLICT (ID) DO UPDATE SET VALTXT=EXCLUDED.VALTXT,VALDT=EXCLUDED.VALDT was aborted: ERROR: column "id" is of type numeric but expression is of type character varying

"decimal.handling.mode": "string",

INSERT INTO test1 (ID,VALTXT,VALDT) VALUES ('Struct{ID=1}','AAAAAAAABBBBBBBBAAAAAAAABBBBBBBBAAAAAAAABBBBBBBBAAAAAAAABBBBBBBBAAAAAAAABBBBBBBBAAAAAAAABBBBBBBBAAA',1751110504000) ON CONFLICT (ID) DO UPDATE SET VALTXT=EXCLUDED.VALTXT,VALDT=EXCLUDED.VALDT was aborted: ERROR: column "id" is of type numeric but expression is of type character varying

"decimal.handling.mode": "double",
INSERT INTO test1 (ID,VALTXT,VALDT) VALUES ('Struct{ID=1.0}','AAAAAAAABBBBBBBBAAAAAAAABBBBBBBBAAAAAAAABBBBBBBBAAAAAAAABBBBBBBBAAAAAAAABBBBBBBBAAAAAAAABBBBBBBBAAA',1751110504000) ON CONFLICT (ID) DO UPDATE SET VALTXT=EXCLUDED.VALTXT,VALDT=EXCLUDED.VALDT was aborted: ERROR: column "id" is of type numeric but expression is of type character varying

Someone on Stack Overflow used a NumberToLong converter at the source, but I'm new to this world; how can I find the jar for this library?

        "converters": "numbertolong",
        "numbertolong.type": "org.example.mongodb.kafka.connect.converters.OracleNumberToLong",
        "numbertolong.selector": ".TEST.ID"

{"error_code":500,"message":"Unable to find class org.example.mongodb.kafka.connect.converters.OracleNumberToLong"}


If I change the key converter to JSON, the STRUCT error again shows up in the log.

        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",

2025-07-07 00:38:52,539 ERROR  ||  WorkerSinkTask{id=jdbc-postgres-sink-test1-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Unsupported source data type: STRUCT   [org.apache.kafka.connect.runtime.WorkerSinkTask]
org.apache.kafka.connect.errors.ConnectException: Unsupported source data type: STRUCT
at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.bindField(GenericDatabaseDialect.java:1673)



--
regards

ujang jaenudin | Self-Employed, DBA Consultant
http://id.linkedin.com/pub/ujang-jaenudin/12/64/bab

Chris Cranford

Jul 7, 2025, 10:14:46 AM
to debe...@googlegroups.com
Hi -

This is related to DBZ-9166 [1] and DBZ-9138 [2]. Whenever you define a column as just "NUMBER", this is a shortcut for Oracle to treat the column as "NUMBER(38,0)". Unfortunately, 38 digits far exceeds the 64-bit boundary that can be represented by an INT64 data type, and therefore Debezium encodes the column's value as a VariableScaleDecimal type that uses a STRUCT to preserve the potential 38 digits. Without doing this, if the field ever has 19+ digits you would get overflow, because the 64-bit numeric data types would be insufficient.
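
As an illustration of what that encoding looks like in a JSON-serialized event (field names per the VariableScaleDecimal schema; the base64 value shown here is made up), the ID is carried roughly as:

    "ID": {
        "scale": 0,
        "value": "AQ=="
    }

where "value" is the base64-encoded bytes of the unscaled number.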

Unfortunately I am not familiar with the OracleNumberToLong class you mentioned, and I was unable to find it either.

One thing I did notice about your output below: even while changing decimal.handling.mode, it did not seem like you were clearing the topic, or you were applying the setting on the sink side rather than the source. The `decimal.handling.mode` must be set on the source side so that events are written to the topic using the `precise` (the default), `string`, or `double` format.
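
To be explicit, the setting belongs in the Oracle source connector's config, not in the JDBC sink's config. An illustrative (incomplete) snippet:

    "config": {
        "connector.class": "io.debezium.connector.oracle.OracleConnector",
        ...
        "decimal.handling.mode": "string"
    }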

Does changing the handling mode on the source and repopulating the topic help?

Thanks,
-cc

[1] https://issues.redhat.com/browse/DBZ-9166
[2] https://issues.redhat.com/browse/DBZ-9138

milist ujang

Jul 7, 2025, 7:07:43 PM
to debe...@googlegroups.com
Hi Chris,

Thanks for the information about how Debezium handles the NUMBER data type.
I changed decimal.handling.mode on the source and restarted the Kafka and Connect Docker containers. I haven't given Kafka a volume yet until this test passes.

How can I overcome the 'Struct{ID=1}' on the sink side when decimal.handling.mode=string; is there any SMT to just pick the number?


Chris Cranford

Jul 8, 2025, 12:12:34 PM
to debe...@googlegroups.com
Hi -

If you adjust the `decimal.handling.mode` on the source to use `string` or `double`, then you won't get a `Struct` on the sink side. You will only get a `Struct` if the decimal handling mode is set to its default of `precise`. There is no OOTB transformation to convert this to a number on the sink; you would need to write that yourself.
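
To sketch what that might look like (untested and purely illustrative -- the class name, the key field name "ID", and the choice to replace the whole key with a bare INT64 are assumptions you would adapt to your actual key schema), a minimal SMT could be along these lines:

import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;

// Illustrative only: flattens a key of the form Struct{ID=Struct{scale,value}}
// (Debezium's VariableScaleDecimal encoding) into a plain long key.
public class VariableScaleDecimalKeyToLong<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public R apply(R record) {
        if (!(record.key() instanceof Struct)) {
            return record;                                   // nothing to do
        }
        Struct key = (Struct) record.key();
        if (key.schema().field("ID") == null) {              // assumed key column name
            return record;
        }
        Object field = key.get("ID");
        if (!(field instanceof Struct)) {
            return record;                                   // already a plain number
        }
        Struct vsd = (Struct) field;
        int scale = vsd.getInt32("scale");
        byte[] unscaled = vsd.getBytes("value");             // unscaled value as bytes
        long id = new BigDecimal(new BigInteger(unscaled), scale).longValueExact();

        // Replace the whole key with a bare INT64; rebuild a Struct key instead
        // if your sink's primary key handling expects named key fields.
        return record.newRecord(record.topic(), record.kafkaPartition(),
                Schema.INT64_SCHEMA, id,
                record.valueSchema(), record.value(), record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public void close() {
    }
}

You would then package this as a jar, place it on the Connect plugin path, and reference it from the sink's "transforms" configuration.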

Thanks,
-cc

milist ujang

Jul 11, 2025, 12:37:25 AM
to debe...@googlegroups.com
Thanks, Chris, for the response.

Now I'm facing slowness on the Oracle side:
LOGMINER: Memory Size = 15M, HWM 14M, LWM 13M, 90%
LOGMINER: Transaction Queue Size: 1024

How can we increase these LogMiner settings?


Chris Cranford

Jul 11, 2025, 10:38:48 AM
to debe...@googlegroups.com
Hi -

I am not aware of any way these can be increased for the ad-hoc mining sessions that Debezium uses, and if you're experiencing latency, these are unlikely to be the issue. Can you please share your full connector configuration, along with the transactions/second and log switches/hour metrics for the Oracle instance?
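
For the log switches/hour figure, one commonly used query against the source database is along these lines (requires access to V$LOG_HISTORY; adjust the time window as needed):

SELECT TO_CHAR(first_time, 'YYYY-MM-DD HH24') AS hour,
       COUNT(*)                               AS log_switches
  FROM v$log_history
 WHERE first_time > SYSDATE - 1
 GROUP BY TO_CHAR(first_time, 'YYYY-MM-DD HH24')
 ORDER BY 1;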

Thanks,
-cc

milist ujang

Jul 15, 2025, 9:01:36 AM
to debe...@googlegroups.com
Hi Chris,

This is the full source connector config.

{
    "name": "oracle-logminer-connector",
    "config": {
        "connector.class": "io.debezium.connector.oracle.OracleConnector",
        "database.user": "dbzuser",
        "database.password": "pass",
        "database.dbname": "dev",
        "database.hostname": "dboracle",
        "database.port": "1599",
        "snapshot.mode": "initial",
        "snapshot.max.threads": "8",
        "snapshot.fetch.size": "50000",
        "max.batch.size": "8000",
        "max.queue.size": "16000",
        "query.fetch.size": "5000",
        "topic.prefix": "testrep",
        "auto.create.topics.enable": "true",
        "schema.include.list": "PRD",
        "table.include.list": "PRD.TEST1,PRD.TEST2",
        "tasks.max": "1",
        "internal.log.mining.sql.relaxed.quote.detection": "true",
        "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
        "schema.history.internal.kafka.topic": "schema-changes.test1dan2",
        "schema.history.internal.store.only.captured.tables.ddl": "true",
        "schema.history.internal.store.only.captured.databases.ddl": "true",
        "log.mining.transaction.retention.ms": "0",
        "log.mining.batch.size.min": "10000",
        "log.mining.batch.size.default": "50000",
        "log.mining.batch.size.max": "200000",
        "log.mining.view.fetch.size": "5000",
        "log.mining.scn.gap.detection.gap.size.min": "10000000",
        "log.mining.scn.gap.detection.time.interval.max.ms": "20000",
        "log.mining.strategy": "online_catalog",
        "log.mining.transaction.retention.ms": "0",
        "log.mining.scn.gap.detection.gap.size.min": "10000000",
        "log.mining.scn.gap.detection.time.interval.max.ms": "20000",
        "heartbeat.interval.ms": "1000",
        "lob.enabled": "true",
        "tombstones.on.delete": "true",
        "errors.max.retries": "-1",
        "snapshot.database.errors.max.retries": "5",
        "decimal.handling.mode": "double",
        "converters": "zero_scale",
        "zero_scale.type": "io.debezium.connector.oracle.converters.NumberToZeroScaleConverter",
        "zero_scale.decimal.mode": "double",
        "converters": "boolean",
        "boolean.type": "io.debezium.connector.oracle.converters.NumberOneToBooleanConverter",
        "boolean.selector": ".*MYTABLE.FLAG,.*.IS_ARCHIVED",
        "converters": "number-to-boolean",
        "number-to-boolean.type": "io.debezium.connector.oracle.converters.NumberOneToBooleanConverter",
        "number-to-boolean.selector": ".*.MY_TABLE.DATA",
        "converters": "number-zero-scale",
        "number-zero-scale.type": "io.debezium.connector.oracle.converters.NumberToZeroScaleConverter",
        "number-zero-scale.decimal.mode": "double",
        "numeric.precision.mapping": "true",
        "numeric.mapping": "best_fit",
        "key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
        "key.converter.apicurio.registry.url": "http://apicurio:8080/apis/registry/v2",
        "key.converter.apicurio.registry.auto-register": "true",
        "key.converter.apicurio.registry.find-latest": "true",
        "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
        "value.converter.apicurio.registry.url": "http://apicurio:8080/apis/registry/v2",
        "value.converter.apicurio.registry.auto-register": "true",
        "value.converter.apicurio.registry.find-latest": "true"
    }
}


Chris Cranford

Jul 15, 2025, 1:10:38 PM
to debe...@googlegroups.com
Hi

Nothing in the configuration stands out.

* What are your transactions/second and log switches/hour?

* If you can look at your JMX metrics, can you provide these values
    * LagFromSourceInMilliseconds
    * LastDurationOfFetchQueryInMilliseconds
    * LastBatchProcessingTimeInMilliseconds
    * NumberOfActiveTransactions
    * NumberOfCommittedTransactions
    * NumberOfRollbackTransactions
    * NumberOfPartialRollbackCount
    * TotalProcessedRows

Thanks,
-cc

milist ujang

Jul 20, 2025, 8:04:17 AM
to debe...@googlegroups.com
Hi Chris,

I'll check and let you know.

Today I continued testing this Oracle to Postgres replication and got an error when splitting a partition on Oracle (the source):

2025-07-20T11:58:40,723 INFO   ||  WorkerSourceTask{id=oracle-logminer-connector-0} Committing offsets for 10594 acknowledged messages   [org.apache.kafka.connect.runtime.WorkerSourceTask]
2025-07-20T11:58:40,728 ERROR  ||  WorkerSourceTask{id=oracle-logminer-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted   [org.apache.kafka.connect.runtime.WorkerTask]
org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
        at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:67) ~[debezium-core-3.2.0.Final.jar:3.2.0.Final]
        at io.debezium.connector.oracle.logminer.AbstractLogMinerStreamingChangeEventSource.execute(AbstractLogMinerStreamingChangeEventSource.java:214) ~[debezium-connector-oracle-3.2.0.Final.jar:3.2.0.Final]
        at io.debezium.connector.oracle.logminer.AbstractLogMinerStreamingChangeEventSource.execute(AbstractLogMinerStreamingChangeEventSource.java:88) ~[debezium-connector-oracle-3.2.0.Final.jar:3.2.0.Final]
        at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:326) ~[debezium-core-3.2.0.Final.jar:3.2.0.Final]
        at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:207) ~[debezium-core-3.2.0.Final.jar:3.2.0.Final]
        at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:147) ~[debezium-core-3.2.0.Final.jar:3.2.0.Final]
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[?:?]
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[?:?]
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
        at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
Caused by: io.debezium.text.ParsingException: DDL statement couldn't be parsed. Please open a Jira issue with the statement 'alter table test1 split partition p_MAXV AT (TO_DATE('01/01/2030', 'DD/MM/YYYY')) INTO (partition p_2029 TABLESPACE users,
partition p_MAXV) online;'
extraneous input 'online' expecting {<EOF>, '/', ';'}
        at io.debezium.antlr.ParsingErrorListener.syntaxError(ParsingErrorListener.java:43) ~[debezium-ddl-parser-3.2.0.Final.jar:3.2.0.Final]
        at org.antlr.v4.runtime.ProxyErrorListener.syntaxError(ProxyErrorListener.java:41) ~[antlr4-runtime-4.10.1.jar:4.10.1]
        at org.antlr.v4.runtime.Parser.notifyErrorListeners(Parser.java:543) ~[antlr4-runtime-4.10.1.jar:4.10.1]
        at org.antlr.v4.runtime.DefaultErrorStrategy.reportUnwantedToken(DefaultErrorStrategy.java:377) ~[antlr4-runtime-4.10.1.jar:4.10.1]
        at org.antlr.v4.runtime.DefaultErrorStrategy.singleTokenDeletion(DefaultErrorStrategy.java:548) ~[antlr4-runtime-4.10.1.jar:4.10.1]
        at org.antlr.v4.runtime.DefaultErrorStrategy.sync(DefaultErrorStrategy.java:266) ~[antlr4-runtime-4.10.1.jar:4.10.1]
        at io.debezium.ddl.parser.oracle.generated.PlSqlParser.sql_script(PlSqlParser.java:2145) ~[debezium-ddl-parser-3.2.0.Final.jar:3.2.0.Final]
        at io.debezium.connector.oracle.antlr.OracleDdlParser.parseTree(OracleDdlParser.java:75) ~[debezium-connector-oracle-3.2.0.Final.jar:3.2.0.Final]
        at io.debezium.connector.oracle.antlr.OracleDdlParser.parseTree(OracleDdlParser.java:32) ~[debezium-connector-oracle-3.2.0.Final.jar:3.2.0.Final]
        at io.debezium.antlr.AntlrDdlParser.parse(AntlrDdlParser.java:73) ~[debezium-ddl-parser-3.2.0.Final.jar:3.2.0.Final]
        at io.debezium.connector.oracle.antlr.OracleDdlParser.parse(OracleDdlParser.java:70) ~[debezium-connector-oracle-3.2.0.Final.jar:3.2.0.Final]
        at io.debezium.connector.oracle.OracleSchemaChangeEventEmitter.emitSchemaChangeEvent(OracleSchemaChangeEventEmitter.java:104) ~[debezium-connector-oracle-3.2.0.Final.jar:3.2.0.Final]
        at io.debezium.pipeline.EventDispatcher.dispatchSchemaChangeEvent(EventDispatcher.java:411) ~[debezium-core-3.2.0.Final.jar:3.2.0.Final]
        at io.debezium.connector.oracle.logminer.AbstractLogMinerStreamingChangeEventSource.dispatchSchemaChangeEventInternal(AbstractLogMinerStreamingChangeEventSource.java:1217) ~[debezium-connector-oracle-3.2.0.Final.jar:3.2.0.Final]
        at io.debezium.connector.oracle.logminer.buffered.BufferedLogMinerStreamingChangeEventSource.handleSchemaChangeEvent(BufferedLogMinerStreamingChangeEventSource.java:547) ~[debezium-connector-oracle-3.2.0.Final.jar:3.2.0.Final]
        at io.debezium.connector.oracle.logminer.AbstractLogMinerStreamingChangeEventSource.processEvent(AbstractLogMinerStreamingChangeEventSource.java:473) ~[debezium-connector-oracle-3.2.0.Final.jar:3.2.0.Final]
        at io.debezium.connector.oracle.logminer.AbstractLogMinerStreamingChangeEventSource.executeAndProcessQuery(AbstractLogMinerStreamingChangeEventSource.java:397) ~[debezium-connector-oracle-3.2.0.Final.jar:3.2.0.Final]
        at io.debezium.connector.oracle.logminer.buffered.BufferedLogMinerStreamingChangeEventSource.process(BufferedLogMinerStreamingChangeEventSource.java:231) ~[debezium-connector-oracle-3.2.0.Final.jar:3.2.0.Final]
        at io.debezium.connector.oracle.logminer.buffered.BufferedLogMinerStreamingChangeEventSource.executeLogMiningStreaming(BufferedLogMinerStreamingChangeEventSource.java:157) ~[debezium-connector-oracle-3.2.0.Final.jar:3.2.0.Final]
        at io.debezium.connector.oracle.logminer.AbstractLogMinerStreamingChangeEventSource.execute(AbstractLogMinerStreamingChangeEventSource.java:209) ~[debezium-connector-oracle-3.2.0.Final.jar:3.2.0.Final]
        ... 9 more



Chris Cranford

Jul 20, 2025, 10:37:01 AM
to debe...@googlegroups.com
Hi, thanks for the report. DBZ-9238 [1] has been raised and a PR [2] opened.

Thanks,
-cc

[1]: https://issues.redhat.com/browse/DBZ-9238
[2]: https://github.com/debezium/debezium/pull/6578

milist ujang

Jul 24, 2025, 6:20:27 AM
to debe...@googlegroups.com
Hi Chris,

Trying to create a scenario of adding a new table, TEST3, into the replication with an incremental snapshot, triggered by inserting into the table defined in "signal.data.collection": "RTSNGDEV.DBZUSER.DEBEZIUM_SIGNAL_TABLE",
but my TEST3 sink always throws this error:

2025-07-24T08:59:37,928 ERROR  ||  Failed to process record: Cannot create field because of field name duplication __debezium.context.connectorLogicalName   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]

the steps are:
1. stop + delete source connector 

2. delete topic defined in schema.history.internal.kafka.topic 
kafka-topics --bootstrap-server localhost:9092 --delete --topic schema-changes.test1dan2

3. modify source .json file
"snapshot.mode": "recovery"
"table.include.list" : "DBZUSER.DEBEZIUM_SIGNAL_TABLE,PRD.TEST1,PRD.TEST2,PRD.TEST3",

4. register new sink
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @pg3.json

5. register source
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @ora.json

am I missing something?
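
For reference, the kind of signal insert used to trigger an ad-hoc incremental snapshot looks roughly like this (values here are placeholders, and the data-collections identifier format may need to match the fully qualified names the connector expects):

INSERT INTO DBZUSER.DEBEZIUM_SIGNAL_TABLE (id, type, data)
VALUES ('adhoc-test3-1', 'execute-snapshot',
        '{"data-collections": ["PRD.TEST3"], "type": "incremental"}');
COMMIT;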

full stack trace:

PRD.TEST3' for the first time. Checking whether topic exists   [org.apache.kafka.connect.runtime.AbstractWorkerSourceTask]
2025-07-24T08:59:37,889 INFO   ||  Topic 'testrep.PRD.TEST3' already exists.   [org.apache.kafka.connect.runtime.AbstractWorkerSourceTask]
2025-07-24T08:59:37,928 ERROR  ||  Failed to process record: Cannot create field because of field name duplication __debezium.context.connectorLogicalName   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
org.apache.kafka.connect.errors.SchemaBuilderException: Cannot create field because of field name duplication __debezium.context.connectorLogicalName
        at org.apache.kafka.connect.data.SchemaBuilder.field(SchemaBuilder.java:330) ~[connect-api-4.0.0.jar:?]
        at io.debezium.bindings.kafka.KafkaDebeziumSinkRecord.lambda$getRecordHeaders$7(KafkaDebeziumSinkRecord.java:416) ~[debezium-sink-3.2.0.Final.jar:3.2.0.Final]
        at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[?:?]
        at io.debezium.bindings.kafka.KafkaDebeziumSinkRecord.getRecordHeaders(KafkaDebeziumSinkRecord.java:416) ~[debezium-sink-3.2.0.Final.jar:3.2.0.Final]
        at io.debezium.bindings.kafka.KafkaDebeziumSinkRecord.<init>(KafkaDebeziumSinkRecord.java:53) ~[debezium-sink-3.2.0.Final.jar:3.2.0.Final]
        at io.debezium.connector.jdbc.JdbcKafkaSinkRecord.<init>(JdbcKafkaSinkRecord.java:48) ~[debezium-connector-jdbc-3.2.0.Final.jar:3.2.0.Final]
        at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:78) ~[debezium-connector-jdbc-3.2.0.Final.jar:3.2.0.Final]
        at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:128) ~[debezium-connector-jdbc-3.2.0.Final.jar:3.2.0.Final]
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:606) ~[connect-runtime-4.0.0.jar:?]
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:345) ~[connect-runtime-4.0.0.jar:?]
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:247) ~[connect-runtime-4.0.0.jar:?]
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:216) ~[connect-runtime-4.0.0.jar:?]
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:226) ~[connect-runtime-4.0.0.jar:?]
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:281) ~[connect-runtime-4.0.0.jar:?]
        at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:238) ~[connect-runtime-4.0.0.jar:?]

        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[?:?]
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[?:?]
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
        at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
2025-07-24T08:59:37,939 ERROR  ||  JDBC sink connector failure   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
org.apache.kafka.connect.errors.SchemaBuilderException: Cannot create field because of field name duplication __debezium.context.connectorLogicalName
        at org.apache.kafka.connect.data.SchemaBuilder.field(SchemaBuilder.java:330) ~[connect-api-4.0.0.jar:?]
        at io.debezium.bindings.kafka.KafkaDebeziumSinkRecord.lambda$getRecordHeaders$7(KafkaDebeziumSinkRecord.java:416) ~[debezium-sink-3.2.0.Final.jar:3.2.0.Final]
        at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[?:?]
        at io.debezium.bindings.kafka.KafkaDebeziumSinkRecord.getRecordHeaders(KafkaDebeziumSinkRecord.java:416) ~[debezium-sink-3.2.0.Final.jar:3.2.0.Final]
        at io.debezium.bindings.kafka.KafkaDebeziumSinkRecord.<init>(KafkaDebeziumSinkRecord.java:53) ~[debezium-sink-3.2.0.Final.jar:3.2.0.Final]
        at io.debezium.connector.jdbc.JdbcKafkaSinkRecord.<init>(JdbcKafkaSinkRecord.java:48) ~[debezium-connector-jdbc-3.2.0.Final.jar:3.2.0.Final]
        at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:78) ~[debezium-connector-jdbc-3.2.0.Final.jar:3.2.0.Final]
        at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:128) ~[debezium-connector-jdbc-3.2.0.Final.jar:3.2.0.Final]
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:606) ~[connect-runtime-4.0.0.jar:?]
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:345) ~[connect-runtime-4.0.0.jar:?]
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:247) ~[connect-runtime-4.0.0.jar:?]
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:216) ~[connect-runtime-4.0.0.jar:?]
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:226) ~[connect-runtime-4.0.0.jar:?]
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:281) ~[connect-runtime-4.0.0.jar:?]
        at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:238) ~[connect-runtime-4.0.0.jar:?]

        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[?:?]
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[?:?]
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
        at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
2025-07-24T08:59:37,940 ERROR  ||  WorkerSinkTask{id=jdbc-postgres-sink-test3-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: JDBC sink connector failure   [org.apache.kafka.connect.runtime.WorkerSinkTask]
org.apache.kafka.connect.errors.ConnectException: JDBC sink connector failure
        at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:121) ~[debezium-connector-jdbc-3.2.0.Final.jar:3.2.0.Final]
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:606) ~[connect-runtime-4.0.0.jar:?]
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:345) ~[connect-runtime-4.0.0.jar:?]
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:247) ~[connect-runtime-4.0.0.jar:?]
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:216) ~[connect-runtime-4.0.0.jar:?]
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:226) ~[connect-runtime-4.0.0.jar:?]
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:281) ~[connect-runtime-4.0.0.jar:?]
        at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:238) ~[connect-runtime-4.0.0.jar:?]

        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[?:?]
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[?:?]
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
        at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
Caused by: org.apache.kafka.connect.errors.SchemaBuilderException: Cannot create field because of field name duplication __debezium.context.connectorLogicalName
        at org.apache.kafka.connect.data.SchemaBuilder.field(SchemaBuilder.java:330) ~[connect-api-4.0.0.jar:?]
        at io.debezium.bindings.kafka.KafkaDebeziumSinkRecord.lambda$getRecordHeaders$7(KafkaDebeziumSinkRecord.java:416) ~[debezium-sink-3.2.0.Final.jar:3.2.0.Final]
        at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[?:?]
        at io.debezium.bindings.kafka.KafkaDebeziumSinkRecord.getRecordHeaders(KafkaDebeziumSinkRecord.java:416) ~[debezium-sink-3.2.0.Final.jar:3.2.0.Final]
        at io.debezium.bindings.kafka.KafkaDebeziumSinkRecord.<init>(KafkaDebeziumSinkRecord.java:53) ~[debezium-sink-3.2.0.Final.jar:3.2.0.Final]
        at io.debezium.connector.jdbc.JdbcKafkaSinkRecord.<init>(JdbcKafkaSinkRecord.java:48) ~[debezium-connector-jdbc-3.2.0.Final.jar:3.2.0.Final]
        at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:78) ~[debezium-connector-jdbc-3.2.0.Final.jar:3.2.0.Final]
        at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:128) ~[debezium-connector-jdbc-3.2.0.Final.jar:3.2.0.Final]
        ... 12 more
2025-07-24T08:59:37,940 ERROR  ||  WorkerSinkTask{id=jdbc-postgres-sink-test3-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted   [org.apache.kafka.connect.runtime.WorkerTask]
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:636) ~[connect-runtime-4.0.0.jar:?]
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:345) ~[connect-runtime-4.0.0.jar:?]
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:247) ~[connect-runtime-4.0.0.jar:?]
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:216) ~[connect-runtime-4.0.0.jar:?]
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:226) ~[connect-runtime-4.0.0.jar:?]
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:281) ~[connect-runtime-4.0.0.jar:?]
        at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:238) ~[connect-runtime-4.0.0.jar:?]

        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[?:?]
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[?:?]
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
        at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
Caused by: org.apache.kafka.connect.errors.ConnectException: JDBC sink connector failure
        at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:121) ~[debezium-connector-jdbc-3.2.0.Final.jar:3.2.0.Final]
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:606) ~[connect-runtime-4.0.0.jar:?]
        ... 11 more
Caused by: org.apache.kafka.connect.errors.SchemaBuilderException: Cannot create field because of field name duplication __debezium.context.connectorLogicalName
        at org.apache.kafka.connect.data.SchemaBuilder.field(SchemaBuilder.java:330) ~[connect-api-4.0.0.jar:?]
        at io.debezium.bindings.kafka.KafkaDebeziumSinkRecord.lambda$getRecordHeaders$7(KafkaDebeziumSinkRecord.java:416) ~[debezium-sink-3.2.0.Final.jar:3.2.0.Final]
        at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[?:?]
        at io.debezium.bindings.kafka.KafkaDebeziumSinkRecord.getRecordHeaders(KafkaDebeziumSinkRecord.java:416) ~[debezium-sink-3.2.0.Final.jar:3.2.0.Final]
        at io.debezium.bindings.kafka.KafkaDebeziumSinkRecord.<init>(KafkaDebeziumSinkRecord.java:53) ~[debezium-sink-3.2.0.Final.jar:3.2.0.Final]
        at io.debezium.connector.jdbc.JdbcKafkaSinkRecord.<init>(JdbcKafkaSinkRecord.java:48) ~[debezium-connector-jdbc-3.2.0.Final.jar:3.2.0.Final]
        at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:78) ~[debezium-connector-jdbc-3.2.0.Final.jar:3.2.0.Final]
        at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:128) ~[debezium-connector-jdbc-3.2.0.Final.jar:3.2.0.Final]
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:606) ~[connect-runtime-4.0.0.jar:?]
        ... 11 more



Chris Cranford

Jul 24, 2025, 9:48:27 AM
to debe...@googlegroups.com
Hi -

Do you have a transformation that is converting headers (__debezium.context.connectorLogicalName, for example) to a field in your event's value?

Thanks,
-cc

milist ujang

Jul 24, 2025, 7:20:04 PM
to debe...@googlegroups.com
Hi Chris,

No... those are the SMTs at the sink:

        "transforms": "ReplaceField,TimestampConverter,Cast,renameTable",
        "transforms.ReplaceField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
        "transforms.ReplaceField.blacklist": "__deleted,__table,__source_ts_ms,__op",
        "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss",
        "transforms.TimestampConverter.target.type": "Timestamp",
        "transforms.TimestampConverter.field": "VALDT",
        "transforms.Cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
        "transforms.Cast.spec": "ID:int64",
        "transforms.renameTable.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.renameTable.regex": "testrep.RTSNG_PRD.(.*)",
        "transforms.renameTable.replacement": "$1",



Chris Cranford

Jul 25, 2025, 10:21:57 AM
to debe...@googlegroups.com
Would it be possible for you to dump one event from one of the failing topics you're consuming? The error and the configuration don't seem to align.

Thanks,
-cc

milist ujang

Aug 1, 2025, 8:28:07 PM
to debe...@googlegroups.com
Hi Chris,

Till now I'm unable to find the correct way to decode this binary output; should I get another serde jar file?

/kafka_2.13-3.9.1/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic testrep.RTSNG_PRD.TEST3 \
--property print.key=true --property print.value=true --property print.timestamp=true --property print.headers=true \
--property key.deserializer=io.apicurio.registry.serde.avro.AvroKafkaDeserializer \
--property value.deserializer=io.apicurio.registry.serde.avro.AvroKafkaDeserializer \
--property apicurio.registry.url=http://localhost:8080/apis/registry/v2 --from-beginning

 CreateTime:1754091642157        __debezium.context.connectorLogicalName:testrep,__debezium.context.taskId:0,__debezium.context.connectorName:oracle,apicurio.key.globalId:,apicurio.key.encoding:BINARY,apicurio.value.globalId:,apicurio.value.encoding:BINARY       ▒▒▒bA   ▒▒▒bAAAAA202▒▒▒▒▒f@i@
CCLOBA&202BABEBABEBABEBABE@i@3.2.0.Final
oracletestrep▒▒trueRTSNGDEV▒▒▒▒▒՝▒▒▒ߕ▒▒▒0_PRD
TEST2312631198327r▒쒀▒f▒▒▒▒՝▒▒轾▒▒▒0



Chris Cranford

Aug 2, 2025, 5:19:50 PM
to debe...@googlegroups.com
Hi -

Rather than passing `key.deserializer=...` / `value.deserializer=...` / `apicurio.registry.url=...` with `--property`, have you tried using `--consumer-property`?

The use of `--property` is misleading here: those properties are used for formatting the values when writing them to the console, not when consuming them from the topic. By using the `--consumer-property` toggle, the conversion is applied when reading from the topic.
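
The reworked command might look roughly like this (untested; same flags, just moved to --consumer-property):

/kafka_2.13-3.9.1/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic testrep.RTSNG_PRD.TEST3 \
--property print.key=true --property print.value=true --property print.timestamp=true --property print.headers=true \
--consumer-property key.deserializer=io.apicurio.registry.serde.avro.AvroKafkaDeserializer \
--consumer-property value.deserializer=io.apicurio.registry.serde.avro.AvroKafkaDeserializer \
--consumer-property apicurio.registry.url=http://localhost:8080/apis/registry/v2 --from-beginning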

-cc
