Hey,
I am trying to write a certain table into Kafka with the kafka-connect-jdbc connector, using timestamp+incrementing mode.
After successfully writing a few SQL records to the topic, it fails with the following (anonymized) logs:
[2017-07-03 09:53:56,978] WARN Ignoring record due to SQL error: (io.confluent.connect.jdbc.source.JdbcSourceTask)
java.sql.SQLException: Value ' 632 some_text 2016-11-30 00:00:00 2017-11-30 00:00:00 2017-05-23 09:48:57 2016-11-30 03:57:52 cfa0c259893b4725b33bed6d5cc569cb 0.428 USD 0 0 1 25 0.092189128018704 0.404 5122817 1� 0000-00-00 00:00:00 0 1 1��long_link 0� 50 0 50 0 0 AT�� 0 0 0 50 0 0' can not be represented as java.sql.Timestamp
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:963)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:896)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:885)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:860)
at com.mysql.jdbc.ResultSetRow.getTimestampFast(ResultSetRow.java:937)
at com.mysql.jdbc.BufferRow.getTimestampFast(BufferRow.java:559)
at com.mysql.jdbc.ResultSetImpl.getTimestampInternal(ResultSetImpl.java:5946)
at com.mysql.jdbc.ResultSetImpl.getTimestamp(ResultSetImpl.java:5636)
at io.confluent.connect.jdbc.source.DataConverter.convertFieldValue(DataConverter.java:452)
at io.confluent.connect.jdbc.source.DataConverter.convertRecord(DataConverter.java:64)
at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.extractRecord(TimestampIncrementingTableQuerier.java:181)
at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:205)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:162)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2017-07-03 09:53:56,989] INFO Finished WorkerSourceTask{id=foo12-jdbc-updatedat-0} commitOffsets successfully in 6 ms (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2017-07-03 09:53:56,989] ERROR Task foo12-jdbc-updatedat-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.DataException: Found null value for non-optional schema
at io.confluent.connect.avro.AvroData.validateSchemaValue(AvroData.java:911)
at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:325)
at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:487)
at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:295)
at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:73)
The long string after 'Value' - which the connector is trying to parse into a single Timestamp field - is the ENTIRE record (632 is the auto-incrementing id int field, 'some_text' is a string field, etc.).
I then deleted the misbehaving record in SQL and restarted the connector. It writes more records to Kafka successfully, but then fails on another record further along.
So what I think is happening is that, despite recognizing the SQL error, the connector still tries to write the record to Kafka, which then fails because the record is malformed (a null value for a non-optional schema field), and the task stops. Is that correct?
Regarding the initial parsing problem, I checked whether the misbehaving records differ in some way from the records that are written to Kafka successfully, but could not find any difference.
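One thing I did notice, though: the dumped value contains MySQL's "zero date" placeholder 0000-00-00 00:00:00. As far as I understand, the MySQL JDBC driver rejects that value by default when converting to java.sql.Timestamp, since year 0 is outside the range of most date/time types. A quick sketch of the same range restriction, using Python's datetime purely for illustration:

```python
from datetime import datetime

# MySQL allows the all-zero placeholder 0000-00-00 00:00:00 in
# DATETIME columns, but year 0 is outside the valid range of most
# date/time types; Python's datetime (minimum year 1) rejects it,
# much like the MySQL JDBC driver rejects it when converting the
# column value to java.sql.Timestamp.
try:
    datetime(year=0, month=1, day=1)
except ValueError as err:
    print("rejected:", err)  # prints "rejected: year 0 is out of range"
```

If the zero date really is the culprit, adding zeroDateTimeBehavior=convertToNull to the JDBC connection URL might be a workaround (the driver would return NULL instead of throwing), though I suspect a NULL would then run into the same "null value for non-optional schema" error from the second stack trace unless the column/schema is nullable.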
I'll happily provide the SQL and/or Avro schemas if needed, and/or file an issue in the kafka-connect-jdbc GitHub repository if you want me to.
Best,
Basti