Hi all,
I have a question about how to properly send tombstone events using the outbox event router. I'm currently using Debezium 2.5.3 with PostgreSQL and Kafka deployed to Kubernetes through Strimzi.
Here is my connector configuration:
class: io.debezium.connector.postgresql.PostgresConnector
config:
database.dbname: <dbname>
database.hostname: <hostname>
database.password: <password>
database.port: <port>
database.user: <user>
plugin.name: pgoutput
route.tombstone.on.empty.payload: true
table.include.list: public.Outbox
transforms: outbox
transforms.outbox.route.by.field: aggregateType
transforms.outbox.table.expand.json.payload: true
transforms.outbox.table.field.event.id: eventId
transforms.outbox.table.field.event.key: aggregateId
transforms.outbox.table.field.event.payload: eventPayload
transforms.outbox.table.fields.additional.placement: eventType:header:type,eventVersion:header:version,eventTimestamp:header:timestamp,eventIssuer:header:issuer,eventUserId:header:userId
transforms.outbox.type: io.debezium.transforms.outbox.EventRouter
value.converter.delegate.converter.type: org.apache.kafka.connect.json.JsonConverter
This configuration has been working for a while with regular JSON payloads, but I recently had to make adjustments to support tombstone events. Based on the documentation, it was my understanding that setting route.tombstone.on.empty.payload: true would mean that if a null value is written to the payload column of the outbox table, a tombstone event would be emitted for that key.
However, it seems Debezium is instead trying to expand the payload as a JSON and emits the null value as a string instead (I have verified that the value written to the database is an actual null value, not a "null" string). It is emitting an event with the following payload:
{
"schema": {
"type": "string",
"optional": true,
"name": "io.debezium.data.Json",
"version": 1
},
"payload": "null"
}
And the following warning is logged to Connect:
JSON expansion failed (io.debezium.transforms.outbox.EventRouterDelegate) [task-thread-postgresql-db-kafka-connector-0]
java.lang.Exception: Unable to parse payload starting with 'n'
at io.debezium.transforms.outbox.EventRouterDelegate.parseJsonPayload(EventRouterDelegate.java:307)
at io.debezium.transforms.outbox.EventRouterDelegate.apply(EventRouterDelegate.java:162)
at io.debezium.transforms.outbox.EventRouter.apply(EventRouter.java:25)
at org.apache.kafka.connect.runtime.TransformationStage.apply(TransformationStage.java:57)
at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:54)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:54)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:401)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:367)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Any help would be greatly appreciated!