Issue with Debezium PostgreSQL Connector After Adding New Column


Amit Kumar Manjhi

May 29, 2024, 6:34:09 AM
to debezium

Hi All,

I have created a Debezium PostgreSQL Source Connector for a table X with columns id, name, and address.

When I initially created the source connector, I could see the data from table X in Kafka. 

However, after adding a new column 'age' to table X, I noticed that the 'age' column data is not appearing in Kafka. Additionally, my source connector stopped working and the replication slot size started increasing.

My question is: How can I ensure that the newly added column data is visible in Kafka?

What changes need to be made to accommodate new columns?

Thanks,
-Amit

jiri.p...@gmail.com

May 29, 2024, 8:42:05 AM
to debezium
Hi,

could you please share your connector log?

Jiri

Amit Kumar Manjhi

May 29, 2024, 9:04:43 AM
to debezium
Hi Jiri,

Here are the logs:


Tolerance exceeded in error handler
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
	at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.convertTransformedRecord(AbstractWorkerSourceTask.java:479)
	at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:392)
	at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:359)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
	at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:75)
	at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.config.ConfigException: Failed to access Avro data from topic table_X : Schema being registered is incompatible with an earlier schema for subject "table_X-key", details: [{errorType: 'READER_FIELD_MISSING_DEFAULT_VALUE', description: 'The field 'age' at path '/fields/0' in the new schema has no default value and is missing in the old schema', additionalInfo: 'age'}, {oldSchemaVersion: 1}, {oldSchema: '{"type":"record","name":"Key","namespace":"table_X","fields":[{"name":"document_id","type":{"type":"string","connect.version":1,"connect.name":"io.debezium.data.Uuid"}},{"name":"__dbz__physicalTableIdentifier","type":"string"}],"connect.name":"table_X.Key"}'}, {validateFields: 'false', compatibility: 'BACKWARD'}]; error code: 409
	at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:98)
	at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:64)
	at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.lambda$convertTransformedRecord$5(AbstractWorkerSourceTask.java:479)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
	... 13 more
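[The decisive detail in this trace is READER_FIELD_MISSING_DEFAULT_VALUE together with compatibility: 'BACKWARD': under Avro's BACKWARD rule, a newly added field must declare a default value so that consumers reading with the new schema can still decode records written with the old one. A compatible field definition could look like the following sketch; the nullable union type is an assumption, since the actual column type of 'age' is not shown in the thread:]

```json
{
  "name": "age",
  "type": ["null", "int"],
  "default": null
}
```

With a default present, the Schema Registry's backward-compatibility check accepts the new schema version and registration no longer fails with error 409.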

jiri.p...@gmail.com

Jun 3, 2024, 7:22:07 AM
to debezium
Hi,

the issue is that the schema change is incompatible from the Schema Registry's point of view: the "table_X-key" subject enforces BACKWARD compatibility, and the new 'age' field has no default value. You need to change the compatibility enforcement rules for that subject in the Schema Registry (or register the new field with a default value) to get this running.
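[For reference, assuming a Confluent Schema Registry reachable at http://localhost:8081 (both the registry flavor and the endpoint are assumptions, not stated in the thread), the per-subject compatibility level can be inspected and changed through the registry's REST API:]

```shell
# Inspect the current compatibility level for the key subject;
# defaultToGlobal=true falls back to the global level if no
# subject-level override has been set.
curl -s "http://localhost:8081/config/table_X-key?defaultToGlobal=true"

# Relax the subject-level compatibility so the new schema version
# (with the added 'age' field) can be registered.
curl -s -X PUT \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"compatibility": "NONE"}' \
  "http://localhost:8081/config/table_X-key"
```

Note that NONE disables compatibility checks for that subject entirely; a less drastic option is to keep BACKWARD and instead give the added field a default value, which makes the change backward compatible on its own.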

Jiri
