How to manually create a producer to insert to kafka with schema-payload structure?


blasteralfred

Jun 21, 2017, 8:02:47 AM
to Confluent Platform
Hi,

I used the Kafka Connect JDBC source connector to insert data into my Kafka topic as JSON, and the data in Kafka looks like this (seen via the console consumer, formatted as JSON):

{
  "schema": {
    "type": "struct",
    "fields": [{
      "type": "string",
      "optional": true,
      "field": "TXID"
    }, {
      "type": "string",
      "optional": true,
      "field": "SESSION_ID"
    }, {
      "type": "int64",
      "optional": false,
      "name": "org.apache.kafka.connect.data.Timestamp",
      "version": 1,
      "field": "TXENTRYTIME"
    }],
    "optional": false,
    "name": "TABLE_A"
  },
  "payload": {
    "TXID": "TESTDATA1",
    "SESSION_ID": "TESTDATA2",
    "TXENTRYTIME": 1490594582705
  }
}

I was able to use the Kafka Connect JDBC sink to insert the same data into another table (same schema as the source table), and it works like a charm. I also wrote my own Java Kafka producer program to insert data into Kafka as JSON, but its output looks like this (formatted JSON):

{
"TXID": "TESTDATA1",
"SESSION_ID": "TESTDATA2",
"TXENTRYTIME": 1490594582705
}

My JDBC sink connector does not work with this topic, since its messages don't have the "schema" struct and "payload" wrapper. I would like to know how to manually create an entry like the one above (as created by the JDBC source) and insert it into a Kafka topic. Some sample Java code would be really helpful for a beginner like me.

Thank you.

Ewen Cheslack-Postava

Jun 28, 2017, 2:12:37 AM
to Confluent Platform
The data from your JDBC source uses a simple envelope format that includes both the schema and the data. As you can see, the data you are producing with your own Java Kafka producer includes only the "payload" section of what the connector produces.

This is because in Connect you are using the JsonConverter with schemas.enable=true. Kafka Connect allows you to provide schemas, in which case the format of the data is explicitly defined and enforced, or to omit them for flexibility. Providing schemas is almost universally the better option because it lets downstream consumers be sure of the format of the data, but omitting schemas is supported so we can interact with as many systems as possible.

JsonConverter supports both. If you want schemas, the schema is included inline along with the data. This means we need to wrap the data in an "envelope": an additional format that adds the "schema" as a top-level field and puts all the data under a top-level "payload" field. If you don't want this wrapper (or the schema information it provides), adjust the converter configuration in the worker config to disable schemas by setting:

key.converter.schemas.enable=false
value.converter.schemas.enable=false
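Alternatively, if you want to keep schemas enabled on the sink side, your producer can emit the same envelope by hand. Here is a rough sketch, using the field names from the example above; it builds the JSON with plain string concatenation just to stay dependency-free (in real code you would use Jackson or Connect's own JsonConverter to serialize a Struct), and the commented-out section shows how the resulting string would be sent with a standard KafkaProducer:

```java
public class EnvelopeProducerSketch {
    // Build a JsonConverter-style envelope: a "schema" struct describing
    // each field, plus a "payload" carrying the actual row values.
    static String envelope(String txid, String sessionId, long txEntryTime) {
        return "{"
            + "\"schema\":{\"type\":\"struct\",\"fields\":["
            +   "{\"type\":\"string\",\"optional\":true,\"field\":\"TXID\"},"
            +   "{\"type\":\"string\",\"optional\":true,\"field\":\"SESSION_ID\"},"
            +   "{\"type\":\"int64\",\"optional\":false,"
            +    "\"name\":\"org.apache.kafka.connect.data.Timestamp\",\"version\":1,"
            +    "\"field\":\"TXENTRYTIME\"}"
            + "],\"optional\":false,\"name\":\"TABLE_A\"},"
            + "\"payload\":{"
            +   "\"TXID\":\"" + txid + "\","
            +   "\"SESSION_ID\":\"" + sessionId + "\","
            +   "\"TXENTRYTIME\":" + txEntryTime
            + "}}";
    }

    public static void main(String[] args) {
        String value = envelope("TESTDATA1", "TESTDATA2", 1490594582705L);
        System.out.println(value);
        // With the kafka-clients dependency on the classpath, the value
        // would be sent as a plain string (topic name is hypothetical):
        //   Properties p = new Properties();
        //   p.put("bootstrap.servers", "localhost:9092");
        //   p.put("key.serializer",
        //       "org.apache.kafka.common.serialization.StringSerializer");
        //   p.put("value.serializer",
        //       "org.apache.kafka.common.serialization.StringSerializer");
        //   try (KafkaProducer<String, String> producer = new KafkaProducer<>(p)) {
        //       producer.send(new ProducerRecord<>("my-topic", value));
        //   }
    }
}
```

Note that the JsonConverter on the sink side will validate the payload against the inline schema, so the field names and types in the two sections must match exactly.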

-Ewen

To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/ffa6c7ff-8e20-4208-ba69-2a0e5296d84e%40googlegroups.com.
