Debezium JDBC Sink Connector Timing Out with MySQL Server

917 views
Skip to first unread message

Jeet Thakkar

unread,
Aug 30, 2023, 3:14:26 PM8/30/23
to debezium
Hello, I have been successfully able to use Debezium JDBC Sink connector to read the Debezium topics from a Redpanda Cluster into a destination MySQL Table. However, the connector fails after some time with the error text shown below. It seems like the MySQL connection of the Debezium JDBC Sink Connector is timing out.  The error message recommends (raw format log shown below the listed recommendations):
- Increasing the server value for client timeouts on MySQL server; this is kind of not an option.
- using the connector/J connection property autoReconnect=true. However, I do not see the property on the JDBC Sink Connectors docs on connector properties section of Debezium Connector for JDBC (https://debezium.io/documentation/reference/stable/connectors/jdbc.html#jdbc-connector-properties):

Caused by: com.mysql.cj.exceptions.CJCommunicationsException: The last packet successfully received from the server was 54,502,662 milliseconds ago. The last packet sent successfully to the server was 54,502,663 milliseconds ago. is longer than the server configured value of 'wait_timeout'. You should consider either expiring and/or testing connection validity before use in your application, increasing the server configured values for client timeouts, or using the Connector/J connection property 'autoReconnect=true' to avoid this problem.

The entirety of the raw log is shown below for reference.

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: JDBC sink connector failure
        at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:80)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
        ... 10 more
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to process a sink record
        at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:72)
        at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:89)
        ... 11 more
Caused by: org.hibernate.exception.JDBCConnectionException: Unable to acquire JDBC Connection
        at org.hibernate.exception.internal.SQLStateConversionDelegate.convert(SQLStateConversionDelegate.java:98)
        at org.hibernate.exception.internal.StandardSQLExceptionConverter.convert(StandardSQLExceptionConverter.java:56)
        at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:109)
        at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:95)
        at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.acquireConnectionIfNeeded(LogicalConnectionManagedImpl.java:110)
        at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.getPhysicalConnection(LogicalConnectionManagedImpl.java:137)
        at org.hibernate.engine.jdbc.internal.JdbcCoordinatorImpl.coordinateWork(JdbcCoordinatorImpl.java:302)
        at org.hibernate.internal.AbstractSharedSessionContract.doWork(AbstractSharedSessionContract.java:962)
        at org.hibernate.internal.AbstractSharedSessionContract.doReturningWork(AbstractSharedSessionContract.java:958)
        at io.debezium.connector.jdbc.JdbcChangeEventSink.hasTable(JdbcChangeEventSink.java:128)
        at io.debezium.connector.jdbc.JdbcChangeEventSink.checkAndApplyTableChangesIfNeeded(JdbcChangeEventSink.java:97)
        at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:68)
        ... 12 more
Caused by: com.mysql.cj.jdbc.exceptions.CommunicationsException: The last packet successfully received from the server was 54,502,662 milliseconds ago. The last packet sent successfully to the server was 54,502,663 milliseconds ago. is longer than the server configured value of 'wait_timeout'. You should consider either expiring and/or testing connection validity before use in your application, increasing the server configured values for client timeouts, or using the Connector/J connection property 'autoReconnect=true' to avoid this problem.
        at com.mysql.cj.jdbc.exceptions.SQLError.createCommunicationsException(SQLError.java:175)
        at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:64)
        at com.mysql.cj.jdbc.ConnectionImpl.setAutoCommit(ConnectionImpl.java:2022)
        at com.mchange.v2.c3p0.impl.NewProxyConnection.setAutoCommit(NewProxyConnection.java:1059)
        at org.hibernate.c3p0.internal.C3P0ConnectionProvider.getConnection(C3P0ConnectionProvider.java:73)
        at org.hibernate.internal.NonContextualJdbcConnectionAccess.obtainConnection(NonContextualJdbcConnectionAccess.java:38)
        at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.acquireConnectionIfNeeded(LogicalConnectionManagedImpl.java:107)
        ... 19 more
Caused by: com.mysql.cj.exceptions.CJCommunicationsException: The last packet successfully received from the server was 54,502,662 milliseconds ago. The last packet sent successfully to the server was 54,502,663 milliseconds ago. is longer than the server configured value of 'wait_timeout'. You should consider either expiring and/or testing connection validity before use in your application, increasing the server configured values for client timeouts, or using the Connector/J connection property 'autoReconnect=true' to avoid this problem.
        at jdk.internal.reflect.GeneratedConstructorAccessor79.newInstance(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
        at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:62)
        at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:105)
        at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:150)
        at com.mysql.cj.exceptions.ExceptionFactory.createCommunicationsException(ExceptionFactory.java:166)
        at com.mysql.cj.protocol.a.NativeProtocol.send(NativeProtocol.java:629)
        at com.mysql.cj.protocol.a.NativeProtocol.sendCommand(NativeProtocol.java:684)
        at com.mysql.cj.protocol.a.NativeProtocol.sendQueryPacket(NativeProtocol.java:1052)
        at com.mysql.cj.protocol.a.NativeProtocol.sendQueryString(NativeProtocol.java:998)
        at com.mysql.cj.NativeSession.execSQL(NativeSession.java:655)
        at com.mysql.cj.jdbc.ConnectionImpl.setAutoCommit(ConnectionImpl.java:2005)
        ... 23 more
Caused by: java.net.SocketException: Connection reset by peer (Write failed)
        at java.base/java.net.SocketOutputStream.socketWrite0(Native Method)
        at java.base/java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:110)
        at java.base/java.net.SocketOutputStream.write(SocketOutputStream.java:150)
        at java.base/sun.security.ssl.SSLSocketOutputRecord.deliver(SSLSocketOutputRecord.java:345)
        at java.base/sun.security.ssl.SSLSocketImpl$AppOutputStream.write(SSLSocketImpl.java:1309)
        at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
        at java.base/java.io.BufferedOutputStream.flush(BufferedOutputStream.java:142)
        at com.mysql.cj.protocol.a.SimplePacketSender.send(SimplePacketSender.java:55)
        at com.mysql.cj.protocol.a.TimeTrackingPacketSender.send(TimeTrackingPacketSender.java:50)
        at com.mysql.cj.protocol.a.NativeProtocol.send(NativeProtocol.java:620)
        ... 28 more

Jeet Thakkar

unread,
Aug 30, 2023, 9:19:19 PM8/30/23
to debezium
Never mind, I think I found the solution. We have to add the `autoReconnect=true` to the JDBC connection String as a parameter, as shown in this stack overflow thread.

{...
"connection.url"
: "jdbc:mysql://<mysql-ip>:3306/msistonedb?autoReconnect=true",
...}

I think this should resolve this problem! thanks.

Jeet Thakkar

unread,
Sep 1, 2023, 11:06:46 AM9/1/23
to debezium
Hello, unfortunately, this does not work. I still have the connectors failing.

Chris Cranford

unread,
Sep 5, 2023, 9:27:14 AM9/5/23
to debe...@googlegroups.com
Hi -

Please implement connection testing in the connector configuration:

    "hibernate.c3p0.testConnectionOnCheckOut": "true"

Thanks,
Chris
--
You received this message because you are subscribed to the Google Groups "debezium" group.
To unsubscribe from this group and stop receiving emails from it, send an email to debezium+u...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/debezium/c431867b-8016-4e8d-ae68-0515a2715335n%40googlegroups.com.

Jeet Thakkar

unread,
Sep 12, 2023, 7:52:25 PM9/12/23
to debezium
Hey Chris, Thanks for the reply. I was unable to test this out until now, and I still have the problem. I have verified that configuration did go into the connector but it is still happening that the connector fails because of that error; As a temporary fall back I have a python script which fetches the status of the connector and restart if any of the workers have failed. My kafka connect worker version is 3.3.2 and please find the connector config shown below:


"config": {
"connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
"table.name.format": "<REDACTED>",
"primary.key.mode": "record_key",
"connection.password": "<REDACTED>",
"database.time_zone": "UTC",
"tasks.max": "4",
"topics": "<REDACTED>",
"connection.username": "<REDACTED>",
"delete.enabled": "true",
"schema.evolution": "none",
"connection.url": "jdbc:mysql://<HOSTIP>:3306/<DBNAME>",
"insert.mode": "upsert",
"hibernate.c3p0.testConnectionOnCheckOut": "true",
"name": "dmz-debezium_jdbc_msistonedb.ttrafficproductsearchkey_logs"
}


Reply all
Reply to author
Forward
0 new messages