I am trying to use custom query using Confluent Kafka JDBC source connector. When I run it I see the below exception related to
Properties file for jdbc source connector:name=test-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://
10.2.152.45:3306/test_kafka?user=xxx&password=xxxquery=select id,name,modified_date from user_detail2;
#mode=timestamp
mode=incrementing
incrementing.column.name=id
#
timestamp.column.name=modified_date
topic.prefix=test11_mysql_jdbc_avro_joined_data
ERROR: [2017-07-11 01:56:57,543] ERROR Failed to run query for table TimestampIncrementingTableQuerier{name='null', query='select id,name,modified_date from user_detail2;', topicPrefix='test11_mysql_jdbc_avro_joined_data', timestampColumn='null', incrementingColumn='id'}: {} (io.confluent.connect.jdbc.source.JdbcSourceTask:221)
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'WHERE `id` > -1 ORDER BY `id` ASC' at line 1
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
at com.mysql.jdbc.Util.getInstance(Util.java:408)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:943)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3973)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3909)
at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2527)
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2680)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2490)
at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1858)
at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:1966)
at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.executeQuery(TimestampIncrementingTableQuerier.java:176)
at io.confluent.connect.jdbc.source.TableQuerier.maybeStartQuery(TableQuerier.java:84)
at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:55)
at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:200)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:162)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
If timestamp is used, getting below error
ERROR:[2017-07-11 01:04:33,710] ERROR Failed to run query for table TimestampIncrementingTableQuerier{name='null', query='select
user_detail2.id, user_detail2.modified_date,
user_detail2.name from user_detail2;', topicPrefix='test11_mysql_jdbc_avro_joined_data', timestampColumn='modified_date', incrementingColumn='null'}: {} (io.confluent.connect.jdbc.source.JdbcSourceTask:221)
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'WHERE `modified_date` > '1969-12-31 16:00:00' AND `modified_date` < '2017-07-11 ' at line 1
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
at com.mysql.jdbc.Util.getInstance(Util.java:408)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:943)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3973)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3909)
at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2527)
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2680)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2490)
at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1858)
at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:1966)
at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.executeQuery(TimestampIncrementingTableQuerier.java:176)
at io.confluent.connect.jdbc.source.TableQuerier.maybeStartQuery(TableQuerier.java:84)
at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:55)
at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:200)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:162)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)