Confused by SchemaRegistry and how to use the KafkaAvroSerializer to generate a message

Dominic DeCesare

Jun 1, 2017, 1:01:23 PM
to Confluent Platform
Hi

I'm a couple of days into some proof-of-concept work with Kafka + Avro + Schema Registry, and I feel like there's a gap in my understanding, so I'm going about things the wrong way.

What I'm trying to achieve is:

POJO as JSON -> Producer -> Avro -> Kafka topic -> Avro -> Consumer -> POJO as JSON

where I'm building a really simple command line consumer/producer just to learn how it works.

The end goal is a Java app that serialises a POJO into Avro and sends it to a Kafka topic, which some other Java app then consumes from, deserialises back into POJOs, and processes.

What I have so far:

- A schema generated from the POJO using ReflectData, and another using Jackson's AvroSchemaGenerator (roughly how I generated them is sketched just below). The POJO has several fields which are themselves other POJOs.
- A working command-line consumer/producer pair that I've compiled and run against a provided, pre-generated schema file.
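
For reference, this is roughly how I generated the two schemas. RequestObject stands in for my real POJO, and the Jackson import path is from memory, so it may differ by version:

import org.apache.avro.Schema;
import org.apache.avro.reflect.ReflectData;

import com.fasterxml.jackson.dataformat.avro.AvroMapper;
import com.fasterxml.jackson.dataformat.avro.AvroSchema;
import com.fasterxml.jackson.dataformat.avro.schema.AvroSchemaGenerator; // package may vary by version

public class SchemaGeneration {
    public static void main(String[] args) throws Exception {
        // Schema derived from the POJO via Avro reflection
        Schema reflectSchema = ReflectData.get().getSchema(RequestObject.class);
        System.out.println(reflectSchema.toString(true));

        // Schema derived from the same POJO via Jackson's Avro module
        AvroMapper avroMapper = new AvroMapper();
        AvroSchemaGenerator generator = new AvroSchemaGenerator();
        avroMapper.acceptJsonFormatVisitor(RequestObject.class, generator);
        AvroSchema jacksonSchema = generator.getGeneratedSchema();
        System.out.println(jacksonSchema.getAvroSchema().toString(true));
    }
}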

My attempts to serialise JSON into Avro against the ReflectData schema using the GenericDatumReader/Writer didn't work, because it seemed to require the input JSON to have every inner object wrapped in its type name, i.e. {"A": {"Inner": {"B": "1", "C": 2}}} rather than just {"A": {"B": "1", "C": 2}}. Here's my attempt:

producerProperties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
producerProperties.put("value.serializer","org.apache.kafka.common.serialization.ByteArraySerializer");

try ( InputStream schemaStream = classLoader.getResourceAsStream( schemaFile );
     
Producer<String, byte[]> producer = new KafkaProducer<>( producerProperties );
     
ByteArrayOutputStream output = new ByteArrayOutputStream();
) {
Schema requestSchema = new Schema.Parser().setValidate(true).parse( schemaStream );
    DatumReader<GenericRecord> reader = new GenericDatumReader<>( requestSchema );
    DatumWriter<GenericRecord> writer = new GenericDatumWriter<>( requestSchema );
    Console console = System.console();
   
while ( true && console != null ) {
   
String input = console.readLine();
   
if ( input != null ) {
       
ObjectMapper requestMapper = new ObjectMapper();
            RequestObject
request = requestMapper.readValue( input, RequestObject.class );
Decoder decoder = DecoderFactory.get().jsonDecoder( requestSchema, input );
Encoder encoder = EncoderFactory.get().binaryEncoder( output, null );

GenericRecord record = reader.read( null, decoder );
writer.write( record, encoder );
encoder.flush();
ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>( producerProperties.getProperty( "topic" ), output.toByteArray() );
           
     
            producer
.send( producerRecord );
   
}
   
}
} catch ( IOException ex ) {
    throw new RuntimeException( "Problem generating AVRO message", ex );
}
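
To illustrate the type-name wrapping I mean, here's a minimal standalone example. The Outer/Inner schema is made up, and I'm assuming the inner record ends up inside a union, which is my guess as to why the wrapping is required:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;

public class JsonDecoderDemo {
    public static void main(String[] args) throws Exception {
        // Made-up schema: field "A" is a union of null and a nested record "Inner"
        String schemaJson = "{\"type\":\"record\",\"name\":\"Outer\",\"fields\":["
                + "{\"name\":\"A\",\"type\":[\"null\","
                + "{\"type\":\"record\",\"name\":\"Inner\",\"fields\":["
                + "{\"name\":\"B\",\"type\":\"string\"},"
                + "{\"name\":\"C\",\"type\":\"int\"}]}]}]}";
        Schema schema = new Schema.Parser().parse(schemaJson);
        DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);

        // Avro's JSON encoding names the union branch, so this parses fine...
        String avroStyleJson = "{\"A\": {\"Inner\": {\"B\": \"1\", \"C\": 2}}}";
        Decoder decoder = DecoderFactory.get().jsonDecoder(schema, avroStyleJson);
        System.out.println(reader.read(null, decoder));

        // ...whereas the plain Jackson-style JSON below fails in jsonDecoder:
        // String plainJson = "{\"A\": {\"B\": \"1\", \"C\": 2}}";
    }
}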

In contrast, using the Jackson-generated schema and its AvroMapper class worked fine:
producerProperties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
producerProperties.put("value.serializer","org.apache.kafka.common.serialization.ByteArraySerializer");


try ( InputStream schemaStream = classLoader.getResourceAsStream( schemaFile );
Producer<String, byte[]> producer = new KafkaProducer<>( producerProperties );
ByteArrayOutputStream output = new ByteArrayOutputStream();
) {
   Schema requestSchema = new Schema.Parser().setValidate(true).parse( schemaStream );
Console console = System.console();
while ( true && console != null ) {
String input = console.readLine();
if ( input != null ) {
ObjectMapper requestMapper = new ObjectMapper();
RequestObject request = requestMapper.readValue( input, RequestObject.class );

AvroMapper avroMapper = new AvroMapper();
AvroSchema avroSchema = new AvroSchema( requestSchema );
byte[] message = avroMapper.writer( avroSchema ).writeValueAsBytes( request );
ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>( producerProperties.getProperty( "topic" ), message );
producer.send( producerRecord );
}
}
} catch ( IOException ex ) {
throw new RuntimeException( "Problem generating AVRO message", ex );
}


Weirdly, when writing the consumer, the GenericDatumReader worked perfectly, so I felt like I had a reasonable understanding of working with Avro at that point.
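
For completeness, this is roughly the consumer side that worked (trimmed; the consumer properties, topic name, and schema file are set up the same way as on the producer side):

consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

try (InputStream schemaStream = classLoader.getResourceAsStream(schemaFile);
     Consumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProperties)) {

    Schema requestSchema = new Schema.Parser().setValidate(true).parse(schemaStream);
    DatumReader<GenericRecord> reader = new GenericDatumReader<>(requestSchema);
    consumer.subscribe(Collections.singletonList(consumerProperties.getProperty("topic")));

    while (true) {
        for (ConsumerRecord<String, byte[]> consumed : consumer.poll(500)) {
            // binaryDecoder + GenericDatumReader turn the raw bytes back into a GenericRecord
            Decoder decoder = DecoderFactory.get().binaryDecoder(consumed.value(), null);
            GenericRecord record = reader.read(null, decoder);
            System.out.println(record); // GenericRecord#toString prints a JSON-style rendering
        }
    }
} catch (IOException ex) {
    throw new RuntimeException("Problem consuming AVRO message", ex);
}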

My understanding of the point of the Schema Registry is that I could send a whole bunch of POJOv1 into Kafka, then send in some more POJOv2 and successfully consume all of them, only having to register the updated schema, which seemed like a sensible next step.
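
To make that concrete, what I had in mind for "register the updated schema" is something like the snippet below; the registry URL, the v2SchemaStream variable, and the subject name are assumptions for illustration ("<topic>-value" being the default subject naming):

import org.apache.avro.Schema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;

// Register the evolved (v2) schema under the topic's value subject
CachedSchemaRegistryClient registryClient =
        new CachedSchemaRegistryClient("http://localhost:8081", 100);
Schema v2Schema = new Schema.Parser().parse(v2SchemaStream);
int schemaId = registryClient.register("my-topic-value", v2Schema);
System.out.println("Registered schema id: " + schemaId);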

When I try to write a producer to work with the Schema Registry, like so:
producerProperties.put( "key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer" );
producerProperties.put( "value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer" );

try ( Producer<String, RulesServiceRequest> producer = new KafkaProducer<>( producerProperties ); ) {
Console console = System.console();
while ( true && console != null ) {
String input = console.readLine();
if ( input != null ) {
ObjectMapper requestMapper = new ObjectMapper();
RequestObject request = requestMapper.readValue( input, RequestObject.class );

ProducerRecord<String, RequestObject> producerRecord = new ProducerRecord<>( producerProperties.getProperty( "topic" ), request.getTransaction().getId(), request );
producer.send( producerRecord );
}
}
} catch ( IOException ex ) {
throw new RuntimeException( "Problem generating AVRO message", ex );
}

I get the following error when I pass it the RequestObject JSON as input:

Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.lang.IllegalArgumentException: Unsupported Avro type. Supported types are null, Boolean, Integer, Long, Float, Double, String, byte[] and IndexedRecord

What I'd like to understand:
1) If I register a schema with the Schema Registry, how do I configure the producer to use it? Or is it purely based on the POJO signature? Or on the topic name and the appropriately named key/value subjects? Or should I not generate and register the schema myself and leave that to the KafkaAvroSerializer?
2) Have I just misunderstood what the Schema Registry is for?
3) Is there a way to pass in configuration properties so that I can start the producer telling it to use schema X?
4) Why am I getting the above exception?

CHANCHAL SINGH

Apr 18, 2018, 3:28:18 AM
to Confluent Platform

I am also facing the same issue. Did you find a solution?