Outbox event router with tombstone events


Erik Lysne

Mar 25, 2024, 12:54:30 AM
to debezium
Hi all,
I have a question about how to properly send tombstone events using the outbox event router. I'm currently using Debezium 2.5.3 with PostgreSQL and Kafka deployed to Kubernetes through Strimzi.

Here is my connector configuration:

  class: io.debezium.connector.postgresql.PostgresConnector
  config:
    database.dbname: <dbname>
    database.hostname: <hostname>
    database.password: <password>
    database.port: <port>
    database.user: <user>
    plugin.name: pgoutput
    route.tombstone.on.empty.payload: true
    table.include.list: public.Outbox
    transforms: outbox
    transforms.outbox.route.by.field: aggregateType
    transforms.outbox.table.expand.json.payload: true
    transforms.outbox.table.field.event.id: eventId
    transforms.outbox.table.field.event.key: aggregateId
    transforms.outbox.table.field.event.payload: eventPayload
    transforms.outbox.table.fields.additional.placement: eventType:header:type,eventVersion:header:version,eventTimestamp:header:timestamp,eventIssuer:header:issuer,eventUserId:header:userId
    transforms.outbox.type: io.debezium.transforms.outbox.EventRouter
    value.converter.delegate.converter.type: org.apache.kafka.connect.json.JsonConverter


This configuration has been working for a while with regular JSON payloads, but I recently had to make adjustments to support tombstone events. Based on the documentation, it was my understanding that setting route.tombstone.on.empty.payload: true would mean that if a null value is written to the payload column of the outbox table, a tombstone event would be emitted for that key.

However, it seems Debezium instead tries to expand the payload as JSON and emits the null value as a string (I have verified that the value written to the database is an actual null value, not a "null" string). It emits an event with the following payload:

{
  "schema": {
    "type": "string",
    "optional": true,
    "name": "io.debezium.data.Json",
    "version": 1
  },
  "payload": "null"
}


And the following warning is logged to Connect:

JSON expansion failed (io.debezium.transforms.outbox.EventRouterDelegate) [task-thread-postgresql-db-kafka-connector-0]
java.lang.Exception: Unable to parse payload starting with 'n'
    at io.debezium.transforms.outbox.EventRouterDelegate.parseJsonPayload(EventRouterDelegate.java:307)
    at io.debezium.transforms.outbox.EventRouterDelegate.apply(EventRouterDelegate.java:162)
    at io.debezium.transforms.outbox.EventRouter.apply(EventRouter.java:25)
    at org.apache.kafka.connect.runtime.TransformationStage.apply(TransformationStage.java:57)
    at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:54)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:54)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:401)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:367)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)


Any help would be greatly appreciated!

Chris Cranford

Mar 25, 2024, 9:56:06 AM
to debe...@googlegroups.com
Hi Erik -

This sounds like an incompatibility between "table.expand.json.payload" and "route.tombstone.on.empty.payload". Could you please raise a Jira issue if one does not already exist?

Thanks,
Chris

Erik Lysne

Apr 11, 2024, 4:21:53 AM
to debezium
Hi Chris,

Thanks for the reply! I have tried many variations on the outbox event router configuration, including turning JSON payload expansion off. However, I don't see any way to create a tombstone event through Debezium with the outbox event router. I have also tried disabling schemas, so the payload is not wrapped by the Avro envelope. Here is my current (relevant) configuration:


transforms.outbox.table.expand.json.payload: false
transforms.outbox.table.json.payload.null.behavior: ignore
route.tombstone.on.empty.payload: true
value.converter: io.debezium.converters.BinaryDataConverter
value.converter.delegate.converter.type: org.apache.kafka.connect.json.JsonConverter
value.converter.delegate.converter.type.schemas.enable: false
key.converter: io.debezium.converters.BinaryDataConverter
key.converter.delegate.converter.type: org.apache.kafka.connect.json.JsonConverter
key.converter.delegate.converter.type.schemas.enable: false


With variations on these settings, I only end up with a 4-byte "null" value or a 2-byte empty JSON ("{}") value, neither of which is a proper tombstone event.

Is there something wrong with my configuration or does route.tombstone.on.empty.payload just not work properly with the outbox event router? If so, I can raise a Jira issue.

Thanks!

Chris Cranford

Apr 11, 2024, 9:44:57 AM
to debe...@googlegroups.com
Hi Erik -

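I believe the issue is that you did not prefix "route.tombstone.on.empty.payload" with "transforms.outbox." like the other outbox-specific configuration properties. Kafka Connect passes only properties under a transform's own prefix to that transform, so the unprefixed setting never reaches the event router. If you properly prefix the property, it should work as you expect.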
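
For illustration, the prefixed form might look like the fragment below. This is only a sketch assembled from the configuration posted earlier in this thread, not a tested configuration; the property names and values are the ones from the original post, and the rest of the connector settings are assumed to stay unchanged.

```yaml
# Sketch only: excerpt of the connector config from the original post,
# with the tombstone option moved under the SMT's "transforms.outbox." prefix.
config:
  transforms: outbox
  transforms.outbox.type: io.debezium.transforms.outbox.EventRouter
  transforms.outbox.route.by.field: aggregateType
  transforms.outbox.table.field.event.key: aggregateId
  transforms.outbox.table.field.event.payload: eventPayload
  # Was written as a bare "route.tombstone.on.empty.payload: true", which is
  # a connector-level key and is therefore not handed to the EventRouter SMT.
  transforms.outbox.route.tombstone.on.empty.payload: true
```

Whether this combines cleanly with "table.expand.json.payload: true" is not established in this thread; the follow-up configuration above had expansion turned off.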

Thanks,
Chris

Erik Lysne

Apr 11, 2024, 1:27:44 PM
to debezium
Hi Chris,

That was indeed the issue. Thank you so much, you're a hero! 🙏🏻