PostgreSQL connector 3.1.2 does not flush the LSN and WAL is not cleaned up


Vladyslav Lysenko

Sep 17, 2025, 6:10:19 AM
to debezium
Hello!
I use this Docker image:
FROM redhat/ubi8-minimal AS builder

RUN microdnf update && \
microdnf install jq gettext unzip curl && \
microdnf clean all

FROM confluentinc/cp-kafka-connect-base:7.7.4

ENV POSTGRESQL_CONNECTOR_VERSION=3.1.2
ENV CONFLUENT_HUB_DIR=/usr/share/confluent-hub-components

COPY --from=builder /usr/lib64/libjq.so.1 /usr/lib64/libjq.so.1
COPY --from=builder /usr/lib64/libonig.so.5 /usr/lib64/libonig.so.5
COPY --from=builder /usr/bin/jq /usr/bin/jq
COPY --from=builder /usr/bin/envsubst /usr/bin/envsubst
COPY --from=builder /usr/bin/curl /usr/bin/curl
COPY --from=builder /usr/bin/unzip /usr/bin/unzip

RUN confluent-hub install --no-prompt debezium/debezium-connector-postgresql:${POSTGRESQL_CONNECTOR_VERSION} --component-dir ${CONFLUENT_HUB_DIR}

COPY docker/kafka-connect/probe.py /home/appuser/

and this connector config:
{
"name": "postgres-payment-service",
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"plugin.name": "pgoutput",
"database.hostname": "platform-payment-service-postgres",
"database.port": "5432",
"database.dbname" : "platform_payment_service",
"database.user": "root",
"database.password": "root",
"topic.prefix" : "local",
"table.include.list": "public.debezium_heartbeat, public.outbox_m[0-9]+",
"publication.autocreate.mode": "filtered",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"tombstones.on.delete": "false",
"poll.interval.ms": "100",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"route.tombstone.on.empty.payload": "false",
"table.op.invalid.behavior": "warn",
"skipped.operations": "u,d",
"snapshot.mode": "initial",
"slot.failover": true,
"heartbeat.action.query": "update debezium_heartbeat set last_heartbeat_ts = now();",
"heartbeat.interval.ms": "3000",
"flush.lsn.source": "true",

"predicates": "allowOutboxTable,allowedTopics",
"predicates.allowOutboxTable.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.allowOutboxTable.pattern": "local.public.outbox_m[0-9]+",
"predicates.allowedTopics.pattern": "payment_customer|payment_order|payment_customer_subscription|payment_invoice|solid_order|solid_subscription|payment_customer_subscription_command|payment_customer_subscription_command_payment_service_dl",
"predicates.allowedTopics.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",

"transforms": "filterOutboxTable,transformOutboxEvent,filterOutboxTopic",
"transforms.filterOutboxTable.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.filterOutboxTable.negate": "true",
"transforms.filterOutboxTable.predicate": "allowOutboxTable",
"transforms.filterOutboxTopic.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.filterOutboxTopic.negate": "true",
"transforms.filterOutboxTopic.predicate": "allowedTopics",
"transforms.transformOutboxEvent.route.topic.replacement": "${routedByValue}",
"transforms.transformOutboxEvent.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.transformOutboxEvent.route.by.field": "topic",
"transforms.transformOutboxEvent.table.field.event.key": "partition_key",
"transforms.transformOutboxEvent.table.field.event.id": "uuid",
"transforms.transformOutboxEvent.table.field.event.payload": "payload",
"transforms.transformOutboxEvent.table.field.event.timestamp": "created_at",
"transforms.transformOutboxEvent.table.expand.json.payload": "false",
"transforms.transformOutboxEvent.table.fields.additional.placement": "uuid:header:id,request_id:header:request_id,created_at:header:timestamp_ms,subject:header:subject,schema_version:header:schema-version"
}

The connector reads from the outbox database tables and publishes to different Kafka topics. The debezium_heartbeat table is also configured so that the LSN is flushed and the WAL is cleaned up when there are no messages in the outbox tables.

Problem:
With POSTGRESQL_CONNECTOR_VERSION=2.5.4 in the Docker image, the debezium_heartbeat table is updated and the WAL is cleaned up.
With POSTGRESQL_CONNECTOR_VERSION=3.1.2 in the image, the debezium_heartbeat table is updated but the WAL is NOT cleaned up.
In both cases the publication contains the debezium_heartbeat table.

What could be the problem with the newer connector version? The same connector config works with 2.5 but not with 3.1. How can I make the newer version clean up the WAL as well? Thanks.
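
One way to put a number on "WAL is not cleaned up" is to compare the slot's confirmed position with the current end of the WAL. PostgreSQL prints an LSN as two hexadecimal numbers separated by a slash (high 32 bits, then low 32 bits). The sketch below only does the arithmetic; it assumes the two values were read from `pg_replication_slots.confirmed_flush_lsn` and `pg_current_wal_lsn()`, and the LSNs in the example are made up for illustration.

```python
def lsn_to_int(lsn: str) -> int:
    """Convert a PostgreSQL LSN such as '16/B374D848' to a byte position."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def retained_wal_bytes(current_wal_lsn: str, confirmed_flush_lsn: str) -> int:
    """Bytes of WAL between the slot's confirmed position and the current WAL end."""
    return lsn_to_int(current_wal_lsn) - lsn_to_int(confirmed_flush_lsn)


# Hypothetical values, for illustration only.
lag = retained_wal_bytes("0/3000000", "0/1000000")
print(lag)  # 33554432 bytes, i.e. 32 MiB
```

If this number keeps growing while heartbeats are being written, the slot's confirmed LSN is not advancing.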

jiri.p...@gmail.com

Sep 18, 2025, 2:12:19 AM
to debezium
Hi,

Do you see update events recorded in the topic associated with the debezium_heartbeat table?

Jiri

Vladyslav Lysenko

Sep 18, 2025, 3:59:31 AM
to debezium
Hello. Yes, the debezium_heartbeat table is updated.

On Thursday, September 18, 2025 at 09:12:19 UTC+3 jiri.p...@gmail.com wrote:

Vladyslav Lysenko

Sep 19, 2025, 10:30:09 AM
to debezium

I've improved the transforms config:
"predicates": "isOutbox,isAllowedTopic",
"predicates.isOutbox.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isOutbox.pattern": "local\\.public\\.outbox_m[0-9]+",
"predicates.isAllowedTopic.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isAllowedTopic.pattern": "payment_customer|payment_order|payment_customer_subscription|payment_invoice|solid_order|solid_subscription|payment_customer_subscription_command|payment_customer_subscription_command_payment_service_dl",

"transforms": "route,filterTopics",

"transforms.route.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.route.predicate": "isOutbox",
"transforms.route.route.by.field": "topic",
"transforms.route.route.topic.replacement": "${routedByValue}",
"transforms.route.table.field.event.key": "partition_key",
"transforms.route.table.field.event.id": "uuid",
"transforms.route.table.field.event.payload": "payload",
"transforms.route.table.field.event.timestamp": "created_at",
"transforms.route.table.expand.json.payload": "false",
"transforms.route.table.fields.additional.placement": "uuid:header:id,request_id:header:request_id,created_at:header:timestamp_ms,subject:header:subject,schema_version:header:schema-version",

"transforms.filterTopics.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.filterTopics.predicate": "isAllowedTopic",
"transforms.filterTopics.negate": "true"

Now I expect:
transform "route" - if the event is from an outbox table, this transform is applied and the event moves to the next stage. If the event is from debezium_heartbeat, it moves to the next stage unchanged.
transform "filterTopics" - keeps only events whose topic matches the pattern in "predicates.isAllowedTopic.pattern".
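
The expectation above can be checked with a toy model of the two-step chain. This is plain Python, not Kafka Connect code. It assumes that `TopicNameMatches` requires the whole topic name to match the pattern, that `Filter` with `negate: true` drops every record the predicate does not match, and it models `EventRouter` as nothing more than a topic rename. The `routed_topic` key and the function names are hypothetical.

```python
import re
from typing import Optional

IS_OUTBOX = re.compile(r"local\.public\.outbox_m[0-9]+")
IS_ALLOWED_TOPIC = re.compile("|".join([
    "payment_customer",
    "payment_order",
    "payment_customer_subscription",
    "payment_invoice",
    "solid_order",
    "solid_subscription",
    "payment_customer_subscription_command",
    "payment_customer_subscription_command_payment_service_dl",
]))


def route(record: dict) -> dict:
    """Stand-in for EventRouter guarded by the isOutbox predicate."""
    if IS_OUTBOX.fullmatch(record["topic"]):
        return {**record, "topic": record["routed_topic"]}
    return record  # predicate not matched: record passes through unchanged


def filter_topics(record: dict) -> Optional[dict]:
    """Stand-in for Filter with predicate isAllowedTopic and negate=true."""
    return record if IS_ALLOWED_TOPIC.fullmatch(record["topic"]) else None


def apply_chain(record: dict) -> Optional[dict]:
    return filter_topics(route(record))


outbox = {"topic": "local.public.outbox_m1", "routed_topic": "payment_order"}
heartbeat = {"topic": "local.public.debezium_heartbeat"}

print(apply_chain(outbox))     # routed to payment_order and kept
print(apply_chain(heartbeat))  # None: dropped by filterTopics
```

Under these assumptions the heartbeat record is never written to Kafka, because its topic name is not in the allowed list.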

And I get the same behaviour:
With POSTGRESQL_CONNECTOR_VERSION=2.5.4 in the Docker image, the debezium_heartbeat table is updated and the WAL is cleaned up.
With POSTGRESQL_CONNECTOR_VERSION=3.1.2 in the image, the debezium_heartbeat table is updated but the WAL is NOT cleaned up.

What could be the reason?

I've asked Gemini and it says: "Starting with Debezium `2.6`, this behavior changed. If SMTs filter out *all* messages in a batch, the connector does not commit the offset, and consequently, the LSN is not flushed to the database." I can't find confirmation of this statement in the connector release notes or documentation.
Also, there is no solution so far.


On Thursday, September 18, 2025 at 10:59:31 UTC+3 Vladyslav Lysenko wrote:

Vladyslav Lysenko

Sep 19, 2025, 6:48:27 PM
to debezium
Do I need a Kafka topic for the debezium_heartbeat table?

What I have now:

For both 3.1 and 2.5, this config works for WAL cleanup via heartbeat. In this case heartbeat events are filtered out and there is no topic for this table. (This config does not work for me because it does not filter the outbox Kafka topics.)
"predicates": "isOutbox,isHeartbeat",

"predicates.isOutbox.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isOutbox.pattern": "local\\.public\\.outbox_m[0-9]+",
"predicates.isHeartbeat.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isHeartbeat.pattern": "local\\.public\\.debezium_heartbeat",

"transforms": "route,filterHeartbeat",


"transforms.route.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.route.predicate": "isOutbox",
"transforms.route.route.by.field": "topic",
"transforms.route.route.topic.replacement": "${routedByValue}",
"transforms.route.table.field.event.key": "partition_key",
"transforms.route.table.field.event.id": "uuid",
"transforms.route.table.field.event.payload": "payload",
"transforms.route.table.field.event.timestamp": "created_at",
"transforms.route.table.expand.json.payload": "false",
"transforms.route.table.fields.additional.placement": "uuid:header:id,request_id:header:request_id,created_at:header:timestamp_ms,subject:header:subject,schema_version:header:schema-version",

"transforms.filterHeartbeat.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.filterHeartbeat.predicate": "isHeartbeat" 


This config also works for WAL cleanup via heartbeat for both 3.1 and 2.5, but in this case there is a "debezium_heartbeat" topic with events in it:
"predicates": "isOutbox,isAllowedTopic",
"predicates.isOutbox.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isOutbox.pattern": "local\\.public\\.outbox_m[0-9]+",
"predicates.isAllowedTopic.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isAllowedTopic.pattern": "local\\.public\\.debezium_heartbeat|payment_customer|payment_order|payment_customer_subscription|payment_invoice|solid_order|solid_subscription|payment_customer_subscription_command|payment_customer_subscription_command_payment_service_dl",


"transforms": "route,filterTopics",

"transforms.route.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.route.predicate": "isOutbox",
"transforms.route.route.by.field": "topic",
"transforms.route.route.topic.replacement": "${routedByValue}",
"transforms.route.table.field.event.key": "partition_key",
"transforms.route.table.field.event.id": "uuid",
"transforms.route.table.field.event.payload": "payload",
"transforms.route.table.field.event.timestamp": "created_at",
"transforms.route.table.expand.json.payload": "false",
"transforms.route.table.fields.additional.placement": "uuid:header:id,request_id:header:request_id,created_at:header:timestamp_ms,subject:header:subject,schema_version:header:schema-version",

"transforms.filterTopics.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.filterTopics.predicate": "isAllowedTopic",
"transforms.filterTopics.negate": "true"


But this config does not work for 3.1: the WAL is not cleaned up (the only change from the previous example is the added heartbeat filter):
"predicates": "isOutbox,isAllowedTopic,isHeartbeat",

"predicates.isOutbox.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isOutbox.pattern": "local\\.public\\.outbox_m[0-9]+",
"predicates.isAllowedTopic.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isAllowedTopic.pattern": "local\\.public\\.debezium_heartbeat|payment_customer|payment_order|payment_customer_subscription|payment_invoice|solid_order|solid_subscription|payment_customer_subscription_command|payment_customer_subscription_command_payment_service_dl",
"predicates.isHeartbeat.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isHeartbeat.pattern": "local\\.public\\.debezium_heartbeat",

"transforms": "route,filterTopics,filterHeartbeat",


"transforms.route.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.route.predicate": "isOutbox",
"transforms.route.route.by.field": "topic",
"transforms.route.route.topic.replacement": "${routedByValue}",
"transforms.route.table.field.event.key": "partition_key",
"transforms.route.table.field.event.id": "uuid",
"transforms.route.table.field.event.payload": "payload",
"transforms.route.table.field.event.timestamp": "created_at",
"transforms.route.table.expand.json.payload": "false",
"transforms.route.table.fields.additional.placement": "uuid:header:id,request_id:header:request_id,created_at:header:timestamp_ms,subject:header:subject,schema_version:header:schema-version",

"transforms.filterTopics.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.filterTopics.predicate": "isAllowedTopic",
"transforms.filterTopics.negate": "true",

"transforms.filterHeartbeat.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.filterHeartbeat.predicate": "isHeartbeat"



The ideal config from my previous message, which works for 2.5 but does not work for 3.1 (the WAL is not cleaned up using heartbeats), is this one:
"predicates": "isOutbox,isAllowedTopic",
"predicates.isOutbox.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isOutbox.pattern": "local\\.public\\.outbox_m[0-9]+",
"predicates.isAllowedTopic.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isAllowedTopic.pattern": "payment_customer|payment_order|payment_customer_subscription|payment_invoice|solid_order|solid_subscription|payment_customer_subscription_command|payment_customer_subscription_command_payment_service_dl",

"transforms": "route,filterTopics",

"transforms.route.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.route.predicate": "isOutbox",
"transforms.route.route.by.field": "topic",
"transforms.route.route.topic.replacement": "${routedByValue}",
"transforms.route.table.field.event.key": "partition_key",
"transforms.route.table.field.event.id": "uuid",
"transforms.route.table.field.event.payload": "payload",
"transforms.route.table.field.event.timestamp": "created_at",
"transforms.route.table.expand.json.payload": "false",
"transforms.route.table.fields.additional.placement": "uuid:header:id,request_id:header:request_id,created_at:header:timestamp_ms,subject:header:subject,schema_version:header:schema-version",

"transforms.filterTopics.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.filterTopics.predicate": "isAllowedTopic",
"transforms.filterTopics.negate": "true"

On Thursday, September 18, 2025 at 09:12:19 UTC+3 jiri.p...@gmail.com wrote:
Hi,

Chris Cranford

Sep 20, 2025, 8:51:01 PM
to debe...@googlegroups.com
Hi, thanks for the detailed response. I believe I understand the issue.

When a `SourceRecord` goes through the transformation chain [1] and a transform filters it out, the `producerRecord` is `null`, so `AbstractWorkerSourceTask` calls `recordDropped`. This ultimately calls `SourceTask#commitRecord`, which for Debezium is a no-op. When the `producerRecord` is not `null`, the `prepareToSendRecord` [2] method is called. This eventually calls `SourceTask#commit`, and this is where Debezium synchronizes the offset state with the replication slot.

So if you have a period where all you have are heartbeat events and they're being filtered and not sent to Kafka, then the replication slot's confirmed LSN won't be updated, which explains your WAL growth. The reason you observe a difference in behavior appears to be related to a change in DBZ-7816 [3].
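
The two paths described above can be sketched as a toy model. It only mirrors the explanation in this message: it is not the Kafka Connect or Debezium implementation, every class and function name in it is hypothetical, and integer LSNs stand in for real ones.

```python
from typing import Callable, Iterable, Optional

Record = dict


class ToyDebeziumTask:
    """Hypothetical stand-in for the connector task; not Debezium code."""

    def __init__(self) -> None:
        # Stands in for the replication slot's confirmed LSN.
        self.confirmed_lsn: Optional[int] = None

    def commit_record(self, record: Record) -> None:
        # Dropped-record path: described above as a no-op for Debezium.
        pass

    def commit(self, lsn: int) -> None:
        # Sent-record path: offset state is synchronized with the slot.
        self.confirmed_lsn = lsn


def run_worker(
    task: ToyDebeziumTask,
    records: Iterable[Record],
    transform: Callable[[Record], Optional[Record]],
) -> None:
    for record in records:
        producer_record = transform(record)
        if producer_record is None:
            task.commit_record(record)  # filtered out by the transform chain
            continue
        task.commit(record["lsn"])      # record would be sent to Kafka


heartbeats = [
    {"topic": "local.public.debezium_heartbeat", "lsn": n} for n in (100, 200, 300)
]

filtered = ToyDebeziumTask()
run_worker(filtered, heartbeats, lambda r: None)  # every heartbeat is dropped
print(filtered.confirmed_lsn)  # None: the slot never advances

passed = ToyDebeziumTask()
run_worker(passed, heartbeats, lambda r: r)       # heartbeats reach Kafka
print(passed.confirmed_lsn)    # 300
```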

For now, as a workaround, simply don't filter the heartbeat event and let it go to the Kafka topic.
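
A quick sanity check for this workaround is to test whether a given allow-list pattern would keep the heartbeat topic. The helper below is a hypothetical sketch: it assumes the predicate must match the whole topic name, and it uses a shortened allow-list rather than the full one from this thread.

```python
import re

HEARTBEAT_TOPIC = "local.public.debezium_heartbeat"


def keeps_heartbeat(allowed_pattern: str, heartbeat_topic: str = HEARTBEAT_TOPIC) -> bool:
    """True if a negate=true Filter on this pattern would let the heartbeat through."""
    return re.fullmatch(allowed_pattern, heartbeat_topic) is not None


# Shortened allow-list, for illustration only.
without_heartbeat = "payment_customer|payment_order"
with_heartbeat = r"local\.public\.debezium_heartbeat|" + without_heartbeat

print(keeps_heartbeat(without_heartbeat))  # False: heartbeat would be dropped
print(keeps_heartbeat(with_heartbeat))     # True: heartbeat reaches its topic
```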


I have also logged DBZ-9490 [4] so we don't lose sight of this.

Thanks,
-cc

[1]: https://github.com/apache/kafka/blob/d6133f6997e0de3931f3d98b30950aeeefd23e70/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L415-L416
[2]: https://github.com/apache/kafka/blob/d6133f6997e0de3931f3d98b30950aeeefd23e70/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L425
[3]: https://issues.redhat.com/browse/DBZ-7816
[4]: https://issues.redhat.com/browse/DBZ-9490

Vladyslav Lysenko

Sep 22, 2025, 11:05:17 AM
to debezium

Thanks.
I've kept the Kafka topic for heartbeat events for now.
On Sunday, September 21, 2025 at 03:51:01 UTC+3 Chris Cranford wrote: