kafka jdbc sqlserver connect timestamp mode

sujatha mysore

Apr 12, 2021, 4:33:41 PM
to Confluent Platform
Hi,
I am trying to use the Kafka Connect JDBC source connector with SQL Server.
I want to use mode=timestamp together with a custom query.

Case 1: with this combination:
=======================
table.whitelist=ngv_task
query=select * from (SELECT taskid, uowid, taskstatus, endtime FROM ngv_task where taskstatus in ('SKIP','SHORT','CUT')) o
mode=timestamp
validate.non.null=false
tasks.max=1
topic=sqlconnect-test
table.types=TABLE
schema.ignore=true

        query = select * from (SELECT taskid, uowid, taskstatus, endtime FROM ngv_task where taskstatus in ('SKIP','SHORT','CUT')) o
        query.suffix =
        quote.sql.identifiers = ALWAYS
        schema.pattern = null
        table.blacklist = []
        table.poll.interval.ms = 60000
        table.types = [TABLE]
        table.whitelist = [ngv_task]
        timestamp.column.name = [endtime]
        timestamp.initial = null
        topic.prefix =
        validate.non.null = false
 (io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig:354)
[2021-04-12 16:31:12,565] INFO Attempting to open connection #1 to SqlServer (io.confluent.connect.jdbc.util.CachedConnectionProvider:82)
[2021-04-12 16:31:12,986] ERROR WorkerConnector{id=source-jdbc-connector} Error while starting connector (org.apache.kafka.connect.runtime.WorkerConnector:193)
org.apache.kafka.connect.errors.ConnectException: query may not be combined with whole-table copying settings.
        at io.confluent.connect.jdbc.JdbcSourceConnector.start(JdbcSourceConnector.java:108)
        at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:185)
        at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:210)
        at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:349)
        at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:332)
        at org.apache.kafka.connect.runtime.WorkerConnector.doRun(WorkerConnector.java:140)
        at org.apache.kafka.connect.runtime.WorkerConnector.run(WorkerConnector.java:117)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
[2021-04-12 16:31:12,991] ERROR Failed to create job for /mnt/c/kafkaDocker/code/source/demo-4/JdbcWorker.properties (org.apache.kafka.connect.cli.ConnectStandalone:110)
[2021-04-12 16:31:12,992] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:121)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.errors.ConnectException: Failed to transition connector source-jdbc-connector to state STARTED
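
Note on case 1: the exception is raised because `query` is set together with `table.whitelist`. The connector treats `table.whitelist` and `table.blacklist` as whole-table copying settings and rejects them when a custom query is given. A minimal sketch of a query-mode configuration follows, assuming the connection settings (omitted here) stay unchanged. The connector name and class are taken from the log and stack trace above. Using `topic.prefix` for the topic name is based on the connector documentation, which says that in query mode the prefix is used as the full topic name; treat it as something to verify against your connector version.

```properties
# Sketch only: connection.url and credentials are omitted.
name=source-jdbc-connector
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1

mode=timestamp
timestamp.column.name=endtime
validate.non.null=false

# No table.whitelist or table.blacklist here: they cannot be combined with query.
# The filter stays inside a subquery so the connector can append its own
# WHERE clause on the timestamp column to the outer SELECT.
query=select * from (SELECT taskid, uowid, taskstatus, endtime FROM ngv_task where taskstatus in ('SKIP','SHORT','CUT')) o

# Assumption: in query mode this value is used as the full topic name.
topic.prefix=sqlconnect-test
```

The posted `topic` and `schema.ignore` keys are left out of the sketch because they do not appear in the connector's logged configuration, which suggests the JDBC source connector does not read them.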

Case 2: I tried without specifying a query, and I get this error:

        table.whitelist = [ngv_task]
        tables = ["vlp"."dbo"."ngv_task"]
        timestamp.column.name = [endtime]
        timestamp.initial = null
        topic.prefix =
        validate.non.null = false
 (io.confluent.connect.jdbc.source.JdbcSourceTaskConfig:354)
[2021-04-12 16:33:48,756] INFO Using JDBC dialect SqlServer (io.confluent.connect.jdbc.source.JdbcSourceTask:102)
[2021-04-12 16:33:48,980] INFO Started JDBC source task (io.confluent.connect.jdbc.source.JdbcSourceTask:261)
[2021-04-12 16:33:48,981] INFO WorkerSourceTask{id=source-jdbc-connector-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
[2021-04-12 16:33:48,983] INFO Attempting to open connection #1 to SqlServer (io.confluent.connect.jdbc.util.CachedConnectionProvider:82)
[2021-04-12 16:33:49,223] INFO Begin using SQL query: SELECT * FROM "vlp"."dbo"."ngv_task" WHERE "vlp"."dbo"."ngv_task"."endtime" > ? AND "vlp"."dbo"."ngv_task"."endtime" < ? ORDER BY "vlp"."dbo"."ngv_task"."endtime" ASC (io.confluent.connect.jdbc.source.TableQuerier:164)
[2021-04-12 16:33:49,394] INFO Closing resources for JDBC source task (io.confluent.connect.jdbc.source.JdbcSourceTask:324)
[2021-04-12 16:33:49,396] INFO Closing connection #1 to SqlServer (io.confluent.connect.jdbc.util.CachedConnectionProvider:108)
[2021-04-12 16:33:49,398] INFO WorkerSourceTask{id=source-jdbc-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:478)
[2021-04-12 16:33:49,399] INFO WorkerSourceTask{id=source-jdbc-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:495)
[2021-04-12 16:33:49,405] ERROR WorkerSourceTask{id=source-jdbc-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:187)
org.apache.kafka.connect.errors.DataException: endtime is not a valid field name
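
Note on case 2: the log shows the generated SQL being accepted before the `DataException` is thrown, so the failure happens when the connector looks up `endtime` in the returned record. One common cause, offered here as an assumption and not as the confirmed fix for this thread, is a case mismatch: the database may match column names case-insensitively, while the field lookup in Kafka Connect is case-sensitive. In that situation `timestamp.column.name` has to use the exact spelling from the table definition. The spelling `EndTime` below is hypothetical and only illustrates the idea.

```properties
# Hypothetical example: assumes the column is declared as EndTime in ngv_task.
# Replace EndTime with the exact spelling reported by the database.
mode=timestamp
table.whitelist=ngv_task
timestamp.column.name=EndTime
validate.non.null=false
```

The same exact spelling would also be needed for `timestamp.column.name` when a custom query is used, unless the query aliases the column to the name given in the configuration.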

Can you help me find what I am doing wrong with this configuration?
I really need both a query and timestamp mode, so that I can get only the changed data, based on the timestamp.

I appreciate your help.
Thanks,
sujatha

sujatha mysore

Apr 15, 2021, 1:04:54 PM
to Confluent Platform
Issue resolved.