Issue with custom query using jdbc source connector

1,131 views
Skip to first unread message

Pavan Gavvala

unread,
Jul 11, 2017, 4:50:00 AM7/11/17
to Confluent Platform

I am trying to use custom query using Confluent Kafka JDBC source connector. When I run it I see the below exception related to

Properties file for jdbc source connector:

name=test-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://10.2.152.45:3306/test_kafka?user=xxx&password=xxx
query=select id,name,modified_date from user_detail2;
#mode=timestamp
mode=incrementing
incrementing.column.name=id
#timestamp.column.name=modified_date
topic.prefix=test11_mysql_jdbc_avro_joined_data

ERROR:

    [2017-07-11 01:56:57,543] ERROR Failed to run query for table TimestampIncrementingTableQuerier{name='null', query='select id,name,modified_date from user_detail2;', topicPrefix='test11_mysql_jdbc_avro_joined_data', timestampColumn='null', incrementingColumn='id'}: {} (io.confluent.connect.jdbc.source.JdbcSourceTask:221)
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'WHERE `id` > -1 ORDER BY `id` ASC' at line 1
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
        at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
        at com.mysql.jdbc.Util.getInstance(Util.java:408)
        at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:943)
        at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3973)
        at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3909)
        at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2527)
        at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2680)
        at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2490)
        at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1858)
        at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:1966)
        at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.executeQuery(TimestampIncrementingTableQuerier.java:176)
        at io.confluent.connect.jdbc.source.TableQuerier.maybeStartQuery(TableQuerier.java:84)
        at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:55)
        at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:200)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:162)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

If timestamp is used, getting below error

ERROR:

[2017-07-11 01:04:33,710] ERROR Failed to run query for table TimestampIncrementingTableQuerier{name='null', query='select user_detail2.id, user_detail2.modified_date, user_detail2.name from user_detail2;', topicPrefix='test11_mysql_jdbc_avro_joined_data', timestampColumn='modified_date', incrementingColumn='null'}: {} (io.confluent.connect.jdbc.source.JdbcSourceTask:221)
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'WHERE `modified_date` > '1969-12-31 16:00:00' AND `modified_date` < '2017-07-11 ' at line 1
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
        at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
        at com.mysql.jdbc.Util.getInstance(Util.java:408)
        at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:943)
        at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3973)
        at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3909)
        at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2527)
        at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2680)
        at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2490)
        at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1858)
        at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:1966)
        at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.executeQuery(TimestampIncrementingTableQuerier.java:176)
        at io.confluent.connect.jdbc.source.TableQuerier.maybeStartQuery(TableQuerier.java:84)
        at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:55)
        at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:200)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:162)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

Pavan Gavvala

unread,
Jul 12, 2017, 11:15:37 AM7/12/17
to Confluent Platform
Hi,

Does anybody have idea on how to resolve this ? 

Ewen Cheslack-Postava

unread,
Jul 20, 2017, 7:13:14 PM7/20/17
to Confluent Platform
As the docs note under the query section here http://docs.confluent.io/current/connect/connect-jdbc/docs/source_connector.html#incremental-query-modes, you can only use a custom query with an incremental mode if it is structured such that the necessary WHERE clause can simply be appended to the query you give. In your case, this might work by just removing the trailing semicolon in your query.

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/2070b016-d60b-43f1-a459-871c52d7fbaa%40googlegroups.com.

For more options, visit https://groups.google.com/d/optout.

Reply all
Reply to author
Forward
0 new messages