First of all many thanks for this very useful project.
correctly managed by Debezium, but it could be a source of problems in general.
- No DDL or PostgreSQL instance issues occurred around the event.
- No publication auto-creation is used.
- No slot auto-drop is configured.
- The same happens whether the source connector replicates from the primary or from a secondary.
If there is no workaround, does someone have an example of how to manually flush the slot outside of Debezium when configuring flush.lsn.source=false?
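For what it's worth, a possible starting point (an assumption based on standard PostgreSQL functions, not something confirmed by the Debezium docs) would be to advance the slot manually once an LSN is known to be committed downstream:

```sql
-- Assumptions: the slot is named 'pg_slot' and the target LSN has
-- already been confirmed as committed downstream (e.g. read back from
-- the Kafka Connect offsets topic). pg_replication_slot_advance()
-- moves the slot's confirmed position forward without emitting the
-- skipped changes; it is available since PostgreSQL 11.
SELECT pg_replication_slot_advance('pg_slot', '14/C5468F90');

-- Verify the slot position afterwards:
SELECT slot_name, restart_lsn, confirmed_flush_lsn
  FROM pg_replication_slots
 WHERE slot_name = 'pg_slot';
```

Whether this is safe to run while the connector is streaming is exactly the part I am unsure about, hence the question.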
Log when the source connector finds that the recorded LSN is no longer available in the replication slot:
21:34:58 INFO Obtained valid replication slot ReplicationSlot [active=false, latestFlushedLsn=LSN{14/C5468F90}, catalogXmin=1638961]
21:34:58 WARN Last recorded offset is no longer available on the server.
21:34:58 INFO The last recorded offset is no longer available but we are in when_needed snapshot mode. Attempting to snapshot data to fill the gap.
21:34:58 INFO Found previous offset PostgresOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.postgresql.Source:STRUCT}, sourceInfo=source_info[server='connector.source'db='pg_db', lsn=LSN{14/C5468F90}, txId=1638961, messageType=UPDATE, lastCommitLsn=LSN{14/C5468F90}, timestamp=2025-10-14T21:15:26.271652Z, snapshot=FALSE, schema=, table=], lastSnapshotRecord=false, lastCompletelyProcessedLsn=LSN{14/C546BA20}, lastCommitLsn=LSN{14/C5468F90}, streamingStoppingLsn=null, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], incrementalSnapshotContext=IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]]
At this point the lag between lastCompletelyProcessedLsn=LSN{14/C546BA20} and lastCommitLsn=LSN{14/C5468F90} is 11 kB, calculated as:
select pg_size_pretty(pg_wal_lsn_diff('14/C546BA20', '14/C5468F90')) lag;
When the initial snapshot runs, PostgreSQL logs the SELECT statements, which take nearly 15 minutes, followed by the restart of replication using the same replication slot:
21:49:45 [502757] [Debezium General] pg_user@pg_db LOG: Query: SELECT … FROM "pg_db"."pg_table2"
21:50:43 [502757] [Debezium General] pg_user@pg_db LOG: Query: SELECT … FROM "pg_db"."pg_table1"
21:54:39 [502757] [Debezium General] pg_user@pg_db LOG: Query: SELECT … FROM "pg_db"."pg_table3"
21:54:39 [504370] [Debezium Streaming] pg_user@pg_db LOG: starting logical decoding for slot "pg_slot"
21:54:39 [504370] [Debezium Streaming] pg_user@pg_db DETAIL: Streaming transactions committing after 14/C5468F90, reading WAL from 14/C5468F90.
21:54:39 [504370] [Debezium Streaming] pg_user@pg_db STATEMENT: START_REPLICATION SLOT "pg_slot" LOGICAL 14/C54F7768 ("proto_version" '1', "publication_names" 'pg_slot', "messages" 'true')
21:54:39 [504370] [Debezium Streaming] pg_user@pg_db LOG: logical decoding found consistent point at 14/C5468F90
21:54:39 [504370] [Debezium Streaming] pg_user@pg_db DETAIL: There are no running transactions.
21:54:39 [504370] [Debezium Streaming] pg_user@pg_db STATEMENT: START_REPLICATION SLOT "pg_slot" LOGICAL 14/C54F7768 ("proto_version" '1', "publication_names" 'pg_slot', "messages" 'true')
At this point the lag between the lastCommitLsn at disconnection time and the position of the initial snapshot is 570 kB, calculated as:
select pg_size_pretty(pg_wal_lsn_diff('14/C54F7768', '14/C5468F90')) lag;
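Both lag figures can be cross-checked outside PostgreSQL by treating each LSN as a 64-bit value (high word and low word in hex, separated by the slash), which is effectively what pg_wal_lsn_diff computes. A minimal sketch:

```python
def lsn_to_int(lsn: str) -> int:
    """Convert a PostgreSQL LSN like '14/C546BA20' to a 64-bit integer.

    The part before the slash is the high 32 bits, the part after it
    the low 32 bits, both hexadecimal.
    """
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)

def lsn_diff(a: str, b: str) -> int:
    """Byte difference between two LSNs, like pg_wal_lsn_diff(a, b)."""
    return lsn_to_int(a) - lsn_to_int(b)

# Lag at disconnection time: 10896 bytes, i.e. ~11 kB
print(lsn_diff("14/C546BA20", "14/C5468F90"))

# Lag between the snapshot restart position and lastCommitLsn:
# 583640 bytes, i.e. ~570 kB
print(lsn_diff("14/C54F7768", "14/C5468F90"))
```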
The source connector stops; in my experience this is often due to Apicurio:
21:33:06 INFO [connector.source|task-0] Obtained valid replication slot ReplicationSlot [active=true, latestFlushedLsn=LSN{14/C5468F90}, catalogXmin=1638961] (io.debezium.connector.postgresql.connection.PostgresConnection) [debezium-postgresconnector-connector.source-change-event-source-coordinator]
21:33:06 INFO [connector.source|task-0] 50 records sent during previous 00:24:30.541, last recorded offset of {server=connector.source} partition is {lsn_proc=89209556904, xmin=1638961, messageType=UPDATE, lsn_commit=89209556848, lsn=89209556904, txId=1639003, ts_usec=1760477586173970} (io.debezium.connector.common.BaseSourceTask) [task-thread-connector.source-0]
21:33:06 ERROR [connector.source|task-0] Failed to update cache value for key: {"type":"record","name":"Key","namespace":"….pg_table3","fields":[{"name":"…","type":"string"}],"
connect.name":"….pg_table3.Key"} (io.apicurio.registry.resolver.ERCache) [task-thread-connector.source-0]
io.apicurio.registry.rest.client.exception.RestClientException: HTTP/1.1 header parser received no bytes
…
21:33:06 INFO [connector.source|task-0] Stopping down connector (io.debezium.connector.common.BaseSourceTask) [task-thread-connector.source-0]
21:33:06 INFO [connector.source|task-0] Connection gracefully closed (io.debezium.jdbc.JdbcConnection) [pool-11783-thread-1]
21:33:06 INFO [connector.source|task-0] Connection gracefully closed (io.debezium.jdbc.JdbcConnection) [pool-11784-thread-1]
21:33:06 INFO [connector.source|task-0] Finished streaming (io.debezium.pipeline.ChangeEventSourceCoordinator) [debezium-postgresconnector-connector.source-change-event-source-coordinator]
21:33:06 INFO [connector.source|task-0] Connected metrics set to 'false' (io.debezium.pipeline.ChangeEventSourceCoordinator) [debezium-postgresconnector-connector.source-change-event-source-coordinator]
21:33:06 INFO [connector.source|task-0] SignalProcessor stopped (io.debezium.pipeline.signal.SignalProcessor) [task-thread-connector.source-0]
21:33:06 INFO [connector.source|task-0] Debezium ServiceRegistry stopped. (io.debezium.service.DefaultServiceRegistry) [task-thread-connector.source-0]
21:33:06 INFO [connector.source|task-0] Connection gracefully closed (io.debezium.jdbc.JdbcConnection) [pool-11785-thread-1]
21:33:06 INFO [connector.source|task-0] [Producer clientId=connector-producer-connector.source-0] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer) [task-thread-connector.source-0]
…
The source connector restarts in less than 2 minutes after the error:
21:34:58 INFO [connector.source|task-0] Kafka version: 3.9.0 (org.apache.kafka.common.utils.AppInfoParser) [DistributedHerder-connect_name:8083-1]
21:34:58 INFO [connector.source|task-0] Kafka commitId: a60e31147e6b01ee (org.apache.kafka.common.utils.AppInfoParser) [DistributedHerder-connect_name:8083-1]
21:34:58 INFO [connector.source|task-0] Kafka startTimeMs: 1760477698102 (org.apache.kafka.common.utils.AppInfoParser) [DistributedHerder-connect_name:8083-1]
21:34:58 INFO [connector.source|task-0] [Producer clientId=connector-producer-connector.source-0] Cluster ID: V57zawTbT5O3pQgJtUoQrA (org.apache.kafka.clients.Metadata) [kafka-producer-network-thread | connector-producer-connector.source-0]
21:34:58 INFO [Worker clientId=connect_name:8083, groupId=connect-cluster] Task 'connector.source-0' restart successful (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect_name:8083-1]
21:34:58 INFO [Worker clientId=connect_name:8083, groupId=connect-cluster] Completed plan to restart 1 of 1 tasks for restart request for {connectorName='connector.source', onlyFailed=true, includeTasks=true} (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect_name:8083-1]
21:34:58 INFO [connector.source|task-0] Starting PostgresConnectorTask with configuration:
snapshot.locking.mode = none
connector.class = io.debezium.connector.postgresql.PostgresConnector
topic.creation.default.partitions = 3
key.converter.apicurio.registry.url = http://apicurio:8080/apis/registry/v2
value.converter.apicurio.registry.url = http://apicurio:8080/apis/registry/v2
slot.name = pg_slot
tasks.max = 1
publication.name = pg_slot
value.converter.apicurio.registry.auto-register = true
topic.creation.bigtables.include = ^.*pg_table1$,^.*pg_table2$
database.sslmode = require
topic.prefix = …
topic.creation.bigtables.cleanup.policy = compact,delete
topic.creation.default.replication.factor = 1
flush.lsn.source = true
value.converter = io.apicurio.registry.utils.converter.AvroConverter
key.converter.apicurio.registry.auto-register = true
key.converter = io.apicurio.registry.utils.converter.AvroConverter
key.converter.apicurio.registry.find-latest = true
publication.autocreate.mode = disabled
database.dbname = pg_db
database.user = pg_user
topic.creation.default.compression.type = zstd
topic.creation.default.cleanup.policy = compact,delete
xmin.fetch.interval.ms = 60000
notification.enabled.channels = log, sink
snapshot.isolation.mode = repeatable_read
database.port = 5432
plugin.name = pgoutput
topic.creation.bigtables.compression.type = zstd
value.converter.apicurio.registry.find-latest = true
notification.sink.topic.name = dbznotification.topic_pfx
topic.creation.groups = bigtables
task.class = io.debezium.connector.postgresql.PostgresConnectorTask
database.hostname = pg_host
topic.creation.bigtables.partitions = 12
database.password = ********
schema.name.adjustment.mode = avro_unicode
name = connector.source
skipped.operations = t
table.include.list = pg_db.pg_table2,pg_db.pg_table1,pg_db.pg_table3
snapshot.mode = when_needed
(io.debezium.connector.common.BaseSourceTask) [task-thread-connector.source-0]
21:34:58 INFO [connector.source|task-0] Loading the custom source info struct maker plugin: io.debezium.connector.postgresql.PostgresSourceInfoStructMaker (io.debezium.config.CommonConnectorConfig) [task-thread-connector.source-0]
21:34:58 INFO [connector.source|task-0] Loading the custom topic naming strategy plugin: io.debezium.schema.SchemaTopicNamingStrategy (io.debezium.config.CommonConnectorConfig) [task-thread-connector.source-0]
21:34:58 INFO [connector.source|task-0] Connection gracefully closed (io.debezium.jdbc.JdbcConnection) [pool-11787-thread-1]
Finally, the unwanted initial snapshot:
21:34:58 INFO [connector.source|task-0] Obtained valid replication slot ReplicationSlot [active=false, latestFlushedLsn=LSN{14/C5468F90}, catalogXmin=1638961] (io.debezium.connector.postgresql.connection.PostgresConnection) [task-thread-connector.source-0]
21:34:58 WARN [connector.source|task-0] Last recorded offset is no longer available on the server. (io.debezium.connector.common.BaseSourceTask) [task-thread-connector.source-0]
21:34:58 INFO [connector.source|task-0] The last recorded offset is no longer available but we are in when_needed snapshot mode. Attempting to snapshot data to fill the gap. (io.debezium.connector.common.BaseSourceTask) [task-thread-connector.source-0]
21:34:58 INFO [connector.source|task-0] Found previous offset PostgresOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.postgresql.Source:STRUCT}, sourceInfo=source_info[server='connector.source'db='pg_db', lsn=LSN{14/C5468F90}, txId=1638961, messageType=UPDATE, lastCommitLsn=LSN{14/C5468F90}, timestamp=2025-10-14T21:15:26.271652Z, snapshot=FALSE, schema=, table=], lastSnapshotRecord=false, lastCompletelyProcessedLsn=LSN{14/C546BA20}, lastCommitLsn=LSN{14/C5468F90}, streamingStoppingLsn=null, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], incrementalSnapshotContext=IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]] (io.debezium.connector.postgresql.PostgresConnectorTask) [task-thread-connector.source-0]
21:34:58 INFO [connector.source|task-0] user 'pg_user' connected to database 'pg_db' on PostgreSQL 17…
role 'pg_user' [superuser: false, replication: true, inherit: true, create role: false, create db: false, can log in: true] (io.debezium.connector.postgresql.PostgresConnectorTask) [task-thread-connector.source-0]