Kafka Connect JDBC not finding Oracle driver


gvdm

Sep 7, 2017, 7:13:22 AM
to Confluent Platform
Hello all,

I would like to use the built-in JDBC sink connector. Here is my configuration.

kafka-connect-jdbc.properties

name=kafka-connect-jdbc
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=TRACE_STATISTICS
connection.password = db_password
connection.user = db_username
connection.url = jdbc:oracle:thin:@//my_db_server:1521/service_name
auto.create=true
auto.evolve=true


kafka-connect-jdbc-worker.properties

bootstrap.servers=kafka_server:9092
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema_registry_server:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema_registry_server:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
port=10001
rest.port=10001
access.control.allow.origin=*
access.control.allow.methods=GET,OPTIONS,HEAD,POST,PUT,DELETE
consumer.session.timeout.ms=300000
consumer.request.timeout.ms=310000
heartbeat.interval.ms=60000
session.timeout.ms=200000
flush.timeout.ms=500000
plugin.path=/root/connect_plugins/ # Here there is the ojdbc7.jar archive

Then I run the connector with the command

/usr/bin/connect-standalone /root/kafka-conf/kafka-connect-jdbc-worker.properties /root/kafka-conf/kafka-connect-jdbc.properties


But after the connector class is initialized, it cannot open the JDBC connection to Oracle, even though I set plugin.path in the worker file to the directory containing the Oracle JDBC driver.

This is the log of the error

/usr/bin/connect-standalone /root/kafka-conf/kafka-connect-jdbc-worker.properties /root/kafka-conf/kafka-connect-jdbc.properties
[2017-09-07 12:52:42,593] INFO Loading plugin from: /root/connect_plugins/ojdbc7.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:176)
[2017-09-07 12:52:42,867] INFO Registered loader: PluginClassLoader{pluginLocation=file:/root/connect_plugins/ojdbc7.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:199)
[...]
[2017-09-07 12:52:47,771] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:55)
[2017-09-07 12:52:47,778] INFO ConnectorConfig values:
        connector.class = io.confluent.connect.jdbc.JdbcSinkConnector
        key.converter = null
        name = kafka-connect-jdbc
        tasks.max = 1
        transforms = null
        value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig:223)
[2017-09-07 12:52:47,778] INFO EnrichedConnectorConfig values:
        connector.class = io.confluent.connect.jdbc.JdbcSinkConnector
        key.converter = null
        name = kafka-connect-jdbc
        tasks.max = 1
        transforms = null
        value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:223)
[2017-09-07 12:52:47,779] INFO Creating connector kafka-connect-jdbc of type io.confluent.connect.jdbc.JdbcSinkConnector (org.apache.kafka.connect.runtime.Worker:204)
[2017-09-07 12:52:47,780] INFO Instantiated connector kafka-connect-jdbc with version 3.3.0 of type class io.confluent.connect.jdbc.JdbcSinkConnector (org.apache.kafka.connect.runtime.Worker:207)
[2017-09-07 12:52:47,781] INFO Finished creating connector kafka-connect-jdbc (org.apache.kafka.connect.runtime.Worker:225)
[2017-09-07 12:52:47,781] INFO SinkConnectorConfig values:
        connector.class = io.confluent.connect.jdbc.JdbcSinkConnector
        key.converter = null
        name = kafka-connect-jdbc
        tasks.max = 1
        topics = [TRACE_STATISTICS]
        transforms = null
        value.converter = null
 (org.apache.kafka.connect.runtime.SinkConnectorConfig:223)
[2017-09-07 12:52:47,781] INFO EnrichedConnectorConfig values:
        connector.class = io.confluent.connect.jdbc.JdbcSinkConnector
        key.converter = null
        name = kafka-connect-jdbc
        tasks.max = 1
        topics = [TRACE_STATISTICS]
        transforms = null
        value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:223)
[...]
[2017-09-07 12:52:47,825] INFO Kafka version : 0.11.0.0-cp1 (org.apache.kafka.common.utils.AppInfoParser:83)
[2017-09-07 12:52:47,825] INFO Kafka commitId : bba25847402e83a0 (org.apache.kafka.common.utils.AppInfoParser:84)
[2017-09-07 12:52:47,827] INFO Created connector kafka-connect-jdbc (org.apache.kafka.connect.cli.ConnectStandalone:91)
[2017-09-07 12:52:47,828] INFO Starting task (io.confluent.connect.jdbc.sink.JdbcSinkTask:43)
[2017-09-07 12:52:47,828] INFO JdbcSinkConfig values:
        auto.create = true
        auto.evolve = true
        batch.size = 3000
        connection.password = [hidden]
        connection.url = jdbc:oracle:thin:@//my_db_server:1521/service_name
        connection.user = db_username
        fields.whitelist = []
        insert.mode = insert
        max.retries = 10
        pk.fields = []
        pk.mode = none
        retry.backoff.ms = 3000
        table.name.format = ${topic}
 (io.confluent.connect.jdbc.sink.JdbcSinkConfig:223)
[2017-09-07 12:52:47,832] INFO Initializing writer using SQL dialect: OracleDialect (io.confluent.connect.jdbc.sink.JdbcSinkTask:52)
[2017-09-07 12:52:47,833] INFO Sink task WorkerSinkTask{id=kafka-connect-jdbc-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:233)
[2017-09-07 12:52:47,890] INFO Discovered coordinator my_server:9092 (id: 2147483647 rack: null) for group connect-kafka-connect-jdbc. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:597)
[2017-09-07 12:52:47,891] INFO Revoking previously assigned partitions [] for group connect-kafka-connect-jdbc (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:419)
[2017-09-07 12:52:47,891] INFO (Re-)joining group connect-kafka-connect-jdbc (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:432)
[2017-09-07 12:52:47,898] INFO Successfully joined group connect-kafka-connect-jdbc with generation 5 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:399)
[2017-09-07 12:52:47,900] INFO Setting newly assigned partitions [TRACE_STATISTICS-0] for group connect-kafka-connect-jdbc (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:262)
[2017-09-07 12:52:48,115] ERROR Task kafka-connect-jdbc-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:455)
org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: No suitable driver found for jdbc:oracle:thin:@//my_db_server:1521/service_name
        at io.confluent.connect.jdbc.util.CachedConnectionProvider.getValidConnection(CachedConnectionProvider.java:59)
        at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:52)
        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:66)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:435)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:251)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLException: No suitable driver found for jdbc:oracle:thin:@//my_db_server:1521/service_name
        at java.sql.DriverManager.getConnection(DriverManager.java:689)
        at java.sql.DriverManager.getConnection(DriverManager.java:247)
        at io.confluent.connect.jdbc.util.CachedConnectionProvider.newConnection(CachedConnectionProvider.java:66)
        at io.confluent.connect.jdbc.util.CachedConnectionProvider.getValidConnection(CachedConnectionProvider.java:52)
        ... 13 more


What could be the problem? What am I doing wrong?

Thank you
Giulio

Robin Moffatt

Sep 7, 2017, 7:38:26 AM
to confluent...@googlegroups.com
Per the docs (http://docs.confluent.io/current/connect/connect-jdbc/docs/source_connector.html#jdbc-drivers), I would suggest putting the JDBC driver in your CLASSPATH. From the looks of it the new plugin.path won't suffice.




gvdm

Sep 7, 2017, 7:43:01 AM
to Confluent Platform
Hi Robin, thanks for the response.
But why does the plugin.path property not work? It was introduced to solve exactly these plugin path problems, wasn't it?

gvdm

Sep 7, 2017, 7:45:47 AM
to Confluent Platform
Anyway, I tried this command

CLASSPATH=/root/connect_plugins/ /usr/bin/connect-standalone /root/kafka-conf/kafka-connect-jdbc-worker.properties /root/kafka-conf/kafka-connect-jdbc.properties

 
and commented out the plugin.path property, but nothing changed. In the Connect logs there is now no mention of the Oracle JDBC driver.
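
One detail worth checking with that command: a bare directory entry on the Java classpath only exposes loose .class files, not the jar archives inside it, so CLASSPATH=/root/connect_plugins/ never loads ojdbc7.jar. A sketch, using the paths from this thread, of naming the jar itself:

```shell
# Point CLASSPATH at the jar file, not at its parent directory;
# a directory entry would only pick up loose .class files.
export CLASSPATH=/root/connect_plugins/ojdbc7.jar
echo "$CLASSPATH"
```

With CLASSPATH exported this way, the same connect-standalone invocation can be rerun unchanged.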

ro...@confluent.io

Sep 7, 2017, 7:52:14 AM
to Confluent Platform
I usually copy it to share/java/kafka-connect-jdbc/ and it works just fine. 

In the Connect logs there is now no mention of the Oracle JDBC driver.

If it's still not working, but the driver's not mentioned any more, do you have a new error? Can you share the log?


thanks, Robin. 

gvdm

Sep 7, 2017, 8:04:00 AM
to Confluent Platform
The error was the same as posted above.
Now I've tried copying the Oracle driver to /etc/share/java/ and I'm no longer getting the "driver error" :)

One more question. In my case the data is built as an object with sub-objects. Does the JDBC connector create sub-tables, flatten the structure, or simply not support object hierarchies?
And what can I do in the last case?

Robin Moffatt

Sep 7, 2017, 8:10:31 AM
to confluent...@googlegroups.com
Do you mean you have a set of normalised tables in Oracle, and you want to denormalise them for consumption in Kafka? Or similarly, parent-child/recursive relationships that you want to resolve out? 
If not, can you explain more about what you mean by objects/sub-objects?

thanks, Robin. 


gvdm

Sep 7, 2017, 8:52:38 AM
to Confluent Platform
I'm using the Kafka JDBC sink connector and I'm trying to write some structured data from Kafka to an Oracle DB.
The data in the Kafka topic has a structure like this:

{
  "subObj1": {
    "subField11": "value11",
    "subField12": 0
  },
  "subObj2": {
    "subField21": "value21",
    "subField22": true
  }
}

If I try to write the data using the JDBC sink connector (to the Oracle DB) I get this error in the console:

[2017-09-07 14:09:14,523] ERROR Task kafka-connect-jdbc-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:455)
org.apache.kafka.connect.errors.ConnectException: MyStructure.subObj2 (STRUCT) type doesn't have a mapping to the SQL database column type
        at io.confluent.connect.jdbc.sink.dialect.DbDialect.getSqlType(DbDialect.java:202)
        at io.confluent.connect.jdbc.sink.dialect.OracleDialect.getSqlType(OracleDialect.java:73)
        at io.confluent.connect.jdbc.sink.dialect.DbDialect.writeColumnSpec(DbDialect.java:140)
        at io.confluent.connect.jdbc.sink.dialect.DbDialect$3.apply(DbDialect.java:132)
        at io.confluent.connect.jdbc.sink.dialect.DbDialect$3.apply(DbDialect.java:128)
        at io.confluent.connect.jdbc.sink.dialect.StringBuilderUtil.joinToBuilder(StringBuilderUtil.java:54)
        at io.confluent.connect.jdbc.sink.dialect.StringBuilderUtil.joinToBuilder(StringBuilderUtil.java:37)
        at io.confluent.connect.jdbc.sink.dialect.DbDialect.writeColumnsSpec(DbDialect.java:128)
        at io.confluent.connect.jdbc.sink.dialect.DbDialect.getCreateQuery(DbDialect.java:96)
        at io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:87)
        at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:62)
        at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:66)
        at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:62)
        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:66)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:435)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:251)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

So it seems to me that the JDBC sink connector cannot handle hierarchies of objects when creating the DB structure, right?
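
A sketch of one possible workaround, assuming a Connect version that ships the Flatten single message transform (org.apache.kafka.connect.transforms.Flatten — not necessarily available in the 0.11.0.0 release shown in the logs above): flatten the nested structs into delimiter-joined column names in the sink's properties file, before the dialect maps fields to columns. The transform alias "flattenValue" is illustrative:

```
transforms=flattenValue
transforms.flattenValue.type=org.apache.kafka.connect.transforms.Flatten$Value
transforms.flattenValue.delimiter=_
```

With that in place, a field like subObj1.subField11 from the structure above would reach the sink as a flat column named subObj1_subField11, which the Oracle dialect can map to an ordinary column type.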

Thank you

Giulio

