Hi all,
We're having an issue sinking a timestamp column whose value is 'infinity'. The JDBC sink connector task fails with:
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:632)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.kafka.connect.errors.ConnectException: JDBC sink connector failure
at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:96)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
... 11 more
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to process a sink record
at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBuffer(JdbcChangeEventSink.java:210)
at io.debezium.connector.jdbc.JdbcChangeEventSink.lambda$flushBuffers$2(JdbcChangeEventSink.java:188)
at java.base/java.util.HashMap.forEach(HashMap.java:1421)
at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBuffers(JdbcChangeEventSink.java:188)
at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:149)
at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:103)
... 12 more
Caused by: java.time.format.DateTimeParseException: Text 'infinity' could not be parsed at index 0
at java.base/java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:2052)
at java.base/java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1954)
at java.base/java.time.ZonedDateTime.parse(ZonedDateTime.java:600)
at io.debezium.connector.jdbc.type.debezium.ZonedTimestampType.bind(ZonedTimestampType.java:48)
at io.debezium.connector.jdbc.SinkRecordDescriptor$FieldDescriptor.bind(SinkRecordDescriptor.java:247)
at io.debezium.connector.jdbc.dialect.GeneralDatabaseDialect.bindValue(GeneralDatabaseDialect.java:417)
at io.debezium.connector.jdbc.RecordWriter.bindFieldValuesToQuery(RecordWriter.java:156)
at io.debezium.connector.jdbc.RecordWriter.bindNonKeyValuesToQuery(RecordWriter.java:141)
at io.debezium.connector.jdbc.RecordWriter.bindValues(RecordWriter.java:115)
at io.debezium.connector.jdbc.RecordWriter.lambda$processBatch$0(RecordWriter.java:75)
at org.hibernate.jdbc.WorkExecutor.executeWork(WorkExecutor.java:37)
at org.hibernate.internal.AbstractSharedSessionContract.lambda$doWork$4(AbstractSharedSessionContract.java:966)
at org.hibernate.engine.jdbc.internal.JdbcCoordinatorImpl.coordinateWork(JdbcCoordinatorImpl.java:303)
at org.hibernate.internal.AbstractSharedSessionContract.doWork(AbstractSharedSessionContract.java:977)
at org.hibernate.internal.AbstractSharedSessionContract.doWork(AbstractSharedSessionContract.java:965)
at io.debezium.connector.jdbc.RecordWriter.write(RecordWriter.java:51)
at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBuffer(JdbcChangeEventSink.java:203)
... 17 more
The relevant parts of the message look like this:
{
"before":NULL
"after":{
...
"starttime":"2024-04-11T10:34:00.000000Z"
"endtime":"infinity"
...
}
"source":{
"version":"2.2.1.Final"
"connector":"postgresql"
...
}
"op":"c"
"ts_ms":1717411223710
"transaction":NULL
}
Postgres version: 16
Debezium version: 2.2.1.Final
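
For what it's worth, the innermost exception can be reproduced outside the connector. The sketch below is only an illustration: the class and the helper `parseOrNull` are hypothetical names, and it assumes the default ISO formatter of `ZonedDateTime.parse`, which may not be the exact formatter the sink uses in `ZonedTimestampType.bind`. It shows that the `starttime` value parses, while the `endtime` value 'infinity' is rejected with a `DateTimeParseException`:

```java
import java.time.ZonedDateTime;
import java.time.format.DateTimeParseException;

// Hypothetical stand-alone reproduction; not part of Debezium.
class InfinityParseDemo {

    // Returns the parsed timestamp, or null when the text is not an
    // ISO-8601 zoned timestamp (for example the PostgreSQL special value "infinity").
    static ZonedDateTime parseOrNull(String text) {
        try {
            return ZonedDateTime.parse(text);
        } catch (DateTimeParseException e) {
            // Same exception type as the root cause in the stack trace above.
            return null;
        }
    }

    public static void main(String[] args) {
        // Values copied from the "after" part of the message.
        System.out.println(parseOrNull("2024-04-11T10:34:00.000000Z"));
        System.out.println(parseOrNull("infinity"));
    }
}
```

So the failure seems to come from the string 'infinity' reaching the timestamp parser unchanged, not from the regular timestamp values in the same record.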