Thanks for Confluent Kafka; it's a good way to associate a schema with a topic.
I have a Storm topology that uses a Kafka spout to read messages from a topic. The messages are sent to the topic through the Confluent Kafka REST API with a schema. The format used when posting a message is "curl -X POST -H "Content-Type: application/vnd.kafka.avro.v1+json" \ "
The Storm bolt that tries to deserialize the received messages is encountering errors:
1. First, I tried using the default scheme in KafkaSpout. The data is received in the bolt using tuple.getBinary(0). When I try to deserialize the message with the Avro binary decoder, I get a "Length is negative: -1" error:
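public GenericRecord deserialize(Tuple tuple) {
    // Load the reader schema from the classpath (Guava Resources/Charsets)
    String ptSchemaString = null;
    try {
        ptSchemaString = Resources.toString( Resources.getResource( "Example.avsc" ), Charsets.UTF_8 );
    } catch (IOException e) {
        e.printStackTrace();
    }
    Schema schema = new Schema.Parser().parse( ptSchemaString );
    DatumReader<GenericRecord> reader = new GenericDatumReader<>( schema );

    // Treat the raw Kafka message bytes as plain Avro binary
    ByteArrayInputStream input = new ByteArrayInputStream( tuple.getBinary(0) );
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder( input, null );
    try {
        return reader.read( null, decoder );
    } catch( IOException ex ) {
        System.out.println("Problem deserializing data : " + ex);
        ex.printStackTrace();
    }
    return null;
}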
Exception:
Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -1
at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336) ~[stormjar.jar:?]
at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263) ~[stormjar.jar:?]
at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:272) ~[stormjar.jar:?]
at org.apache.avro.io.ValidatingDecoder.readString(ValidatingDecoder.java:113) ~[stormjar.jar:?]
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:353) ~[stormjar.jar:?]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:157) ~[stormjar.jar:?]
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193) ~[stormjar.jar:?]
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183) ~[stormjar.jar:?]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151) ~[stormjar.jar:?]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142) ~[stormjar.jar:?]
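One likely explanation for the "Length is negative" error, assuming the REST Proxy writes the value with Confluent's Avro serializer (as it does for the avro embedded format), is Confluent's wire format: each message value starts with a magic byte `0` and a 4-byte big-endian Schema Registry ID, and only after that comes the plain Avro binary. Passing the whole array to `BinaryDecoder` makes Avro read those header bytes as record fields. The sketch below uses only the JDK to split off that header; the class `ConfluentWireFormat` and its methods are hypothetical names for illustration, not part of any library.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/**
 * Hypothetical helper: splits a value in the Confluent wire format
 * (1 magic byte, 4-byte big-endian schema ID, then the Avro binary body).
 */
final class ConfluentWireFormat {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_LENGTH = 5; // magic byte + int schema ID

    private ConfluentWireFormat() {}

    /** Returns the Schema Registry ID stored in the 5-byte header. */
    public static int schemaId(byte[] message) {
        checkHeader(message);
        // ByteBuffer reads big-endian by default, matching the wire format
        return ByteBuffer.wrap(message, 1, 4).getInt();
    }

    /** Returns only the Avro-encoded bytes that follow the header. */
    public static byte[] avroPayload(byte[] message) {
        checkHeader(message);
        return Arrays.copyOfRange(message, HEADER_LENGTH, message.length);
    }

    private static void checkHeader(byte[] message) {
        if (message == null || message.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("Message too short for Confluent header");
        }
        if (message[0] != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte: " + message[0]);
        }
    }

    public static void main(String[] args) {
        // Header for schema ID 42, followed by two bytes of (fake) Avro data
        byte[] message = {0x0, 0x0, 0x0, 0x0, 0x2A, 0x06, 0x66};
        System.out.println(schemaId(message));                     // 42
        System.out.println(Arrays.toString(avroPayload(message)));  // [6, 102]
    }
}
```

In the bolt, the stripped payload could then be decoded with `DecoderFactory.get().binaryDecoder(ConfluentWireFormat.avroPayload(tuple.getBinary(0)), null)` and the same `reader`. This still assumes `Example.avsc` matches the schema the message was written with; Confluent's `KafkaAvroDeserializer` handles the header itself and looks up the writer schema in the Schema Registry by ID, which may be the more robust option.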
2. Next, I tried using StringScheme to read the data as JSON:
String data = tuple.getString(0);
Example p = null;
Schema schema = Example.getClassSchema();
DatumReader<Example> reader = new SpecificDatumReader<>( schema );
Decoder decoder = null;
try {
    decoder = DecoderFactory.get().jsonDecoder( schema, IOUtils.toInputStream( data, "UTF-16LE" ) );
    p = reader.read( null, decoder );
} catch (IOException e) {
    e.printStackTrace();
}
java.io.CharConversionException: Invalid UTF-32 character 0x7b226174(above 10ffff) at char #362, byte #1451)
at com.fasterxml.jackson.core.io.UTF32Reader.reportInvalid(UTF32Reader.java:155) ~[com.fasterxml.jackson.core.jackson-core-2.3.2.jar:2.3.2]
at com.fasterxml.jackson.core.io.UTF32Reader.read(UTF32Reader.java:109) ~[com.fasterxml.jackson.core.jackson-core-2.3.2.jar:2.3.2]
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.loadMore(ReaderBasedJsonParser