Debezium postgres connector shut down after network error


Juha Syrjälä

Oct 22, 2021, 4:21:48 AM
to debezium
Hello,

I was running the Debezium Postgres connector in AWS MSK Connect, that is, Kafka Connect managed by AWS.

There was a short-lived network glitch between the Debezium connector and the Postgres database. Both were in the same AWS region and VPC.

The end result was the stack trace below, and the connector shut down. In other words, Debezium got an exception from the Postgres JDBC driver and shut down because of it.

The parameter event.processing.failure.handling.mode was not set, so it defaulted to fail.
The other options are warn and skip. Both would just skip the event, and Debezium would then fail somewhere else, since the connection to Postgres is probably not working at that point.

What would be the right way or configuration to handle this kind of situation?



```
[2021-10-20 22:55:32,282] ERROR [my-kafka|task-0] Failed to read column metadata for 'public.mytable' (io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder:341)"
[2021-10-20 22:55:32,282] ERROR [my-kafka|task-0] Producer failure (io.debezium.pipeline.ErrorHandler:31)"
org.postgresql.util.PSQLException: An I/O error occurred while sending to the backend.
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:349)
at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:481)
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:401)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:322)
at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:308)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:284)
at org.postgresql.jdbc.PgStatement.executeQuery(PgStatement.java:236)
at org.postgresql.jdbc.PgDatabaseMetaData.getColumns(PgDatabaseMetaData.java:1577)
at io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder.getTableColumnsFromDatabase(PgOutputMessageDecoder.java:333)
at io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder.handleRelationMessage(PgOutputMessageDecoder.java:272)
at io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder.processNotEmptyMessage(PgOutputMessageDecoder.java:176)
at io.debezium.connector.postgresql.connection.AbstractMessageDecoder.processMessage(AbstractMessageDecoder.java:33)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.deserializeMessages(PostgresReplicationConnection.java:493)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.readPending(PostgresReplicationConnection.java:485)
at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.processMessages(PostgresStreamingChangeEventSource.java:205)
at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:167)
at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:40)
at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:166)
at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:127)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.SocketException: Connection reset
at java.base/java.net.SocketInputStream.read(SocketInputStream.java:186)
at java.base/java.net.SocketInputStream.read(SocketInputStream.java:140)
at java.base/sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:478)
at java.base/sun.security.ssl.SSLSocketInputRecord.readHeader(SSLSocketInputRecord.java:472)
at java.base/sun.security.ssl.SSLSocketInputRecord.bytesInCompletePacket(SSLSocketInputRecord.java:70)
at java.base/sun.security.ssl.SSLSocketImpl.readApplicationRecord(SSLSocketImpl.java:1374)
at java.base/sun.security.ssl.SSLSocketImpl$AppInputStream.read(SSLSocketImpl.java:985)
at org.postgresql.core.VisibleBufferedInputStream.readMore(VisibleBufferedInputStream.java:161)
at org.postgresql.core.VisibleBufferedInputStream.ensureBytes(VisibleBufferedInputStream.java:128)
at org.postgresql.core.VisibleBufferedInputStream.ensureBytes(VisibleBufferedInputStream.java:113)
at org.postgresql.core.VisibleBufferedInputStream.read(VisibleBufferedInputStream.java:73)
at org.postgresql.core.PGStream.receiveChar(PGStream.java:443)
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2069)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:322)
... 23 more
[2021-10-20 22:55:32,284] INFO [my-kafka|task-0] Connection gracefully closed (io.debezium.jdbc.JdbcConnection:965)"
[2021-10-20 22:55:32,289] INFO [my-kafka|task-0] Connection gracefully closed (io.debezium.jdbc.JdbcConnection:965)"
[2021-10-20 22:55:32,289] INFO [my-kafka|task-0] Finished streaming (io.debezium.pipeline.ChangeEventSourceCoordinator:167)"
[2021-10-20 22:55:32,290] INFO [my-kafka|task-0] Connected metrics set to 'false' (io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics:70)"
```

Nuria Ruiz

Oct 22, 2021, 2:01:42 PM
to debezium

> What would be the right way or configuration to handle this kind of situation?
A disconnect like this is reported by Kafka Connect as a task with status FAILED (see https://docs.confluent.io/home/connect/monitoring.html).
You can check for that status and restart the connector when it appears.

As an example, we run our self-managed connectors in AWS and have code like this in Docker to handle such an event:

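# Poll the Kafka Connect REST API for the connector and task states.
# If either is not RUNNING, exit entirely so the container can be restarted.
# Assumes connector_name is already set in the environment.
while true; do
  status=$(curl -s "localhost:8083/connectors/${connector_name}/status")
  connector_status=$(echo "$status" | jq -r '.connector.state')
  # Collapse all task states into one value, so that several RUNNING tasks
  # still compare equal to "RUNNING".
  task_status=$(echo "$status" | jq -r '[.tasks[].state] | unique | join(",")')

  if [ "$connector_status" != "RUNNING" ] || [ "$task_status" != "RUNNING" ]; then
    exit 1
  fi
  # Pause between checks; 30 seconds is an arbitrary default.
  sleep "${check_interval:-30}"
done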
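
A possible variation on the check above, offered only as a sketch: rather than exiting the whole container, the Kafka Connect REST API also has a per-task restart endpoint (POST /connectors/{name}/tasks/{id}/restart), so a script could restart just the FAILED tasks. The function names and the connect_url variable below are made up for illustration. This also assumes the Connect REST API is reachable and jq is installed, which may not hold on a managed service.

```shell
#!/bin/sh
# Sketch: restart only FAILED tasks through the Kafka Connect REST API.
# connect_url and both function names are illustrative assumptions.
connect_url="${connect_url:-http://localhost:8083}"

# Read a /connectors/<name>/status JSON document on stdin and print
# the id of every task whose state is FAILED, one per line.
failed_task_ids() {
  jq -r '.tasks[] | select(.state == "FAILED") | .id'
}

# Fetch the status of connector $1 and ask Connect to restart each failed task.
restart_failed_tasks() {
  name="$1"
  curl -s "${connect_url}/connectors/${name}/status" | failed_task_ids |
  while read -r task_id; do
    echo "restarting task ${task_id} of ${name}"
    curl -s -X POST "${connect_url}/connectors/${name}/tasks/${task_id}/restart"
  done
}
```

Calling restart_failed_tasks "$connector_name" inside the same polling loop would then replace the exit 1. If the database is still unreachable the task will simply fail again, so some limit on the number of restarts is probably a good idea.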