Confluent Kafka JDBC Sink Connector - How to define insert query


Sassh

unread,
Dec 23, 2018, 6:51:25 AM12/23/18
to Confluent Platform
I need to configure a JDBC Sink Connector to read data from a topic and write it into a SQL Server database. I have set auto.create = false as the table is already defined in SQL.

Connector:

{
  "name": "test-sink",
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "tasks.max": "1",
  "topics": "orders",
  "connection.url": "jdbc:sqlserver://10.12.13.14:1443;databaseName=mydatabase",
  "connection.user": "myusername",
  "connection.password": "mypwd",
  "auto.create": "false",
  "insert.mode": "insert"
}

I need to understand how I would pass the insert query in this, i.e. how the connector would push the data into the columns of the SQL table. The JSON data in my topic looks like:
{
  "OrderId": "1",
  "OrderNo": "ef209",
  "OrderDetails": {
    "OrderQuantity": "2",
    "OrderRecipient": "test",
    "OrderDelDate": "22-10-2019"
  }
}
Please can you suggest how I would pass something like: Insert into mytable Select OrderId as orderid, OrderNo as orderno, OrderDetails.OrderQuantity as orderquantity, OrderDetails.OrderRecipient as orderrecipient from orders;

Robin Moffatt

unread,
Jan 3, 2019, 7:21:41 AM1/3/19
to confluent...@googlegroups.com
AFAIK the JDBC Sink connector doesn't support nested structures in your data. 

What I would do here is use KSQL to flatten your message structure to a new topic, and then stream that topic to the DB using the JDBC Sink connector. 

You can see an example of working with nested structures (STRUCT) in KSQL here https://www.confluent.io/stream-processing-cookbook/ksql-recipes/nested-json-data and here https://www.confluent.io/blog/data-wrangling-apache-kafka-ksql
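As a rough sketch of what that flattening could look like (assuming KSQL 5.x syntax; the stream names ORDERS_SRC and ORDERS_FLAT are illustrative, and the field names are taken from the example message above):

```sql
-- Declare a stream over the existing topic, modelling the nested
-- object as a STRUCT:
CREATE STREAM ORDERS_SRC (ORDERID VARCHAR,
                          ORDERNO VARCHAR,
                          ORDERDETAILS STRUCT<ORDERQUANTITY VARCHAR,
                                              ORDERRECIPIENT VARCHAR,
                                              ORDERDELDATE VARCHAR>)
  WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='JSON');

-- Write a flattened copy to a new topic, using the -> operator
-- to pull individual fields out of the STRUCT:
CREATE STREAM ORDERS_FLAT AS
  SELECT ORDERID,
         ORDERNO,
         ORDERDETAILS->ORDERQUANTITY  AS ORDERQUANTITY,
         ORDERDETAILS->ORDERRECIPIENT AS ORDERRECIPIENT
    FROM ORDERS_SRC;
```

The JDBC Sink connector's "topics" setting would then point at the ORDERS_FLAT topic instead of orders.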


-- 

Robin Moffatt | Developer Advocate | ro...@confluent.io | @rmoff



--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.
To post to this group, send email to confluent...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/b2eeeca3-457c-4256-b35c-7821c108317f%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Sassh

unread,
Jan 8, 2019, 7:34:00 AM1/8/19
to Confluent Platform
Thanks for the response Robin.

I understand that the JDBC Sink Connector doesn't support nested structures. I'll convert the messages into a flat, single-level JSON structure.

Once I have that structure, how would I map my data to SQL, given that no schema comes in my messages and they contain only payload information? Also, we are not using any Avro converters.

Please suggest.

Robin Moffatt

unread,
Jan 8, 2019, 11:21:15 AM1/8/19
to confluent...@googlegroups.com
You could use KSQL to apply the schema, as well as perform any filtering as required https://www.confluent.io/stream-processing-cookbook/ksql-recipes/changing-data-serialization-format-json-avro
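A minimal sketch of that re-serialization step, assuming KSQL 5.x with Schema Registry available (the stream names ORDERS_JSON and ORDERS_AVRO and the topic name orders_flat are illustrative):

```sql
-- Apply an explicit schema to the flat JSON topic:
CREATE STREAM ORDERS_JSON (ORDERID VARCHAR,
                           ORDERNO VARCHAR,
                           ORDERQUANTITY VARCHAR,
                           ORDERRECIPIENT VARCHAR)
  WITH (KAFKA_TOPIC='orders_flat', VALUE_FORMAT='JSON');

-- Re-serialize as Avro; the schema is registered in Schema Registry,
-- so a sink configured with the AvroConverter can map columns directly:
CREATE STREAM ORDERS_AVRO WITH (VALUE_FORMAT='AVRO') AS
  SELECT * FROM ORDERS_JSON;
```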


-- 

Robin Moffatt | Developer Advocate | ro...@confluent.io | @rmoff




Sassh

unread,
Jan 8, 2019, 2:29:01 PM1/8/19
to Confluent Platform
No, we are not using Avro schemas or converters. Is that possible via an SMT or a Kafka Streams job using the JSON format only? i.e. to append the schema via an SMT or Kafka Streams before pushing to SQL Server?
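For context on what "appending the schema" would mean without Avro: the Connect JsonConverter (with value.converter.schemas.enable=true) expects each message to carry a schema/payload envelope, so something upstream would have to rewrite the messages into this shape. A sketch, using two of the fields from the example message (field types and the record name "order" are assumptions):

```json
{
  "schema": {
    "type": "struct",
    "name": "order",
    "optional": false,
    "fields": [
      { "field": "OrderId", "type": "string", "optional": false },
      { "field": "OrderNo", "type": "string", "optional": false }
    ]
  },
  "payload": {
    "OrderId": "1",
    "OrderNo": "ef209"
  }
}
```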