Do I need a kafka topic for the debezium_heartbeat table?
What I have now:
For both 3.1 and 2.5 this config works for wal cleanup via heartbeat - in this case heartbeat events are filtered, there is no topic for this table. (this config does not work for me because it does not contain outbox kafka topics filtering):
"predicates": "isOutbox,isHeartbeat",
"predicates.isOutbox.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isOutbox.pattern": "local\\.public\\.outbox_m[0-9]+",
"predicates.isHeartbeat.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isHeartbeat.pattern": "local\\.public\\.debezium_heartbeat",
"transforms": "route,filterHeartbeat",
"transforms.route.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.route.predicate": "isOutbox",
"transforms.route.route.by.field": "topic",
"transforms.route.route.topic.replacement": "${routedByValue}",
"transforms.route.table.field.event.key": "partition_key",
"transforms.route.table.field.event.id": "uuid",
"transforms.route.table.field.event.payload": "payload",
"transforms.route.table.field.event.timestamp": "created_at",
"transforms.route.table.expand.json.payload": "false",
"transforms.route.table.fields.additional.placement": "uuid:header:id,request_id:header:request_id,created_at:header:timestamp_ms,subject:header:subject,schema_version:header:schema-version",
"transforms.filterHeartbeat.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.filterHeartbeat.predicate": "isHeartbeat"
Also this config works for wal cleanup via heartbeat for both 3.1 and 2.5 - but in this case there is a "debezium_heartbeat" topic with events in it:
"predicates": "isOutbox,isAllowedTopic",
"predicates.isOutbox.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isOutbox.pattern": "local\\.public\\.outbox_m[0-9]+",
"predicates.isAllowedTopic.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isAllowedTopic.pattern": "local\\.public\\.debezium_heartbeat|payment_customer|payment_order|payment_customer_subscription|payment_invoice|solid_order|solid_subscription|payment_customer_subscription_command|payment_customer_subscription_command_payment_service_dl",
"transforms": "route,filterTopics",
"transforms.route.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.route.predicate": "isOutbox",
"transforms.route.route.by.field": "topic",
"transforms.route.route.topic.replacement": "${routedByValue}",
"transforms.route.table.field.event.key": "partition_key",
"transforms.route.table.field.event.id": "uuid",
"transforms.route.table.field.event.payload": "payload",
"transforms.route.table.field.event.timestamp": "created_at",
"transforms.route.table.expand.json.payload": "false",
"transforms.route.table.fields.additional.placement": "uuid:header:id,request_id:header:request_id,created_at:header:timestamp_ms,subject:header:subject,schema_version:header:schema-version",
"transforms.filterTopics.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.filterTopics.predicate": "isAllowedTopic",
"transforms.filterTopics.negate": "true"
But this config does not work for 3.1 - wal is not cleaned up (only added heatbeat filter to the previous example):
"predicates": "isOutbox,isAllowedTopic,isHeartbeat",
"predicates.isOutbox.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isOutbox.pattern": "local\\.public\\.outbox_m[0-9]+",
"predicates.isAllowedTopic.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isAllowedTopic.pattern": "local\\.public\\.debezium_heartbeat|payment_customer|payment_order|payment_customer_subscription|payment_invoice|solid_order|solid_subscription|payment_customer_subscription_command|payment_customer_subscription_command_payment_service_dl",
"predicates.isHeartbeat.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isHeartbeat.pattern": "local\\.public\\.debezium_heartbeat",
"transforms": "route,filterTopics,filterHeartbeat",
"transforms.route.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.route.predicate": "isOutbox",
"transforms.route.route.by.field": "topic",
"transforms.route.route.topic.replacement": "${routedByValue}",
"transforms.route.table.field.event.key": "partition_key",
"transforms.route.table.field.event.id": "uuid",
"transforms.route.table.field.event.payload": "payload",
"transforms.route.table.field.event.timestamp": "created_at",
"transforms.route.table.expand.json.payload": "false",
"transforms.route.table.fields.additional.placement": "uuid:header:id,request_id:header:request_id,created_at:header:timestamp_ms,subject:header:subject,schema_version:header:schema-version",
"transforms.filterTopics.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.filterTopics.predicate": "isAllowedTopic",
"transforms.filterTopics.negate": "true",
"transforms.filterHeartbeat.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.filterHeartbeat.predicate": "isHeartbeat"
Ideal config from my previous message that works for 2.5 but does not work for 3.1(wal is not cleaned up using heartbeats) is this one:
"predicates": "isOutbox,isAllowedTopic",
"predicates.isOutbox.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isOutbox.pattern": "local\\.public\\.outbox_m[0-9]+",
"predicates.isAllowedTopic.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isAllowedTopic.pattern": "payment_customer|payment_order|payment_customer_subscription|payment_invoice|solid_order|solid_subscription|payment_customer_subscription_command|payment_customer_subscription_command_payment_service_dl",
"transforms": "route,filterTopics",
"transforms.route.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.route.predicate": "isOutbox",
"transforms.route.route.by.field": "topic",
"transforms.route.route.topic.replacement": "${routedByValue}",
"transforms.route.table.field.event.key": "partition_key",
"transforms.route.table.field.event.id": "uuid",
"transforms.route.table.field.event.payload": "payload",
"transforms.route.table.field.event.timestamp": "created_at",
"transforms.route.table.expand.json.payload": "false",
"transforms.route.table.fields.additional.placement": "uuid:header:id,request_id:header:request_id,created_at:header:timestamp_ms,subject:header:subject,schema_version:header:schema-version",
"transforms.filterTopics.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.filterTopics.predicate": "isAllowedTopic",
"transforms.filterTopics.negate": "true"