Kafka-connect-jdbc oddly fails to parse certain SQL records and errors afterwards


Basti

Jul 3, 2017, 7:31:28 AM
to Confluent Platform
Hey,


I am trying to write a certain table into Kafka using the kafka-connect-jdbc connector in timestamp+incrementing mode.
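For reference, the connector config looks roughly like this (a sketch with anonymized connection, table and topic names; the column names are placeholders, not my real schema):

name=foo12-jdbc-updatedat
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:mysql://mysql-host:3306/mydb
table.whitelist=my_table
mode=timestamp+incrementing
timestamp.column.name=updated_at
incrementing.column.name=id
topic.prefix=mysql-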
After writing a few SQL records to the topic successfully, it errors out with the following logs (anonymized):



[2017-07-03 09:53:56,978] WARN Ignoring record due to SQL error: (io.confluent.connect.jdbc.source.JdbcSourceTask)
java.sql.SQLException: Value ' 632 some_text 2016-11-30 00:00:00 2017-11-30 00:00:00 2017-05-23 09:48:57 2016-11-30 03:57:52 cfa0c259893b4725b33bed6d5cc569cb 0.428 USD 0 0 1 25 0.092189128018704 0.404 5122817 1� 0000-00-00 00:00:00 0 1 1��long_link 0� 50 0 50 0 0 AT�� 0 0 0 50 0 0' can not be represented as java.sql.Timestamp
 at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:963)
 at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:896)
 at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:885)
 at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:860)
 at com.mysql.jdbc.ResultSetRow.getTimestampFast(ResultSetRow.java:937)
 at com.mysql.jdbc.BufferRow.getTimestampFast(BufferRow.java:559)
 at com.mysql.jdbc.ResultSetImpl.getTimestampInternal(ResultSetImpl.java:5946)
 at com.mysql.jdbc.ResultSetImpl.getTimestamp(ResultSetImpl.java:5636)
 at io.confluent.connect.jdbc.source.DataConverter.convertFieldValue(DataConverter.java:452)
 at io.confluent.connect.jdbc.source.DataConverter.convertRecord(DataConverter.java:64)
 at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.extractRecord(TimestampIncrementingTableQuerier.java:181)
 at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:205)
 at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:162)
 at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
 at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
[2017-07-03 09:53:56,989] INFO Finished WorkerSourceTask{id=foo12-jdbc-updatedat-0} commitOffsets successfully in 6 ms (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2017-07-03 09:53:56,989] ERROR Task foo12-jdbc-updatedat-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.DataException: Found null value for non-optional schema
 at io.confluent.connect.avro.AvroData.validateSchemaValue(AvroData.java:911)
 at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:325)
 at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:487)
 at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:295)
 at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:73)

The long string after 'Value' - which it is trying to parse into a Timestamp field - is the ENTIRE record (632 is the auto-incrementing integer id field, 'some_text' is a string field, and so on).
I then deleted the misbehaving record in SQL and started the connector again. It writes more records to Kafka successfully, but then errors on another record further along.

So what I think is happening is that, despite recognizing the SQL error, the connector still tries to write the record to Kafka, which then fails because the record is malformed, and the task stops. Can that be correct?
Regarding the initial parsing problem, I checked whether the misbehaving records differ in any way from the records that get written into Kafka successfully, but could not find any difference.

I'll happily provide the SQL and/or Avro schema if needed, and/or file an issue in the kafka-connect-jdbc GitHub repository if you want me to.

Best,
Basti

Basti

Jul 3, 2017, 10:14:29 AM
to Confluent Platform
So I figured out the underlying problem.

If you add a new column to a MySQL table, MySQL obviously has to give every existing record a value for that column. If the new column is defined as
datetime NOT NULL
and no default is specified, MySQL fills the existing rows with its implicit default for datetime, which the connector ends up reading as NULL (that is the '0000-00-00 00:00:00' you can see in the record in the log above). So you end up with a schema that says NOT NULL while the table still contains records with a NULL value in that column. Only if MySQL runs in strict mode will it prevent this and throw an error when the column is created. If it doesn't, Kafka Connect initially loads the schema, which says NOT NULL, and then aborts when it encounters a record with a NULL value in that column, since that violates the schema.
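A minimal sketch of how to reproduce and clean this up (table and column names are made up, not my actual schema):

-- with a non-strict sql_mode (check with: SELECT @@sql_mode;)
CREATE TABLE demo (id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(32));
INSERT INTO demo (name) VALUES ('some_text');

-- no DEFAULT given, so existing rows silently get the zero date '0000-00-00 00:00:00'
ALTER TABLE demo ADD COLUMN created_at DATETIME NOT NULL;

-- find the affected rows ...
SELECT id FROM demo WHERE created_at = '0000-00-00 00:00:00';

-- ... and backfill them with a real value so the connector can read the table again
UPDATE demo SET created_at = NOW() WHERE created_at = '0000-00-00 00:00:00';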