How to provide the JSON schema to a Kafka producer?

beni...@gmail.com

Jan 21, 2021, 1:51:18 PM
to Confluent Platform
Hi all,

I am trying to use the JSON Schema support provided by Schema Registry. In my use case, I need the JSON equivalent of Avro's GenericRecord, which is JsonNode according to https://docs.confluent.io/platform/current/schema-registry/serdes-develop/serdes-json.html.

Because my JSON object is stored in a generic JsonNode structure, I am assuming that I need to provide the JSON schema (the one my JSON object must follow) to the producer in some way, so that it can register the schema in Schema Registry (similar to what is done in the Avro case) and perform the corresponding validation and checks.

I have not found a way to do this. The examples in the link above use kafka-json-schema-console-producer, where the schema is provided through a property called "value.schema". I tried setting that property on my producer, but it does not work:

[2021-01-21 18:04:12,416] WARN The configuration 'value.schema' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)

In addition, another log message is shown, which suggests that I have not done this correctly:
WARN Not able to generate jsonSchema-info for type: [simple type, class com.fasterxml.jackson.databind.node.ObjectNode] - probably using custom serializer which does not override acceptJsonFormatVisitor (com.kjetland.jackson.jsonSchema.JsonSchemaGenerator)

The properties of the producer are the following:
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer");

props.put(KafkaJsonSchemaSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
props.put(KafkaJsonSchemaSerializerConfig.AUTO_REGISTER_SCHEMAS, "false");
props.put(KafkaJsonSchemaSerializerConfig.FAIL_INVALID_SCHEMA, "true");
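
To make the configuration easier to reproduce without the Kafka and Confluent jars on the classpath, here is a minimal stdlib-only sketch of the same settings written with plain string keys. The class name ProducerPropsSketch and the build method are hypothetical, and the key names are my assumption of what the constants above resolve to. Note that "value.schema" does not appear: judging by the ProducerConfig warning, it is not a producer config.

```java
import java.util.Properties;
import java.util.TreeSet;

// Stdlib-only sketch of the producer settings listed above.
// Assumption: the ProducerConfig / KafkaJsonSchemaSerializerConfig constants
// resolve to the string keys used here.
class ProducerPropsSketch {

    // Builds the properties; registryUrl is the Schema Registry endpoint.
    static Properties build(String registryUrl) {
        Properties props = new Properties();
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer");
        props.put("schema.registry.url", registryUrl);
        // Do not let the serializer register schemas on its own.
        props.put("auto.register.schemas", "false");
        // Ask the serializer to fail when the payload does not match the schema.
        props.put("json.fail.invalid.schema", "true");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("http://localhost:8081");
        // Print the keys in sorted order so the output is stable.
        for (String name : new TreeSet<>(props.stringPropertyNames())) {
            System.out.println(name + " = " + props.getProperty(name));
        }
    }
}
```

With auto.register.schemas set to "false", my understanding is that the schema would have to exist in Schema Registry before the producer sends anything, which is part of why I am asking how to supply it.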

Please, can anybody help with this?

Thanks in advance.



