[debezium] Postgres Debezium connection issues - socket exceptions?


Mikhail Spirin

Sep 12, 2025, 1:49:04 PM
to debe...@googlegroups.com
Hi everyone!

I'm seeing strange errors while running the Debezium Postgres connector. I first noticed them when I tried to execute a blocking snapshot; now I see them on regular connectors too.
Versions:
Debezium 3.1.2
Postgres 17.5

It first started when I tried to set up a full re-snapshot of a table. Note that the initial snapshot works fine, regular connector streaming is fine, and even small blocking snapshots are fine (around 300-500 messages in the table). For a table with 0.5M messages, the snapshot fails with the following:

September 8, 2025 at 14:45 (postgresdebezium-docker)
Caused by: org.postgresql.util.PSQLException: Database connection failed when writing to copy
at org.postgresql.core.v3.QueryExecutorImpl.flushCopy(QueryExecutorImpl.java:1189)
at org.postgresql.core.v3.CopyDualImpl.flushCopy(CopyDualImpl.java:33)
at org.postgresql.core.v3.replication.V3PGReplicationStream.updateStatusInternal(V3PGReplicationStream.java:200)
at org.postgresql.core.v3.replication.V3PGReplicationStream.forceUpdateStatus(V3PGReplicationStream.java:118)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.doFlushLsn(PostgresReplicationConnection.java:819)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.flushLsn(PostgresReplicationConnection.java:811)
at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.lambda$commitOffset$2(PostgresStreamingChangeEventSource.java:464)
... 4 more
Caused by: java.net.SocketException: Broken pipe
at java.base/sun.nio.ch.NioSocketImpl.implWrite(NioSocketImpl.java:425)
at java.base/sun.nio.ch.NioSocketImpl.write(NioSocketImpl.java:445)
at java.base/sun.nio.ch.NioSocketImpl$2.write(NioSocketImpl.java:831)
at java.base/java.net.Socket$SocketOutputStream.write(Socket.java:1045)
at java.base/sun.security.ssl.SSLSocketOutputRecord.deliver(SSLSocketOutputRecord.java:345)
at java.base/sun.security.ssl.SSLSocketImpl$AppOutputStream.write(SSLSocketImpl.java:1308)
at org.postgresql.util.internal.PgBufferedOutputStream.flushBuffer(PgBufferedOutputStream.java:41)
at org.postgresql.util.internal.PgBufferedOutputStream.flush(PgBufferedOutputStream.java:48)
at org.postgresql.core.PGStream.flush(PGStream.java:708)
at org.postgresql.core.v3.QueryExecutorImpl.flushCopy(QueryExecutorImpl.java:1187)
... 10 more

I now also see it on regular connectors. First they fail with this:

September 12, 2025, 15:47 (postgresdebezium-docker)
[2025-09-12 13:47:33,073] ERROR [|task-0] Unexpected exception while performing keepalive status update on the replication stream (io.debezium.connector.postgresql.connection.PostgresReplicationConnection:842)
java.lang.InterruptedException: sleep interrupted
at java.base/java.lang.Thread.sleep(Native Method)
at io.debezium.util.Metronome$1.pause(Metronome.java:57)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.lambda$startKeepAlive$0(PostgresReplicationConnection.java:837)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)

and then the connector stops streaming with:

[2025-09-12 13:47:47,479] INFO [|task-0] [Consumer clientId=d307e02a-c15a-49fa-a0bb-aa4acc3b3372, groupId=kafka-signal] Disconnecting from node -1 due to socket connection setup timeout. The timeout value is 10039 ms. (org.apache.kafka.clients.NetworkClient:874)

This is the connector setup:

{
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "custom.metric.tags": "table=dummyschema.dummytable",
    "database.dbname": "postgres",
    "database.hostname": "...",
    "database.password": "...",
    "database.port": "5432",
    "database.user": "debezium",
    "heartbeat.action.query": "SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar)",
    "heartbeat.interval.ms": "300000",
    "key.converter.schemas.enable": "false",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "name": "postgres-debezium-connector",
    "plugin.name": "pgoutput",
    "publication.name": "cdc",
    "schemas.enable": "false",
    "signal.consumer.sasl.jaas.config": "...;",
    "signal.consumer.sasl.mechanism": "...",
    "signal.consumer.security.protocol": "...",
    "signal.consumer.ssl.endpoint.identification.algorithm": "https",
    "signal.data.collection": "debezium.dummy_table_signals",
    "signal.enabled.channels": "kafka",
    "signal.kafka.bootstrap.servers": "...",
    "signal.kafka.topic": "dummy-signal",
    "slot.failover": "true",
    "slot.name": "dummy_slot_name",
    "snapshot.delay.ms": "30000",
    "snapshot.lock.timeout.ms": "600000",
    "snapshot.mode": "when_needed",
    "streaming.delay.ms": "10000",
    "table.include.list": "dummyschema.dummytable",
    "topic.creation.default.partitions": "1",
    "topic.creation.default.replication.factor": "3",
    "topic.creation.default.retention.ms": "604800000",
    "topic.creation.enable": "true",
    "topic.prefix": "dummy_prefix",
    "value.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
}

Any ideas what I could check? It feels like a Postgres connectivity issue, but which settings should I look at? Most Postgres timeouts are set to 0, and settings like max_wal_senders and max_replication_slots are set to valid numbers.
Note that there is a PgBouncer set up between Debezium and Postgres, but it doesn't seem to be affecting the situation. The server side also doesn't show any meaningful errors...


--
Kind regards,
Mikhail Spirin

Data Platform Engineer at DeviantArt

Chris Cranford

Sep 12, 2025, 1:56:56 PM
to debe...@googlegroups.com
Hi -

A great contributor published a blog post [1] about their experiences on AWS, where they found that PgBouncer was the source of connection issues and that removing it from the picture resolved them. So I would suggest temporarily connecting directly to the PostgreSQL instance, bypassing PgBouncer, to see if that resolves your issues.
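
One way to run that test is to derive a second connector configuration that differs only in the database endpoint. The following is a minimal sketch, not a prescribed procedure: both hostnames are hypothetical placeholders (the thread elides the real ones), and the helper name `with_direct_endpoint` is made up for illustration. The resulting dictionary could then be submitted through Kafka Connect's `PUT /connectors/<name>/config` endpoint.

```python
import copy


def with_direct_endpoint(config: dict, host: str, port: int = 5432) -> dict:
    """Return a copy of a connector config that targets PostgreSQL directly."""
    direct = copy.deepcopy(config)          # leave the pooled config untouched
    direct["database.hostname"] = host
    direct["database.port"] = str(port)     # Connect configs carry values as strings
    return direct


# Subset of the connector setup from the first message.
pooled = {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "pgbouncer.internal",  # hypothetical PgBouncer address
    "database.port": "5432",
    "slot.name": "dummy_slot_name",
}

direct = with_direct_endpoint(pooled, "pg-primary.internal")  # hypothetical host
print(direct["database.hostname"], direct["database.port"])
```

Keeping every other property identical means any change in behavior can be attributed to the network path rather than to the connector settings.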

Thanks,
-cc

[1]: https://debezium.io/blog/2020/02/25/lessons-learned-running-debezium-with-postgresql-on-rds/

Mikhail Spirin

Sep 19, 2025, 6:27:04 PM
to debe...@googlegroups.com
Thanks a lot for that article, Chris! We are planning to try that. It's not easy and won't suit our prod setup, but we will try it for the sake of a test.

Meanwhile, I'm no longer sure this is the source of the problem...

Right now, with Debezium in when_needed mode, the process is as follows: I send the snapshot signal, it goes through all the steps up to "Snapshot step 7 - Snapshotting data", starts snapshotting, gets about 1-2k messages, and then fails with an exception like

"org.postgresql.util.PSQLException: Database connection failed when writing to copy"

But at the same time, if I start the connector in initial or always mode, it performs the snapshot without any problems, with exactly the same config settings.

Any idea what else I can check? I believe parts of the snapshot code were worked on recently; could this actually be a bug in Debezium?
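
For readers reproducing this, a blocking snapshot requested through the Kafka signal channel is a message whose key matches the connector's `topic.prefix` and whose value is an `execute-snapshot` signal of type `BLOCKING`. The sketch below only builds that key/value pair, using the placeholder names from the connector setup above; the helper name is hypothetical, and how the message is produced to the `signal.kafka.topic` is left to whatever Kafka client is in use.

```python
import json


def blocking_snapshot_signal(topic_prefix: str, tables: list[str]) -> tuple[bytes, bytes]:
    """Build the (key, value) pair for an execute-snapshot signal of type BLOCKING."""
    key = topic_prefix.encode("utf-8")  # the signal key must equal topic.prefix
    value = json.dumps({
        "type": "execute-snapshot",
        "data": {"data-collections": tables, "type": "BLOCKING"},
    }).encode("utf-8")
    return key, value


key, value = blocking_snapshot_signal("dummy_prefix", ["dummyschema.dummytable"])
print(key.decode(), value.decode())
```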

Chris Cranford

Sep 20, 2025, 9:05:25 PM
to debe...@googlegroups.com
Hi -

The `snapshot.mode` only influences the criteria used when the connector bootstraps to determine whether the snapshot should be executed or skipped. Once the snapshot begins, the mode no longer has any influence, so it's unlikely that the snapshot mode directly causes the issue. What I would suggest is to look more closely at the "Caused by" portions of the stack trace to understand what triggered the write-to-copy failure. If it's a connection-related issue, then it's highly probable it's related to PgBouncer or some network connectivity issue instead.
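
As a small illustration of reading the "Caused by" chain, the sketch below pulls those lines out of a Java stack trace so the innermost cause is easy to spot. The function name `cause_chain` is made up for this example, and the sample input is an excerpt of the trace from the first message.

```python
def cause_chain(stack_trace: str) -> list[str]:
    """Return the 'Caused by' entries of a Java stack trace, outermost first."""
    prefix = "Caused by:"
    chain = []
    for raw in stack_trace.splitlines():
        line = raw.strip()
        if line.startswith(prefix):
            chain.append(line[len(prefix):].strip())
    return chain


trace = """\
Caused by: org.postgresql.util.PSQLException: Database connection failed when writing to copy
at org.postgresql.core.v3.QueryExecutorImpl.flushCopy(QueryExecutorImpl.java:1189)
Caused by: java.net.SocketException: Broken pipe
at java.base/sun.nio.ch.NioSocketImpl.implWrite(NioSocketImpl.java:425)
"""

chain = cause_chain(trace)
print(chain[-1])  # innermost cause: java.net.SocketException: Broken pipe
```

Here the innermost entry is a socket-level `Broken pipe`, meaning the write failed on a connection that the other end had already closed.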

Is the stack trace below the exact same stack trace you get during the `when_needed` failure? If it's different, could you share it, please?

Thanks,
-cc

Mikhail Spirin

Sep 22, 2025, 6:43:43 PM
to debe...@googlegroups.com
> Is the stack trace below the exact same stack trace you get during the `when_needed` failure? If it's different, could you share it, please?

Not sure what you're referring to. I get the same set of errors each time.

What makes me think it is not a connection issue (or I don't understand how it could be): in initial and always modes, the snapshot runs reliably every time the connector is started. I don't change anything in the connector config, the networking is the same, and it runs for several hours without any issues or exceptions.
If this were a PgBouncer problem, shouldn't it fail the same way?

Chris Cranford

Sep 22, 2025, 11:26:12 PM
to debe...@googlegroups.com
Hi -

It should.

I took another pass at your error's stack trace below. Would it be possible for you to share the full, complete connector logs from a `when_needed` run when it fails? INFO-level logging at a minimum, but if you can enable DEBUG logging for `io.debezium`, that would be even better. I wonder if there is a corner-case bug that only manifests in `when_needed`, and the logs would be extremely helpful.
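
If the Kafka Connect worker exposes its admin REST endpoints, one way to raise the log level without a restart is `PUT /admin/loggers/io.debezium` with a `{"level": "DEBUG"}` body. The sketch below only constructs that request; the worker address `http://localhost:8083` is an assumption, as is the availability of the endpoint in this deployment (editing the worker's logging configuration file is the alternative).

```python
import json
import urllib.request


def debug_logger_request(connect_url: str, logger: str = "io.debezium") -> urllib.request.Request:
    """Build a PUT request that sets a Kafka Connect logger to DEBUG."""
    body = json.dumps({"level": "DEBUG"}).encode("utf-8")
    return urllib.request.Request(
        url=f"{connect_url.rstrip('/')}/admin/loggers/{logger}",
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


req = debug_logger_request("http://localhost:8083")  # hypothetical worker address
print(req.get_method(), req.full_url)
# To actually send it: urllib.request.urlopen(req)
```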

Thanks,
-cc

Mikhail Spirin

Sep 23, 2025, 7:45:56 PM
to debe...@googlegroups.com
Yep, I've prepared a sanitized full log for you at DEBUG level. It starts with the request for the blocking snapshot.
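
A note for reading the log below: the committed offset carries the position as a number (`lsn=1149273703696`), while the streaming source prints it as `LSN{10B/960D0510}`. Both are the same 64-bit value, with the text form being the high and low 32 bits in hexadecimal. A minimal sketch of the conversion (the function names are made up for illustration):

```python
def lsn_to_text(lsn: int) -> str:
    """Format a 64-bit LSN in PostgreSQL's 'high/low' hexadecimal notation."""
    return f"{lsn >> 32:X}/{lsn & 0xFFFFFFFF:X}"


def text_to_lsn(text: str) -> int:
    """Parse 'high/low' hexadecimal notation back into a 64-bit integer."""
    high, low = text.split("/")
    return (int(high, 16) << 32) | int(low, 16)


print(lsn_to_text(1149273703696))   # → 10B/960D0510
print(text_to_lsn("10B/960D0510"))  # → 1149273703696
```

The repeated "Flushing LSN to server" lines therefore all refer to the same position as the `lsn`, `lsn_proc`, and `lsn_commit` fields of the offset.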

[2025-09-23 23:17:44,262] DEBUG [postgres-debezium-source-dev-mock_connectorname|task-0] Committing offset '{server=mock_canonicalname}' for partition '{incremental_snapshot_correlation_id=null, lsn_proc=1149273703696, messageType=MESSAGE, lsn_commit=1149273703696, lsn=1149273703696, incremental_snapshot_maximum_key=aced0005757200135b4c6a6176612e6c616e672e4f626a6563743b90ce589f1073296c0200007870000000017372000e6a6176612e7574696c2e55554944bc9903f7986d852f0200024a000c6c65617374536967426974734a000b6d6f73745369674269747378708e4c5a5df006d314ffffcbd80a8094e3, incremental_snapshot_collections=[{"incremental_snapshot_collections_id":"mock_collectionid","incremental_snapshot_collections_additional_condition":null,"incremental_snapshot_collections_surrogate_key":null},{"incremental_snapshot_collections_id":"mock_collectionid","incremental_snapshot_collections_additional_condition":null,"incremental_snapshot_collections_surrogate_key":null},{"incremental_snapshot_collections_id":"mock_collectionid","incremental_snapshot_collections_additional_condition":null,"incremental_snapshot_collections_surrogate_key":null}], ts_usec=1758662929952864, incremental_snapshot_primary_key=aced000570}' (io.debezium.connector.postgresql.PostgresConnectorTask:420)
[2025-09-23 23:17:44,263] DEBUG [postgres-debezium-source-dev-mock_connectorname|task-0] Received offset commit request on commit LSN 'LSN{10B/960D0510}' and change LSN 'LSN{10B/960D0510}' (io.debezium.connector.postgresql.PostgresStreamingChangeEventSource:449)
[2025-09-23 23:17:44,264] DEBUG [postgres-debezium-source-dev-mock_connectorname|task-0] Flushing LSN to server: LSN{10B/960D0510} (io.debezium.connector.postgresql.PostgresStreamingChangeEventSource:457)
[2025-09-23 23:17:44,264] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] Requested thread factory for component PostgresStreamingChangeEventSource, id = mock_canonicalname named = lsn-flush (io.debezium.util.Threads:273)
[2025-09-23 23:17:44,264] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] Creating thread debezium-postgresstreamingchangeeventsource-mock_canonicalname-lsn-flush (io.debezium.util.Threads:290)
[2025-09-23 23:17:49,280] DEBUG [postgres-debezium-source-dev-mock_connectorname|task-0] Committing offset '{server=mock_canonicalname}' for partition '{incremental_snapshot_correlation_id=null, lsn_proc=1149273703696, messageType=MESSAGE, lsn_commit=1149273703696, lsn=1149273703696, incremental_snapshot_maximum_key=aced0005757200135b4c6a6176612e6c616e672e4f626a6563743b90ce589f1073296c0200007870000000017372000e6a6176612e7574696c2e55554944bc9903f7986d852f0200024a000c6c65617374536967426974734a000b6d6f73745369674269747378708e4c5a5df006d314ffffcbd80a8094e3, incremental_snapshot_collections=[{"incremental_snapshot_collections_id":"mock_collectionid","incremental_snapshot_collections_additional_condition":null,"incremental_snapshot_collections_surrogate_key":null},{"incremental_snapshot_collections_id":"mock_collectionid","incremental_snapshot_collections_additional_condition":null,"incremental_snapshot_collections_surrogate_key":null},{"incremental_snapshot_collections_id":"mock_collectionid","incremental_snapshot_collections_additional_condition":null,"incremental_snapshot_collections_surrogate_key":null}], ts_usec=1758662929952864, incremental_snapshot_primary_key=aced000570}' (io.debezium.connector.postgresql.PostgresConnectorTask:420)
[2025-09-23 23:17:49,280] DEBUG [postgres-debezium-source-dev-mock_connectorname|task-0] Received offset commit request on commit LSN 'LSN{10B/960D0510}' and change LSN 'LSN{10B/960D0510}' (io.debezium.connector.postgresql.PostgresStreamingChangeEventSource:449)
[2025-09-23 23:17:49,280] DEBUG [postgres-debezium-source-dev-mock_connectorname|task-0] Flushing LSN to server: LSN{10B/960D0510} (io.debezium.connector.postgresql.PostgresStreamingChangeEventSource:457)
[2025-09-23 23:17:49,280] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] Requested thread factory for component PostgresStreamingChangeEventSource, id = mock_canonicalname named = lsn-flush (io.debezium.util.Threads:273)
[2025-09-23 23:17:49,280] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] Creating thread debezium-postgresstreamingchangeeventsource-mock_canonicalname-lsn-flush (io.debezium.util.Threads:290)
[2025-09-23 23:17:50,364] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] Requested 'BLOCKING' snapshot of data collections '[mock_collectionid]' with additional conditions '[]' and surrogate key 'PK of table will be used' (io.debezium.pipeline.signal.actions.snapshotting.ExecuteSnapshot:64)
[2025-09-23 23:17:50,763] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] Streaming will now pause (io.debezium.connector.postgresql.PostgresStreamingChangeEventSource:268)
[2025-09-23 23:17:50,763] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] Starting snapshot (io.debezium.pipeline.ChangeEventSourceCoordinator:251)
[2025-09-23 23:17:50,763] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] The connector will wait for 3s before proceeding (io.debezium.pipeline.source.AbstractSnapshotChangeEventSource:178)
[2025-09-23 23:17:54,294] DEBUG [postgres-debezium-source-dev-mock_connectorname|task-0] Committing offset '{server=mock_canonicalname}' for partition '{incremental_snapshot_correlation_id=null, lsn_proc=1149273703696, messageType=MESSAGE, lsn_commit=1149273703696, lsn=1149273703696, incremental_snapshot_maximum_key=aced0005757200135b4c6a6176612e6c616e672e4f626a6563743b90ce589f1073296c0200007870000000017372000e6a6176612e7574696c2e55554944bc9903f7986d852f0200024a000c6c65617374536967426974734a000b6d6f73745369674269747378708e4c5a5df006d314ffffcbd80a8094e3, incremental_snapshot_collections=[{"incremental_snapshot_collections_id":"mock_collectionid","incremental_snapshot_collections_additional_condition":null,"incremental_snapshot_collections_surrogate_key":null},{"incremental_snapshot_collections_id":"mock_collectionid","incremental_snapshot_collections_additional_condition":null,"incremental_snapshot_collections_surrogate_key":null},{"incremental_snapshot_collections_id":"mock_collectionid","incremental_snapshot_collections_additional_condition":null,"incremental_snapshot_collections_surrogate_key":null}], ts_usec=1758662929952864, incremental_snapshot_primary_key=aced000570}' (io.debezium.connector.postgresql.PostgresConnectorTask:420)
[2025-09-23 23:17:54,294] DEBUG [postgres-debezium-source-dev-mock_connectorname|task-0] Received offset commit request on commit LSN 'LSN{10B/960D0510}' and change LSN 'LSN{10B/960D0510}' (io.debezium.connector.postgresql.PostgresStreamingChangeEventSource:449)
[2025-09-23 23:17:54,294] DEBUG [postgres-debezium-source-dev-mock_connectorname|task-0] Flushing LSN to server: LSN{10B/960D0510} (io.debezium.connector.postgresql.PostgresStreamingChangeEventSource:457)
[2025-09-23 23:17:54,294] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] Requested thread factory for component PostgresStreamingChangeEventSource, id = mock_canonicalname named = lsn-flush (io.debezium.util.Threads:273)
[2025-09-23 23:17:54,294] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] Creating thread debezium-postgresstreamingchangeeventsource-mock_canonicalname-lsn-flush (io.debezium.util.Threads:290)
[2025-09-23 23:17:55,763] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] Snapshot step 1 - Preparing (io.debezium.relational.RelationalSnapshotChangeEventSource:135)
[2025-09-23 23:17:55,766] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] Snapshot step 2 - Determining captured tables (io.debezium.relational.RelationalSnapshotChangeEventSource:144)
[2025-09-23 23:17:55,768] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] Created connection pool with 1 threads (io.debezium.relational.RelationalSnapshotChangeEventSource:240)
[2025-09-23 23:17:55,768] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] Snapshot step 3 - Locking captured tables [mock_collectionid] (io.debezium.relational.RelationalSnapshotChangeEventSource:153)
[2025-09-23 23:17:55,768] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] Snapshot step 4 - Determining snapshot offset (io.debezium.relational.RelationalSnapshotChangeEventSource:159)
[2025-09-23 23:17:55,769] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] Snapshot step 5 - Reading structure of captured tables (io.debezium.relational.RelationalSnapshotChangeEventSource:166)
[2025-09-23 23:17:55,769] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] Reading structure of schema 'mock_schema' of catalog 'postgres' (io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource:222)
[2025-09-23 23:17:55,784] ERROR [postgres-debezium-source-dev-mock_connectorname|task-0] Error during snapshot (io.debezium.relational.RelationalSnapshotChangeEventSource:197)
java.lang.IllegalStateException: DebeziumOpenLineageEmitter not initialized for connector ConnectorContext[connectorLogicalName=mock_canonicalname, connectorName=postgresql, taskId=0, version=3.2.2.Final, config={connector.class=io.debezium.connector.postgresql.PostgresConnector, signal.consumer.sasl.mechanism=PLAIN, topic.creation.default.partitions=1, streaming.delay.ms=3000, custom.metric.tags=table=mock_collectionid, slot.name=slot_postgres_debezium_source_dev_mock_schema_table, publication.name=mock_publicationname, signal.enabled.channels=kafka, snapshot.delay.ms=3000, topic.prefix=mock_canonicalname, heartbeat.action.query=SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar), topic.creation.default.replication.factor=3, signal.data.collection=debezium.mock_schema_table_signals, value.converter=org.apache.kafka.connect.json.JsonConverter, key.converter=org.apache.kafka.connect.json.JsonConverter, database.user=mock_user, database.dbname=postgres, signal.kafka.bootstrap.servers=..., signal.kafka.topic=mock_signal--postgres-debezium-source-dev-mock_connectorname-signal, signal.consumer.ssl.endpoint.identification.algorithm=https, heartbeat.interval.ms=120000, topic.creation.default.retention.ms=604800000, plugin.name=pgoutput, database.port=mockport, ... ...;, topic.creation.enable=true, key.converter.schemas.enable=false, slot.failover=true, task.class=io.debezium.connector.postgresql.PostgresConnectorTask, database.hostname=mock_postgresdatabasename, database.password=..., name=postgres-debezium-source-dev-mock_connectorname, value.converter.schemas.enable=false, table.include.list=mock_collectionid, signal.consumer.security.protocol=SASL_SSL, snapshot.mode=when_needed, schemas.enable=false}]. Call init() first.
at io.debezium.openlineage.DebeziumOpenLineageEmitter.getEmitter(DebeziumOpenLineageEmitter.java:158)
at io.debezium.openlineage.DebeziumOpenLineageEmitter.emit(DebeziumOpenLineageEmitter.java:136)
at io.debezium.relational.RelationalDatabaseSchema.buildAndRegisterSchema(RelationalDatabaseSchema.java:132)
at io.debezium.relational.RelationalDatabaseSchema.refreshSchema(RelationalDatabaseSchema.java:221)
at io.debezium.connector.postgresql.PostgresSchema.lambda$refreshSchemas$3(PostgresSchema.java:183)
at java.base/java.util.concurrent.ConcurrentHashMap$KeySetView.forEach(ConcurrentHashMap.java:4706)
at java.base/java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1092)
at io.debezium.connector.postgresql.PostgresSchema.refreshSchemas(PostgresSchema.java:183)
at io.debezium.connector.postgresql.PostgresSchema.refresh(PostgresSchema.java:93)
at io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource.readTableStructure(PostgresSnapshotChangeEventSource.java:235)
at io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource.readTableStructure(PostgresSnapshotChangeEventSource.java:38)
at io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:167)
at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:102)
at io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:298)
at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$doBlockingSnapshot$4(ChangeEventSourceCoordinator.java:255)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
[2025-09-23 23:17:55,785] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] Snapshot - Final stage (io.debezium.pipeline.source.AbstractSnapshotChangeEventSource:114)
[2025-09-23 23:17:55,786] WARN [postgres-debezium-source-dev-mock_connectorname|task-0] Snapshot was not completed successfully (io.debezium.pipeline.source.AbstractSnapshotChangeEventSource:135)
[2025-09-23 23:17:55,786] WARN [postgres-debezium-source-dev-mock_connectorname|task-0] Error while executing requested blocking snapshot. (io.debezium.pipeline.ChangeEventSourceCoordinator:258)
io.debezium.DebeziumException: java.lang.IllegalStateException: DebeziumOpenLineageEmitter not initialized for connector ConnectorContext[connectorLogicalName=mock_canonicalname, connectorName=postgresql, taskId=0, version=3.2.2.Final, config={connector.class=io.debezium.connector.postgresql.PostgresConnector, signal.consumer.sasl.mechanism=PLAIN, topic.creation.default.partitions=1, streaming.delay.ms=3000, custom.metric.tags=table=mock_collectionid, slot.name=slot_postgres_debezium_source_dev_mock_schema_table, publication.name=mock_publicationname, signal.enabled.channels=kafka, snapshot.delay.ms=3000, topic.prefix=mock_canonicalname, heartbeat.action.query=SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar), topic.creation.default.replication.factor=3, signal.data.collection=debezium.mock_schema_table_signals, value.converter=org.apache.kafka.connect.json.JsonConverter, key.converter=org.apache.kafka.connect.json.JsonConverter, database.user=mock_user, database.dbname=postgres, signal.kafka.bootstrap.servers=..., signal.kafka.topic=mock_signal--postgres-debezium-source-dev-mock_connectorname-signal, signal.consumer.ssl.endpoint.identification.algorithm=https, heartbeat.interval.ms=120000, topic.creation.default.retention.ms=604800000, plugin.name=pgoutput, database.port=mockport, ... ...;, topic.creation.enable=true, key.converter.schemas.enable=false, slot.failover=true, task.class=io.debezium.connector.postgresql.PostgresConnectorTask, database.hostname=mock_postgresdatabasename, database.password=..., name=postgres-debezium-source-dev-mock_connectorname, value.converter.schemas.enable=false, table.include.list=mock_collectionid, signal.consumer.security.protocol=SASL_SSL, snapshot.mode=when_needed, schemas.enable=false}]. Call init() first.
at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:111)
at io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:298)
at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$doBlockingSnapshot$4(ChangeEventSourceCoordinator.java:255)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.IllegalStateException: DebeziumOpenLineageEmitter not initialized for connector ConnectorContext[connectorLogicalName=mock_canonicalname, connectorName=postgresql, taskId=0, version=3.2.2.Final, config={connector.class=io.debezium.connector.postgresql.PostgresConnector, signal.consumer.sasl.mechanism=PLAIN, topic.creation.default.partitions=1, streaming.delay.ms=3000, custom.metric.tags=table=mock_collectionid, slot.name=slot_postgres_debezium_source_dev_mock_schema_table, publication.name=mock_publicationname, signal.enabled.channels=kafka, snapshot.delay.ms=3000, topic.prefix=mock_canonicalname, heartbeat.action.query=SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar), topic.creation.default.replication.factor=3, signal.data.collection=debezium.mock_schema_table_signals, value.converter=org.apache.kafka.connect.json.JsonConverter, key.converter=org.apache.kafka.connect.json.JsonConverter, database.user=mock_user, database.dbname=postgres, signal.kafka.bootstrap.servers=..., signal.kafka.topic=mock_signal--postgres-debezium-source-dev-mock_connectorname-signal, signal.consumer.ssl.endpoint.identification.algorithm=https, heartbeat.interval.ms=120000, topic.creation.default.retention.ms=604800000, plugin.name=pgoutput, database.port=mockport, ... ...;, topic.creation.enable=true, key.converter.schemas.enable=false, slot.failover=true, task.class=io.debezium.connector.postgresql.PostgresConnectorTask, database.hostname=mock_postgresdatabasename, database.password=..., name=postgres-debezium-source-dev-mock_connectorname, value.converter.schemas.enable=false, table.include.list=mock_collectionid, signal.consumer.security.protocol=SASL_SSL, snapshot.mode=when_needed, schemas.enable=false}]. Call init() first.
at io.debezium.openlineage.DebeziumOpenLineageEmitter.getEmitter(DebeziumOpenLineageEmitter.java:158)
at io.debezium.openlineage.DebeziumOpenLineageEmitter.emit(DebeziumOpenLineageEmitter.java:136)
at io.debezium.relational.RelationalDatabaseSchema.buildAndRegisterSchema(RelationalDatabaseSchema.java:132)
at io.debezium.relational.RelationalDatabaseSchema.refreshSchema(RelationalDatabaseSchema.java:221)
at io.debezium.connector.postgresql.PostgresSchema.lambda$refreshSchemas$3(PostgresSchema.java:183)
at java.base/java.util.concurrent.ConcurrentHashMap$KeySetView.forEach(ConcurrentHashMap.java:4706)
at java.base/java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1092)
at io.debezium.connector.postgresql.PostgresSchema.refreshSchemas(PostgresSchema.java:183)
at io.debezium.connector.postgresql.PostgresSchema.refresh(PostgresSchema.java:93)
at io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource.readTableStructure(PostgresSnapshotChangeEventSource.java:235)
at io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource.readTableStructure(PostgresSnapshotChangeEventSource.java:38)
at io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:167)
at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:102)
... 7 more
[2025-09-23 23:17:55,786] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] Streaming resumed (io.debezium.connector.postgresql.PostgresStreamingChangeEventSource:271)
[2025-09-23 23:17:55,788] ERROR [postgres-debezium-source-dev-mock_connectorname|task-0] Producer failure (io.debezium.pipeline.ErrorHandler:52)
org.postgresql.util.PSQLException: Database connection failed when reading from copy
at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1213)
at org.postgresql.core.v3.CopyDualImpl.readFromCopy(CopyDualImpl.java:49)
at org.postgresql.core.v3.replication.V3PGReplicationStream.receiveNextData(V3PGReplicationStream.java:165)
at org.postgresql.core.v3.replication.V3PGReplicationStream.readInternal(V3PGReplicationStream.java:130)
at org.postgresql.core.v3.replication.V3PGReplicationStream.readPending(V3PGReplicationStream.java:87)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.readPending(PostgresReplicationConnection.java:779)
at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.processMessages(PostgresStreamingChangeEventSource.java:243)
at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:194)
at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:49)
at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:326)
at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:207)
at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:147)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.io.EOFException
at org.postgresql.core.PGStream.receiveChar(PGStream.java:479)
at org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:1274)
at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1211)
... 16 more
[2025-09-23 23:17:55,789] WARN [postgres-debezium-source-dev-mock_connectorname|task-0] Retry 1 of unlimited retries will be attempted (io.debezium.pipeline.ErrorHandler:125)
[2025-09-23 23:17:55,789] DEBUG [postgres-debezium-source-dev-mock_connectorname|task-0] stopping streaming... (io.debezium.connector.postgresql.PostgresStreamingChangeEventSource:206)
[2025-09-23 23:17:55,789] ERROR [postgres-debezium-source-dev-mock_connectorname|task-0] Unexpected exception while performing keepalive status update on the replication stream (io.debezium.connector.postgresql.connection.PostgresReplicationConnection:842)
java.lang.InterruptedException: sleep interrupted
at java.base/java.lang.Thread.sleep(Native Method)
at io.debezium.util.Metronome$1.pause(Metronome.java:57)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.lambda$startKeepAlive$0(PostgresReplicationConnection.java:837)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
[2025-09-23 23:17:55,789] DEBUG [postgres-debezium-source-dev-mock_connectorname|task-0] Closing message decoder (io.debezium.connector.postgresql.connection.PostgresReplicationConnection:924)
[2025-09-23 23:17:55,789] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] Requested thread factory for component JdbcConnection, id = JdbcConnection named = jdbc-connection-close (io.debezium.util.Threads:273)
[2025-09-23 23:17:55,789] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] Creating thread debezium-jdbcconnection-JdbcConnection-jdbc-connection-close (io.debezium.util.Threads:290)
[2025-09-23 23:17:55,791] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] Connection gracefully closed (io.debezium.jdbc.JdbcConnection:988)
[2025-09-23 23:17:55,791] DEBUG [postgres-debezium-source-dev-mock_connectorname|task-0] Closing replication connection (io.debezium.connector.postgresql.connection.PostgresReplicationConnection:932)
[2025-09-23 23:17:55,791] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] Requested thread factory for component JdbcConnection, id = JdbcConnection named = jdbc-connection-close (io.debezium.util.Threads:273)
[2025-09-23 23:17:55,791] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] Creating thread debezium-jdbcconnection-JdbcConnection-jdbc-connection-close (io.debezium.util.Threads:290)
[2025-09-23 23:17:55,792] ERROR [postgres-debezium-source-dev-mock_connectorname|task-0] Operation jdbc-connection-close failed (io.debezium.util.Threads:339)
java.util.concurrent.ExecutionException: java.lang.RuntimeException: org.postgresql.util.PSQLException: Unable to close connection properly
at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:205)
at io.debezium.util.Threads.runWithTimeout(Threads.java:331)
at io.debezium.jdbc.JdbcConnection.doClose(JdbcConnection.java:985)
at io.debezium.jdbc.JdbcConnection.close(JdbcConnection.java:975)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.close(PostgresReplicationConnection.java:933)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.close(PostgresReplicationConnection.java:919)
at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.cleanUpStreamingOnStop(PostgresStreamingChangeEventSource.java:221)
at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:200)
at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:49)
at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:326)
at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:207)
at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:147)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.RuntimeException: org.postgresql.util.PSQLException: Unable to close connection properly
at io.debezium.jdbc.JdbcConnection.lambda$doClose$4(JdbcConnection.java:991)
... 5 more
Caused by: org.postgresql.util.PSQLException: Unable to close connection properly
at org.postgresql.jdbc.PgConnection.close(PgConnection.java:873)
at io.debezium.jdbc.JdbcConnection.lambda$doClose$4(JdbcConnection.java:987)
... 5 more

Caused by: java.net.SocketException: Broken pipe
at java.base/sun.nio.ch.NioSocketImpl.implWrite(NioSocketImpl.java:425)
at java.base/sun.nio.ch.NioSocketImpl.write(NioSocketImpl.java:445)
at java.base/sun.nio.ch.NioSocketImpl$2.write(NioSocketImpl.java:831)
at java.base/java.net.Socket$SocketOutputStream.write(Socket.java:1045)
at java.base/sun.security.ssl.SSLSocketOutputRecord.deliver(SSLSocketOutputRecord.java:345)
at java.base/sun.security.ssl.SSLSocketImpl$AppOutputStream.write(SSLSocketImpl.java:1308)
at org.postgresql.util.internal.PgBufferedOutputStream.flushBuffer(PgBufferedOutputStream.java:41)
at org.postgresql.util.internal.PgBufferedOutputStream.flush(PgBufferedOutputStream.java:48)
at org.postgresql.core.PGStream.flush(PGStream.java:708)
at org.postgresql.core.QueryExecutorCloseAction.close(QueryExecutorCloseAction.java:73)
at org.postgresql.jdbc.PgConnectionCleaningAction.onClean(PgConnectionCleaningAction.java:90)
at org.postgresql.util.LazyCleaner$Node.onClean(LazyCleaner.java:221)
at org.postgresql.util.LazyCleaner$Node.clean(LazyCleaner.java:212)
at org.postgresql.jdbc.PgConnection.close(PgConnection.java:870)
... 6 more
[2025-09-23 23:17:55,792] ERROR [postgres-debezium-source-dev-mock_connectorname|task-0] Unexpected error while closing Postgres connection (io.debezium.connector.postgresql.connection.PostgresReplicationConnection:936)
org.postgresql.util.PSQLException: Unable to close connection properly
at org.postgresql.jdbc.PgConnection.close(PgConnection.java:873)
at io.debezium.jdbc.JdbcConnection.lambda$doClose$4(JdbcConnection.java:987)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)

Caused by: java.net.SocketException: Broken pipe
at java.base/sun.nio.ch.NioSocketImpl.implWrite(NioSocketImpl.java:425)
at java.base/sun.nio.ch.NioSocketImpl.write(NioSocketImpl.java:445)
at java.base/sun.nio.ch.NioSocketImpl$2.write(NioSocketImpl.java:831)
at java.base/java.net.Socket$SocketOutputStream.write(Socket.java:1045)
at java.base/sun.security.ssl.SSLSocketOutputRecord.deliver(SSLSocketOutputRecord.java:345)
at java.base/sun.security.ssl.SSLSocketImpl$AppOutputStream.write(SSLSocketImpl.java:1308)
at org.postgresql.util.internal.PgBufferedOutputStream.flushBuffer(PgBufferedOutputStream.java:41)
at org.postgresql.util.internal.PgBufferedOutputStream.flush(PgBufferedOutputStream.java:48)
at org.postgresql.core.PGStream.flush(PGStream.java:708)
at org.postgresql.core.QueryExecutorCloseAction.close(QueryExecutorCloseAction.java:73)
at org.postgresql.jdbc.PgConnectionCleaningAction.onClean(PgConnectionCleaningAction.java:90)
at org.postgresql.util.LazyCleaner$Node.onClean(LazyCleaner.java:221)
at org.postgresql.util.LazyCleaner$Node.clean(LazyCleaner.java:212)
at org.postgresql.jdbc.PgConnection.close(PgConnection.java:870)
... 6 more
[2025-09-23 23:17:55,792] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] Finished streaming (io.debezium.pipeline.ChangeEventSourceCoordinator:327)
[2025-09-23 23:17:55,792] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] Connected metrics set to 'false' (io.debezium.pipeline.ChangeEventSourceCoordinator:492)
[2025-09-23 23:17:55,796] WARN [postgres-debezium-source-dev-mock_connectorname|task-0] Going to restart connector after 10 sec. after a retriable exception (io.debezium.connector.common.BaseSourceTask:473)
[2025-09-23 23:17:55,802] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] [Consumer clientId=9e97e1c9-5dac-4a46-a139-918363979e5a, groupId=kafka-signal] Resetting generation and member id due to: consumer pro-actively leaving the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1055)
[2025-09-23 23:17:55,803] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] [Consumer clientId=9e97e1c9-5dac-4a46-a139-918363979e5a, groupId=kafka-signal] Request joining group due to: consumer pro-actively leaving the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1102)
[2025-09-23 23:17:55,868] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:684)
[2025-09-23 23:17:55,868] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:688)
[2025-09-23 23:17:55,868] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] Closing reporter org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter (org.apache.kafka.common.metrics.Metrics:688)
[2025-09-23 23:17:55,868] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:694)
[2025-09-23 23:17:55,870] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] App info kafka.consumer for 9e97e1c9-5dac-4a46-a139-918363979e5a unregistered (org.apache.kafka.common.utils.AppInfoParser:88)
[2025-09-23 23:17:55,870] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] SignalProcessor stopped (io.debezium.pipeline.signal.SignalProcessor:122)
[2025-09-23 23:17:55,870] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] Requested thread factory for component JdbcConnection, id = JdbcConnection named = jdbc-connection-close (io.debezium.util.Threads:273)
[2025-09-23 23:17:55,870] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] Creating thread debezium-jdbcconnection-JdbcConnection-jdbc-connection-close (io.debezium.util.Threads:290)
[2025-09-23 23:17:55,872] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] Connection gracefully closed (io.debezium.jdbc.JdbcConnection:988)
[2025-09-23 23:17:55,872] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] Debezium ServiceRegistry stopped. (io.debezium.service.DefaultServiceRegistry:105)
[2025-09-23 23:17:55,872] DEBUG [postgres-debezium-source-dev-mock_connectorname|task-0] Closing message decoder (io.debezium.connector.postgresql.connection.PostgresReplicationConnection:924)
[2025-09-23 23:17:55,872] DEBUG [postgres-debezium-source-dev-mock_connectorname|task-0] Closing replication connection (io.debezium.connector.postgresql.connection.PostgresReplicationConnection:932)
[2025-09-23 23:17:55,872] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] Requested thread factory for component JdbcConnection, id = JdbcConnection named = jdbc-connection-close (io.debezium.util.Threads:273)
[2025-09-23 23:17:55,872] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] Creating thread debezium-jdbcconnection-JdbcConnection-jdbc-connection-close (io.debezium.util.Threads:290)
[2025-09-23 23:17:55,873] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] Connection gracefully closed (io.debezium.jdbc.JdbcConnection:988)
[2025-09-23 23:17:55,873] WARN [postgres-debezium-source-dev-mock_connectorname|task-0] WorkerSourceTask{id=postgres-debezium-source-dev-mock_connectorname-0} failed to poll records from SourceTask. Will retry operation. (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask:471)
org.apache.kafka.connect.errors.RetriableException: An exception occurred in the change event producer. This connector will be restarted.
at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:63)
at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:197)
at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:49)
at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:326)
at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:207)
at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:147)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.postgresql.util.PSQLException: Database connection failed when reading from copy
at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1213)
at org.postgresql.core.v3.CopyDualImpl.readFromCopy(CopyDualImpl.java:49)
at org.postgresql.core.v3.replication.V3PGReplicationStream.receiveNextData(V3PGReplicationStream.java:165)
at org.postgresql.core.v3.replication.V3PGReplicationStream.readInternal(V3PGReplicationStream.java:130)
at org.postgresql.core.v3.replication.V3PGReplicationStream.readPending(V3PGReplicationStream.java:87)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.readPending(PostgresReplicationConnection.java:779)
at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.processMessages(PostgresStreamingChangeEventSource.java:243)
at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:194)
... 9 more
Caused by: java.io.EOFException
at org.postgresql.core.PGStream.receiveChar(PGStream.java:479)
at org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:1274)
at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1211)
... 16 more
[2025-09-23 23:17:55,876] ERROR [postgres-debezium-source-dev-mock_connectorname|task-0] WorkerSourceTask{id=postgres-debezium-source-dev-mock_connectorname-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:212)
java.lang.IllegalStateException: DebeziumOpenLineageEmitter not initialized for connector ConnectorContext[connectorLogicalName=mock_canonicalname, connectorName=postgresql, taskId=0, version=3.2.2.Final, config={connector.class=io.debezium.connector.postgresql.PostgresConnector, signal.consumer.sasl.mechanism=PLAIN, topic.creation.default.partitions=1, streaming.delay.ms=3000, custom.metric.tags=table=mock_collectionid, slot.name=slot_postgres_debezium_source_dev_mock_schema_table, publication.name=mock_publicationname, signal.enabled.channels=kafka, snapshot.delay.ms=3000, topic.prefix=mock_canonicalname, heartbeat.action.query=SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar), topic.creation.default.replication.factor=3, signal.data.collection=debezium.mock_schema_table_signals, value.converter=org.apache.kafka.connect.json.JsonConverter, key.converter=org.apache.kafka.connect.json.JsonConverter, database.user=mock_user, database.dbname=postgres, signal.kafka.bootstrap.servers=..., signal.kafka.topic=mock_signal--postgres-debezium-source-dev-mock_connectorname-signal, signal.consumer.ssl.endpoint.identification.algorithm=https, heartbeat.interval.ms=120000, topic.creation.default.retention.ms=604800000, plugin.name=pgoutput, database.port=mockport, ... ...;, topic.creation.enable=true, key.converter.schemas.enable=false, slot.failover=true, task.class=io.debezium.connector.postgresql.PostgresConnectorTask, database.hostname=mock_postgresdatabasename, database.password=..., name=postgres-debezium-source-dev-mock_connectorname, value.converter.schemas.enable=false, table.include.list=mock_collectionid, signal.consumer.security.protocol=SASL_SSL, snapshot.mode=when_needed, schemas.enable=false}]. Call init() first.
at io.debezium.openlineage.DebeziumOpenLineageEmitter.getEmitter(DebeziumOpenLineageEmitter.java:158)
at io.debezium.openlineage.DebeziumOpenLineageEmitter.emit(DebeziumOpenLineageEmitter.java:122)
at io.debezium.connector.common.BaseSourceTask.lambda$startIfNeededAndPossible$2(BaseSourceTask.java:434)
at java.base/java.util.Optional.ifPresentOrElse(Optional.java:196)
at io.debezium.connector.common.BaseSourceTask.startIfNeededAndPossible(BaseSourceTask.java:433)
at io.debezium.connector.common.BaseSourceTask.poll(BaseSourceTask.java:325)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:469)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:357)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
[2025-09-23 23:17:55,876] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] Stopping down connector (io.debezium.connector.common.BaseSourceTask:476)
[2025-09-23 23:17:55,876] DEBUG [postgres-debezium-source-dev-mock_connectorname|task-0] Closing message decoder (io.debezium.connector.postgresql.connection.PostgresReplicationConnection:924)
[2025-09-23 23:17:55,876] DEBUG [postgres-debezium-source-dev-mock_connectorname|task-0] Closing replication connection (io.debezium.connector.postgresql.connection.PostgresReplicationConnection:932)
[2025-09-23 23:17:55,876] WARN [postgres-debezium-source-dev-mock_connectorname|task-0] Failed to close source task with type org.apache.kafka.connect.runtime.AbstractWorkerSourceTask$$Lambda$1708/0x00007fb1519bd450 (org.apache.kafka.common.utils.Utils:1152)
java.lang.IllegalStateException: DebeziumOpenLineageEmitter not initialized for connector ConnectorContext[connectorLogicalName=mock_canonicalname, connectorName=postgresql, taskId=0, version=3.2.2.Final, config={connector.class=io.debezium.connector.postgresql.PostgresConnector, signal.consumer.sasl.mechanism=PLAIN, topic.creation.default.partitions=1, streaming.delay.ms=3000, custom.metric.tags=table=mock_collectionid, slot.name=slot_postgres_debezium_source_dev_mock_schema_table, publication.name=mock_publicationname, signal.enabled.channels=kafka, snapshot.delay.ms=3000, topic.prefix=mock_canonicalname, heartbeat.action.query=SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar), topic.creation.default.replication.factor=3, signal.data.collection=debezium.mock_schema_table_signals, value.converter=org.apache.kafka.connect.json.JsonConverter, key.converter=org.apache.kafka.connect.json.JsonConverter, database.user=mock_user, database.dbname=postgres, signal.kafka.bootstrap.servers=..., signal.kafka.topic=mock_signal--postgres-debezium-source-dev-mock_connectorname-signal, signal.consumer.ssl.endpoint.identification.algorithm=https, heartbeat.interval.ms=120000, topic.creation.default.retention.ms=604800000, plugin.name=pgoutput, database.port=mockport, ... ...;, topic.creation.enable=true, key.converter.schemas.enable=false, slot.failover=true, task.class=io.debezium.connector.postgresql.PostgresConnectorTask, database.hostname=mock_postgresdatabasename, database.password=..., name=postgres-debezium-source-dev-mock_connectorname, value.converter.schemas.enable=false, table.include.list=mock_collectionid, signal.consumer.security.protocol=SASL_SSL, snapshot.mode=when_needed, schemas.enable=false}]. Call init() first.
at io.debezium.openlineage.DebeziumOpenLineageEmitter.getEmitter(DebeziumOpenLineageEmitter.java:158)
at io.debezium.openlineage.DebeziumOpenLineageEmitter.emit(DebeziumOpenLineageEmitter.java:108)
at io.debezium.connector.common.BaseSourceTask.stop(BaseSourceTask.java:501)
at io.debezium.connector.common.BaseSourceTask.stop(BaseSourceTask.java:464)
at org.apache.kafka.common.utils.Utils.closeQuietly(Utils.java:1150)
at org.apache.kafka.common.utils.Utils.closeQuietly(Utils.java:1133)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.close(AbstractWorkerSourceTask.java:314)
at org.apache.kafka.connect.runtime.WorkerTask.doClose(WorkerTask.java:183)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:216)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
[2025-09-23 23:17:55,876] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] [Producer clientId=connector-producer-postgres-debezium-source-dev-mock_connectorname-0] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1346)
[2025-09-23 23:17:55,877] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:684)
[2025-09-23 23:17:55,877] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:688)
[2025-09-23 23:17:55,877] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] Closing reporter org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter (org.apache.kafka.common.metrics.Metrics:688)
[2025-09-23 23:17:55,877] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:694)
[2025-09-23 23:17:55,877] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] App info kafka.producer for connector-producer-postgres-debezium-source-dev-mock_connectorname-0 unregistered (org.apache.kafka.common.utils.AppInfoParser:88)
[2025-09-23 23:17:55,877] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] App info kafka.admin.client for connector-adminclient-postgres-debezium-source-dev-mock_connectorname-0 unregistered (org.apache.kafka.common.utils.AppInfoParser:88)
[2025-09-23 23:17:55,878] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:684)
[2025-09-23 23:17:55,878] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:688)
[2025-09-23 23:17:55,878] INFO [postgres-debezium-source-dev-mock_connectorname|task-0] Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:694)


Chris Cranford

Sep 24, 2025, 12:33:47 AM
to debe...@googlegroups.com
Thanks for the logs; this makes a bit more sense. I wonder whether the connection issue here is just a by-product of the DebeziumOpenLineageEmitter failure.

This recently came up in our Zulip chat where it was determined the user had multiple connectors sharing the same topic prefix [1]. Is that the case in your deployment? If so, those should be unique and fixing that should resolve the OpenLineage error. After fixing that, can you check that your blocking snapshots work as expected?

-cc

[1]: #community-postgresql > DebeziumOpenLineageEmitter not initialized error (Zulip)
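
To make the topic prefix check concrete, here is a minimal sketch in Python. It assumes the connector configs have already been collected into a dict keyed by connector name; how you collect them is up to you. The function name and the sample connector names and prefixes are hypothetical. Only the `topic.prefix` property comes from the connector config shown in the logs.

```python
from collections import defaultdict


def find_shared_topic_prefixes(connectors):
    """Return {prefix: [connector names]} for every topic.prefix used more than once."""
    by_prefix = defaultdict(list)
    for name, config in connectors.items():
        prefix = config.get("topic.prefix")
        if prefix is not None:
            by_prefix[prefix].append(name)
    # Keep only the prefixes claimed by two or more connectors.
    return {p: sorted(names) for p, names in by_prefix.items() if len(names) > 1}


# Hypothetical sample data: two connectors accidentally share one prefix.
connectors = {
    "orders-connector": {"topic.prefix": "dev_pg"},
    "users-connector": {"topic.prefix": "dev_pg"},
    "audit-connector": {"topic.prefix": "dev_pg_audit"},
}

print(find_shared_topic_prefixes(connectors))
```

An empty result means every connector has its own prefix; anything else lists the connectors that need to be renamed or merged.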

Chris Cranford

Sep 24, 2025, 12:36:42 AM
to debe...@googlegroups.com
Also, just to add context, there were some recent improvements to blocking snapshots to help make them more resilient. I'd suggest moving to the latest 3.2 build, or the 3.3 build in the next few weeks.

Mikhail Spirin

Sep 24, 2025, 6:46:29 PM
to debe...@googlegroups.com
That was a great idea, and yes, that was the case in my setup. I changed that, and the initial snapshots for all 10 tables then completed fine (I merged them all into one connector; previously there were 10 connectors).
It created all 10 tables, but unfortunately the blocking snapshot failed with the same error, just without the DebeziumOpenLineage failure.

I am on version 3.2.2.

In theory we could live without blocking snapshots by changing snapshot.mode to always each time we need to resnapshot, but that is a very ugly design...
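
For anyone following along, this is roughly what a blocking snapshot request on the Kafka signal channel looks like, as far as I understand the Debezium signaling documentation: the record key has to match the connector's `topic.prefix`, and the value is a JSON document of type `execute-snapshot`. The sketch below only builds the key and value and does not send anything. The function name and the table name `public.my_table` are hypothetical, and the prefix is the masked one from the logs.

```python
import json


def blocking_snapshot_signal(topic_prefix, tables):
    """Build the (key, value) pair for a blocking snapshot signal record."""
    key = topic_prefix  # assumed to equal the connector's topic.prefix
    value = json.dumps(
        {
            "type": "execute-snapshot",
            "data": {
                "data-collections": list(tables),
                "type": "BLOCKING",
            },
        }
    )
    return key, value


# Hypothetical table name; the prefix is the masked value from the logs.
key, value = blocking_snapshot_signal("mock_canonicalname", ["public.my_table"])
print(key)
print(value)
```

The pair would then be produced to the topic configured in `signal.kafka.topic` with whatever Kafka client you already use.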

