Using KafkaSpout to read messages from a Confluent topic

Sri Shanmugam

Apr 5, 2016, 10:58:33 AM
to Confluent Platform
Thanks for Confluent Kafka - a good way to associate a schema with a topic.
I have a Storm topology that uses a KafkaSpout to read messages from a topic. Messages are sent to the topic through the Confluent Kafka REST API with a schema, posting with "Content-Type: application/vnd.kafka.avro.v1+json".
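Roughly, the post looks like this (the topic name, schema, and record contents here are simplified placeholders, not my real ones):

curl -X POST -H "Content-Type: application/vnd.kafka.avro.v1+json" \
     --data '{"value_schema": "{\"type\": \"record\", \"name\": \"Example\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}", "records": [{"value": {"name": "test"}}]}' \
     http://localhost:8082/topics/test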

The Storm bolt that tries to deserialize the received message is running into errors:
1. First I tried using the defaultScheme in KafkaSpout, so the bolt receives the data with tuple.getBinary(0). When I try to deserialize the message with an Avro binary decoder, I get a "Length is negative: -1" error.

Code: 
 
private EntityType deserialize(Tuple tuple) {
    DatumReader<EntityType> reader = new SpecificDatumReader<>(EntityType.class);

    // Load the writer schema from the classpath (not actually used below,
    // since SpecificDatumReader derives the schema from the generated class).
    String ptSchemaString = null;
    try {
        ptSchemaString = Resources.toString(Resources.getResource("Example.avsc"), Charsets.UTF_8);
    } catch (IOException e) {
        e.printStackTrace();
    }

    // Decode the raw bytes from the spout as plain Avro binary.
    ByteArrayInputStream input = new ByteArrayInputStream(tuple.getBinary(0));
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(input, null);
    try {
        return reader.read(null, decoder);
    } catch (IOException ex) {
        System.out.println("Problem deserializing data: " + ex);
        return null;
    }
}

Exception :

Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -1
        at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336) ~[stormjar.jar:?]
        at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263) ~[stormjar.jar:?]
        at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:272) ~[stormjar.jar:?]
        at org.apache.avro.io.ValidatingDecoder.readString(ValidatingDecoder.java:113) ~[stormjar.jar:?]
        at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:353) ~[stormjar.jar:?]
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:157) ~[stormjar.jar:?]
        at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193) ~[stormjar.jar:?]
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183) ~[stormjar.jar:?]
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151) ~[stormjar.jar:?]
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142) ~[stormjar.jar:?]

2. I tried using StringScheme to read the data as a JSON string:

        Example p = null;
        Schema schema = Example.getClassSchema();
        DatumReader<Example> reader = new SpecificDatumReader<>(Example.class);
        Decoder decoder = null;
        try {
            // "data" is the string the spout's StringScheme produced for this tuple
            decoder = DecoderFactory.get().jsonDecoder(schema, IOUtils.toInputStream(data, "UTF-16LE"));
            p = reader.read(null, decoder);
        } catch (IOException e) {
            e.printStackTrace();
        }

Exception:

java.io.CharConversionException: Invalid UTF-32 character 0x7b226174 (above 10ffff) at char #362, byte #1451
at com.fasterxml.jackson.core.io.UTF32Reader.reportInvalid(UTF32Reader.java:155) ~[com.fasterxml.jackson.core.jackson-core-2.3.2.jar:2.3.2]
at com.fasterxml.jackson.core.io.UTF32Reader.read(UTF32Reader.java:109) ~[com.fasterxml.jackson.core.jackson-core-2.3.2.jar:2.3.2]
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.loadMore(ReaderBasedJsonParser

What's the best way to deserialize an Avro-formatted message that was posted as JSON through the REST proxy?

Appreciate any help,
Thanks
Sri

srivish...@gmail.com

Apr 14, 2016, 7:46:58 AM
to Confluent Platform

I was able to figure it out using the hint from this thread: https://groups.google.com/forum/#!msg/confluent-platform/A7B6uSnJa5k/FwUATlNi8aQJ. Here is the equivalent Java code that worked for me.

    ByteBuffer input = ByteBuffer.wrap(data);
    int id = input.getInt();          // consumes 4 of the 5 header bytes
    int start = input.position() + 1; // skip the 5th header byte; the Avro payload starts here
    MyAvroObject obj = null;
    try {
        obj = datum_reader.read(null,
                DecoderFactory.get().binaryDecoder(input.array(), start, input.limit() - start, null));
    } catch (IOException e) {
        e.printStackTrace();
    }
    return obj;

The getInt() and position() calls on the ByteBuffer move the pointer past the header (a magic byte plus the 4-byte schema ID) that the Avro serializer writes in front of the payload. Hope this helps someone.
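For reference, here is a more self-contained sketch of the same idea. MyAvroObject stands in for your generated Avro class, and this assumes the standard Confluent wire format of one zero magic byte followed by a big-endian 4-byte schema registry ID:

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;

public class ConfluentAvroDecoder {

    // Decode one Confluent-framed Avro message: 1 magic byte (0) +
    // 4-byte big-endian schema registry ID + Avro binary payload.
    public static MyAvroObject decode(byte[] data) throws IOException {
        ByteBuffer buffer = ByteBuffer.wrap(data);
        byte magic = buffer.get();
        if (magic != 0) {
            throw new IOException("Unexpected magic byte: " + magic);
        }
        int schemaId = buffer.getInt();       // the writer schema's registry ID
        int start = buffer.position();        // payload begins at offset 5
        int length = buffer.limit() - start;  // only the remaining bytes are Avro

        SpecificDatumReader<MyAvroObject> reader =
                new SpecificDatumReader<>(MyAvroObject.class);
        BinaryDecoder decoder =
                DecoderFactory.get().binaryDecoder(buffer.array(), start, length, null);
        return reader.read(null, decoder);
    }
}

If you are using the Confluent serializers package, io.confluent.kafka.serializers.KafkaAvroDeserializer should handle this framing (and the schema registry lookup) for you.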
