MS SQL Server sink connector

410 views
Skip to first unread message

ronnie peterson

unread,
Mar 13, 2020, 3:33:21 PM3/13/20
to Confluent Platform

HI,

I need to create a sink connetor that send the messages from Kafka to a MS SQL Server, I don't find a sink connector on confluent hub, just a source connector.
I'm trying to make a connection with a JDBC Sink Connector, but it's reporting a error.

If someone have any solution I appreciate it

curl -i -X POST -H "Content-Type:application/json" localhost:8083/connectors/ -d '
{
  "name": "mssqlsinkconnector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:sqlserver://10.121.1.85:1433;database=poc_cdc_redis",
    "connection.user": "kafka",
    "connection.password": "bancointer",
    "mode": "incrementing",
    "topics": "server1.dbo.customers",
    "name": "mssqlsinkconnector",
    "table.whitelist": "dbo.costumers_2"
  }
}
'

[root@ip-10-121-18-95 bin]# curl localhost:8083/connectors/mssqlsinkconnector/tasks | jq

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current

                                 Dload  Upload   Total   Spent    Left  Speed

100   690  100   690    0     0   224k      0 --:--:-- --:--:-- --:--:--  224k

[

  {

    "id": {

      "connector": "mssqlsinkconnector",

      "task": 0

    },

    "config": {

      "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",

      "dialect.name": "SqlServerDatabaseDialect",

      "table.name.format": "\"kafka_${server1.dbo.costumers}\"",

      "connection.password": "bancointer",

      "tasks.max": "1",

      "topics": "server1.dbo.customers",

      "fields.whitelist": "first_name, last_name, email",

      "auto.evolve": "true",

      "task.class": "io.confluent.connect.jdbc.sink.JdbcSinkTask",

      "connection.user": "kafka",

      "name": "mssqlsinkconnector",

      "auto.create": "true",

      "connection.url": "jdbc:sqlserver://10.121.1.85:1433;database=poc_cdc_redis;",

      "insert.mode": "upsert",

      "pk.mode": "record_value",

      "pk.fields": "id",

      "quote.sql.identifiers": "always"

    }

  }

]




[root@ip-10-121-18-95 bin]# curl localhost:8083/connectors/mssqlsinkconnector/status | jq

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current

                                 Dload  Upload   Total   Spent    Left  Speed

100  2303  100  2303    0     0   749k      0 --:--:-- --:--:-- --:--:--  749k

{

  "name": "mssqlsinkconnector",

  "connector": {

    "state": "RUNNING",

    "worker_id": "10.121.18.95:8083"

  },

  "tasks": [

    {

      "id": 0,

      "state": "FAILED",

      "worker_id": "10.121.18.95:8083",

      "trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:561)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: No suitable driver found for jdbc:sqlserver://10.121.1.85:1433;database=poc_cdc_redis;\n\tat io.confluent.connect.jdbc.util.CachedConnectionProvider.getConnection(CachedConnectionProvider.java:69)\n\tat io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:56)\n\tat io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)\n\t... 10 more\nCaused by: java.sql.SQLException: No suitable driver found for jdbc:sqlserver://10.121.1.85:1433;database=poc_cdc_redis;\n\tat java.sql.DriverManager.getConnection(DriverManager.java:689)\n\tat java.sql.DriverManager.getConnection(DriverManager.java:208)\n\tat io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.getConnection(GenericDatabaseDialect.java:217)\n\tat io.confluent.connect.jdbc.util.CachedConnectionProvider.newConnection(CachedConnectionProvider.java:88)\n\tat io.confluent.connect.jdbc.util.CachedConnectionProvider.getConnection(CachedConnectionProvider.java:62)\n\t... 13 more\n"

    }

  ],

  "type": "sink"

}




daniel queiroz

unread,
Mar 13, 2020, 4:14:49 PM3/13/20
to confluent...@googlegroups.com
Hello,

First of all, you need to create a directory (connectors) and put there a JDBC driver like the image bellow:
Screen Shot 2020-03-13 at 17.08.06.png

Then, edit the file connect-distributed.properties and insert a new line with the path created above, like this example:

plugin.path=/opt/kafka/connectors/

Screen Shot 2020-03-13 at 17.11.57.png

After that, restart the service.

Grato,

Daniel Queiroz
81 996289671


--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/5b81867d-fc89-4f14-bc89-86f7f31d72c8%40googlegroups.com.

ronnie peterson

unread,
Mar 16, 2020, 8:03:42 AM3/16/20
to Confluent Platform

Hi Daniel,

Thanks for your help.

I had already done this procedure, but the error is still occurring, attached the evidence
connector.png
plugin.png

GIRISH KUMAR PINUMALLA

unread,
Mar 16, 2020, 8:21:14 AM3/16/20
to confluent...@googlegroups.com
Hi Ronnie, 
Check the logs of confluent kafka . whether it is picking this sqljdbc41 or sqljdbc42 jar file when you are restarting confluent. I think it is not picking in the logs of kafka. if it picks you will be able to run the kafka connect jdbc seamlessly. 

Thanks & Regards,
Girish P


--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.

ronnie peterson

unread,
Mar 16, 2020, 10:26:16 AM3/16/20
to Confluent Platform
Hi GIRISH, thanks for you help.


These files are not actually mentioned in the kafka log, would you have any tips to configure the calling of these files when starting kafka?

To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.

GIRISH KUMAR PINUMALLA

unread,
Mar 16, 2020, 4:15:02 PM3/16/20
to confluent...@googlegroups.com
Hi Ronnie, 
Try setting the configuring the jar files in the kafka config file which will direct to all jar files. somehow your path is not getting picked up . try putting in the env variables

Thanks & Regards,
Girish P

To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/61fa04eb-feb4-4195-8147-2e2910dc2f6c%40googlegroups.com.
Reply all
Reply to author
Forward
0 new messages