Debugging KafkaAvroSerializer messages (using Schema Registry) sent to a Kafka topic


CL

Oct 16, 2015, 6:05:29 PM
to Confluent Platform
Hi,
After a few days, it looks like we are now able to produce our own "budget" events to Kafka using KafkaAvroSerializer with Schema Registry running.
The BudgetEvent class extends SpecificRecordBase and implements SpecificRecord. It has two Long fields and one String field.
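For reference, our schema looks roughly like this (a simplified sketch; the record namespace is omitted and the exact nullability of each field is approximate):

{
  "type": "record",
  "name": "BudgetEvent",
  "fields": [
    {"name": "timeStamp",  "type": ["null", "long"]},
    {"name": "campaignId", "type": ["null", "long"]},
    {"name": "cost",       "type": ["null", "string"]}
  ]
}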

Two questions:
1) Using Confluent Platform 1.0.1, I can run at the command line:
$ ./bin/kafka-console-consumer --zookeeper localhost:2181 --topic budget

I can see the budget events being output to the console by our code, but they are Avro-encoded lines, so I can't be sure they are as expected.
Is there some kind of shell script I can run at the command line that can show/consume Avro-encoded events?

2) We're having trouble writing consumer code for the Avro events.
The io.confluent.examples.consumer.ConsumerLogic class in the github.com/confluentinc examples repo has this code snippet:
  public void run() {
    ConsumerIterator<Object, Object> it = stream.iterator();

    while (it.hasNext()) {
      MessageAndMetadata<Object, Object> record = it.next();

      String topic = record.topic();
      int partition = record.partition();
      long offset = record.offset();
      Object key = record.key();
      GenericRecord message = (GenericRecord) record.message();
      System.out.println("Thread " + threadNumber +
                         " received: " + "Topic " + topic +
                         " Partition " + partition +
                         " Offset " + offset +
                         " Key " + key +
                         " Message " + message.toString());
    }
    System.out.println("Shutting down Thread: " + threadNumber);
  }

but our event is not a GenericRecord, and I can see this in the debugger:

record = {MessageAndMetadata@2715} "MessageAndMetadata(budget,0,Message(magic = 0, attributes = 0, crc = 1670620843, key = java.nio.HeapByteBuffer[pos=0 lim=1 cap=34], payload = java.nio.HeapByteBuffer[pos=0 lim=29 cap=29]),0,io.confluent.kafka.serializers.KafkaAvroDecoder@51a6e050,io.confluent.kafka.serializers.KafkaAvroDecoder@51a6e050)"
 topic = {String@2722} "budget"
 partition = 0
 kafka$message$MessageAndMetadata$$rawMessage = {Message@2733} "Message(magic = 0, attributes = 0, crc = 1670620843, key = java.nio.HeapByteBuffer[pos=0 lim=1 cap=34], payload = java.nio.HeapByteBuffer[pos=0 lim=29 cap=29])"
 offset = 0
 keyDecoder = {KafkaAvroDecoder@2734} 
 valueDecoder = {KafkaAvroDecoder@2734} 

So record.message() does not work as the example expects.

Is there a code snippet you can point us to for consuming events that are of SpecificRecordBase type rather than GenericRecord?

Thanks.

Ewen Cheslack-Postava

Oct 16, 2015, 7:36:16 PM
to Confluent Platform
Hi CL,

On Fri, Oct 16, 2015 at 3:05 PM, CL <ck...@millennialmedia.com> wrote:
Hi,
After a few days, it looks like we are now able to produce our own "budget" events to Kafka using KafkaAvroSerializer with Schema Registry running.
The BudgetEvent class extends SpecificRecordBase and implements SpecificRecord. It has two Long fields and one String field.

Two questions:
1) Using Confluent Platform 1.0.1, I can run at the command line:
$ ./bin/kafka-console-consumer --zookeeper localhost:2181 --topic budget

Confluent Platform ships with kafka-avro-console-producer and kafka-avro-console-consumer scripts. These are just wrappers of the kafka-console-producer and kafka-console-consumer scripts that configure the serializers/formatters to handle Avro data. You can use them just as you have above.
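For example, the same consumption command you ran becomes:

$ ./bin/kafka-avro-console-consumer --zookeeper localhost:2181 --topic budget

(If your schema registry isn't at the default local address, you can point the formatter at it with --property schema.registry.url=http://<host>:8081.)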
The CP 1.0.1 version only decodes data to GenericRecord. The reason for this is that GenericRecord allows you to dynamically use any schema, which means the deserializer can *always* load the schema from the schema registry and then deserialize the data. This is actually very useful, since it means you're not required to have the SpecificRecord class on your classpath -- handy for general-purpose tools that need to be able to decode the data.
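For instance, even without the BudgetEvent class on the classpath, code along these lines works against the decoded record (getSchema() and get() are the standard Avro GenericRecord API; the field name is just an example from your event):

    GenericRecord message = (GenericRecord) record.message();
    System.out.println(message.getSchema());        // writer schema, fetched via the registry
    System.out.println(message.get("campaignId"));  // access fields by name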

So you are getting your data, it's just in the form of a GenericRecord rather than a SpecificRecord.

Support for deserializing to a SpecificRecord has been implemented; you can see the setting here: https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroDeserializerConfig.java#L27 However, this is not yet in a released version. It will be included in the next release.
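As a rough sketch of what enabling it will look like once released (the property name comes from the linked config class; this does not work on CP 1.0.1):

    Properties props = new Properties();
    props.put("schema.registry.url", "http://localhost:8081");
    // KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG
    props.put("specific.avro.reader", true);
    // the decoder will then return your generated SpecificRecord classes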

-Ewen







CL

Oct 19, 2015, 3:14:24 PM
to Confluent Platform
Hi Ewen,

Thanks for the reply (great to come into the office this morning and see your suggestion).

1) Yeah! Following your suggestion, I'm able to see a formatted representation of our "BudgetEvent" SpecificRecord messages in my local Kafka topic by running the kafka-avro-console-consumer script provided with Confluent 1.0.1:
~/installs/confluent/confluent-1.0.1
$ ./bin/kafka-avro-console-consumer --zookeeper localhost:2181 --topic <our-budget-topic>
...
{"timeStamp":{"long":1445264922460},"campaignId":{"long":2},"cost":{"string":"-3"}}
...


2) For the second part of what I was trying to do: the kafka-avro-console-consumer script above seems to just call the ConsoleConsumer class, so I thought I could slightly hack the example consumer project (from the github.com/confluentinc repo mentioned above) to mimic what the script does and write a representation of our "BudgetEvent" to the console output, using the AvroMessageFormatter class.

So, in the io.confluent.examples.consumer.ConsumerLogic class (which is called by the io.confluent.examples.consumer.ConsumerGroupExample class), the meat is in the run() method, and I made the hack shown below.
The local console output I get seems to show that everything runs until the statement
avroMessageFormatter.writeTo(record.key(), record.message(), System.out);
at which point everything just hangs; even the debugger frame in IntelliJ goes blank :(

Any pointers?

Of course, displaying a representation of the SpecificRecord in our consumer code doesn't do much for us beyond giving us some confidence that we can eventually figure out end-to-end serialization/deserialization using the Schema Registry. An even better outcome would be if you could paste any snippet of code that points us toward replacing what is in the run() method below to get back our "BudgetEvent" SpecificRecord POJO, though I understand there might be too much code to show at this time. In the next release/version, will the ConsumerGroupExample class be updated to show how to produce and consume a SpecificRecord message to/from Kafka? Thanks, and sorry about the long reply.

package io.confluent.examples.consumer;
public class ConsumerLogic implements Runnable {
...
  public void run() {
      // hack to try to read and display SpecificRecord
      ConsumerIterator<byte[], byte[]> it = stream.iterator();
      System.out.println("Thread " + threadNumber);

      // the formatter only needs to be created and initialized once
      AvroMessageFormatter avroMessageFormatter = new AvroMessageFormatter();
      Properties props = new Properties();
      props.put("schema.registry.url", "http://localhost:8081");
      avroMessageFormatter.init(props);

      while (it.hasNext()) {
          MessageAndMetadata<byte[], byte[]> record = it.next();

          String topic = record.topic();
          long offset = record.offset();
          System.out.println(" received: " + "Topic " + topic +
                  " Offset " + offset + " -- ");

          avroMessageFormatter.writeTo(record.key(), record.message(), System.out);
      }
//    (original GenericRecord consumption loop from the example commented out;
//     it's the same snippet quoted in my first post)
    System.out.println("Shutting down Thread: " + threadNumber);
  }
...

CL

Oct 19, 2015, 5:00:45 PM
to Confluent Platform
Hi Ewen,

(I don't seem to be able to continue this thread from my reply to you earlier this afternoon, so I'm just replying to my original post...)

I can now see the exception below in my debugger, after more carefully stepping into this line of code from the snippet I showed you earlier this afternoon:
avroMessageFormatter.writeTo(record.key(), record.message(), System.out);

org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

The exception was thrown from the AbstractKafkaAvroDeserializer class, on the first line in the try clause:
...
    protected Object deserialize(byte[] payload) throws SerializationException {
        byte id = -1;
        if(payload == null) {
            return null;
        } else {
            try {
                ByteBuffer e = this.getByteBuffer(payload);
                ...
The payload is the byte[] array from record.key(), and in the debugger I can see that the value of the first (and only) byte is 50.

Since the kafka-avro-console-consumer script displays the "BudgetEvent" messages in my local Kafka topic OK, there must be something that I'm not doing correctly in the code snippet from my earlier reply when trying to mimic what that script does when it calls the ConsoleConsumer class, but it's not clear to me what... :(

Hoping that the exception I see in the debugger can shed some light...

Thanks.

Ewen Cheslack-Postava

Oct 19, 2015, 7:01:36 PM
to Confluent Platform
CL,

It's hard to say what might be wrong if you've modified any of the rest of the example.

If you have code like this:

    // Create decoders for key and value
    KafkaAvroDecoder avroDecoder = new KafkaAvroDecoder(vProps);

    Map<String, List<KafkaStream<Object, Object>>> consumerMap =
        consumer.createMessageStreams(topicCountMap, avroDecoder, avroDecoder);
    List<KafkaStream<Object, Object>> streams = consumerMap.get(topic);

in your project, then you shouldn't need to change anything compared to the example code in ConsumerLogic. By the time you get to the MessageAndMetadata, the KafkaAvroDecoder will have already done the work of deserializing to a GenericRecord. You shouldn't need a formatter either -- just cast to GenericRecord and you can print any data contained in it.
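For example, sticking with the example's types (the field names are taken from the console output you posted earlier in this thread):

    MessageAndMetadata<Object, Object> record = it.next();
    GenericRecord message = (GenericRecord) record.message();
    System.out.println("timeStamp=" + message.get("timeStamp") +
                       " campaignId=" + message.get("campaignId") +
                       " cost=" + message.get("cost"));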

The reason you get a KafkaStream with Object types is because Avro supports primitive types (such as Integer, String, etc) directly, which means the only common supertype for all of them is Object.

What are you using for keys? Is it possible you produced raw integer data into the key, but are trying to use an Avro decoder for that as well? Is the payload you're seeing definitely a byte[] with exactly one byte? Is it possible an earlier version of your code produced invalid data, so that consuming now with auto.offset.reset = smallest fails on those old messages?
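For reference, the Avro decoder expects every key/value to be in Confluent's wire format:

    // byte 0    : magic byte, always 0
    // bytes 1-4 : schema ID as a big-endian int, as registered in the schema registry
    // bytes 5+  : the Avro-encoded data

So a one-byte key whose value is 50 (ASCII '2') was very likely written as a raw string or number rather than through the Avro serializer -- hence the "Unknown magic byte!" error.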

-Ewen






CL

Oct 20, 2015, 10:07:56 AM
to Confluent Platform
Hi Ewen,

Got it!!
(On the train last night, I actually tried changing the "auto.offset.reset" property in the io.confluent.examples.consumer.ConsumerGroupExample class to "largest" instead of "smallest", thinking along the same lines -- that the first of our BudgetEvent records in the Kafka 'budget' topic might be screwy because we were still debugging our internal producer code -- but the consumer was still hanging last evening...)
This morning, after reading your pointer that "by the time you get to the MessageAndMetadata, the KafkaAvroDecoder will have already done the work of deserializing to a GenericRecord", I reverted the code in io.confluent.examples.consumer.ConsumerLogic back to what it was, and it is working as expected after I feed new BudgetEvent messages from our internal producer code into our Kafka 'budget' topic.

So, to recap for others who might be following this thread: the issue was that there were bad messages in my local Kafka 'budget' topic, and changing the "auto.offset.reset" property to "largest" avoids re-reading the earliest (bad) messages in the topic.
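(For anyone copying this, the change is the single consumer property, e.g.

    props.put("auto.offset.reset", "largest");  // old high-level consumer setting; skips the earlier bad messages

in the ConsumerGroupExample configuration.)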

Awesome pointers, Ewen. Thanks!!
-CL
(P.S. If you don't mind, I'll close this thread and post a new one to ask whether you have any code snippets or pointers you can share for writing consumer code that decodes/deserializes into a SpecificRecord type instead of GenericRecord.)



Michael Noll

May 6, 2016, 5:31:27 AM
to confluent...@googlegroups.com
CL,

Take a look at my message at https://groups.google.com/d/msg/confluent-platform/JLZWG1Ybpjc/YJiUW-p1BQAJ from a related Avro discussion.

The key line is the following, which ensures that the Avro deserializer returns proper POJOs instead of Avro's GenericRecord:

    consumerConfig.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

Note that this code is in a feature branch that uses the updated Kafka Streams implementation from Kafka's upcoming 0.10.0.0 release; i.e., the code examples in the feature branch are no longer compatible with the Tech Preview (which is what you have been using so far).
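As a minimal sketch, assuming a 0.9/0.10-style consumer configuration (everything here except the SPECIFIC_AVRO_READER_CONFIG line is a standard consumer property with placeholder values):

    Properties consumerConfig = new Properties();
    consumerConfig.put("bootstrap.servers", "localhost:9092");
    consumerConfig.put("schema.registry.url", "http://localhost:8081");
    consumerConfig.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
    // with this enabled, the deserializer hands back your generated
    // BudgetEvent instances rather than GenericRecord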

I hope you'll be able to extrapolate from the code pointers above to your specific use case.

Best,
Michael







