UTF-32 Parse error while consuming messages using KafkaJsonDeserializer


Rohit Sardesai

May 17, 2016, 5:49:09 AM
to Confluent Platform
Hello,

I am creating a KafkaProducer instance with KafkaJsonSerializer as both the key and value serializer. On the KafkaConsumer side, I am using KafkaJsonDeserializer for both the key and value deserialization. I am able to produce messages to the topic. However, while consuming messages, I get the following exception:

org.apache.kafka.common.errors.SerializationException: java.io.CharConversionException: Invalid UTF-32 character 0x2e899230(above 10ffff)  at char #2, byte #11)
Caused by: java.io.CharConversionException: Invalid UTF-32 character 0x2e899230(above 10ffff)  at char #2, byte #11)
at com.fasterxml.jackson.core.io.UTF32Reader.reportInvalid(UTF32Reader.java:189)
at com.fasterxml.jackson.core.io.UTF32Reader.read(UTF32Reader.java:150)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.loadMore(ReaderBasedJsonParser.java:153)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWSOrEnd(ReaderBasedJsonParser.java:1854)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:571)
at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3604)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3549)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2673)
at io.confluent.kafka.serializers.KafkaJsonDeserializer.deserialize(KafkaJsonDeserializer.java:69)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:595)
at org.apache.kafka.clients.consumer.internals.Fetcher.handleFetchResponse(Fetcher.java:539)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$000(Fetcher.java:67)
at org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:137)
at org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:134)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:350)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:288)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:303)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:197)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:187)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:877)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:829)
at io.confluent.kafkarest.ConsumerReadTask.doPartialRead(ConsumerReadTask.java:171)
at io.confluent.kafkarest.ConsumerWorker.run(ConsumerWorker.java:131)

I am not using Avro for sending the data; this is a JSON request. Any thoughts on what could be wrong?

This is how I am creating the KafkaProducer:

KafkaJsonSerializer keySerializer = new KafkaJsonSerializer();
keySerializer.configure(props, true);
KafkaJsonSerializer valueSerializer = new KafkaJsonSerializer();
valueSerializer.configure(props, false);
KafkaProducer<K, V> producer = new KafkaProducer<K, V>(props, keySerializer, valueSerializer);


and the KafkaConsumer:
KafkaJsonDeserializer keyDeserializer = new KafkaJsonDeserializer();
keyDeserializer.configure(props, true);
KafkaJsonDeserializer valueDeserializer = new KafkaJsonDeserializer();
valueDeserializer.configure(props, false);
KafkaConsumer jsonConsumer = new KafkaConsumer(props, keyDeserializer, valueDeserializer);

The exception is thrown by the KafkaJsonDeserializer when I call poll().

Rohit Sardesai

May 17, 2016, 8:07:56 AM
to Confluent Platform
I dug further into the source code of KafkaJsonSerializer's serialize() method; its ObjectMapper uses UTF-8 encoding to write the data as bytes.

public byte[] serialize(String topic, T data) {
    if (data == null) {
        return null;
    } else {
        try {
            return this.objectMapper.writeValueAsBytes(data);
        } catch (Exception var4) {
            throw new SerializationException("Error serializing JSON message", var4);
        }
    }
}

ObjectMapper.java

public byte[] writeValueAsBytes(Object value) throws JsonProcessingException {
    ByteArrayBuilder bb = new ByteArrayBuilder(this._jsonFactory._getBufferRecycler());
    try {
        this._configAndWriteValue(this._jsonFactory.createGenerator(bb, JsonEncoding.UTF8), value);
    } catch (JsonProcessingException var4) {
        throw var4;
    } catch (IOException var5) {
        throw JsonMappingException.fromUnexpectedIOE(var5);
    }
    byte[] result = bb.toByteArray();
    bb.release();
    return result;
}


In KafkaJsonDeserializer's deserialize() method, it calls ObjectMapper's readValue() method, where the encoding is detected by this method:

public JsonParser constructParser(int parserFeatures, ObjectCodec codec, BytesToNameCanonicalizer rootByteSymbols, CharsToNameCanonicalizer rootCharSymbols, int factoryFeatures) throws IOException {
    JsonEncoding enc = this.detectEncoding();
    if (enc == JsonEncoding.UTF8 && Feature.CANONICALIZE_FIELD_NAMES.enabledIn(factoryFeatures)) {
        BytesToNameCanonicalizer can = rootByteSymbols.makeChild(factoryFeatures);
        return new UTF8StreamJsonParser(this._context, parserFeatures, this._in, codec, can, this._inputBuffer, this._inputPtr, this._inputEnd, this._bufferRecyclable);
    } else {
        return new ReaderBasedJsonParser(this._context, parserFeatures, this.constructReader(), codec, rootCharSymbols.makeChild(factoryFeatures));
    }
}

This method should return a UTF8StreamJsonParser, but it is returning a ReaderBasedJsonParser because it detects the encoding as UTF-32. I am not sure why.
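For what it's worth, when there is no BOM, Jackson's ByteSourceJsonBootstrapper guesses the encoding from where the zero bytes sit in the first four bytes of the stream (the RFC 4627 detection heuristic). A rough sketch of that heuristic (my own simplification with made-up names, not Jackson's actual code) shows why a payload that starts with several zero bytes gets classified as UTF-32:

```java
public class EncodingGuess {
    // Simplified version of the RFC 4627 encoding-detection heuristic:
    // with no BOM, the encoding is inferred from the positions of zero
    // bytes in the first four bytes. (Illustration only, not Jackson code.)
    static String guessEncoding(byte[] b) {
        if (b.length >= 4) {
            if (b[0] == 0 && b[1] == 0 && b[2] == 0) return "UTF-32BE";
            if (b[1] == 0 && b[2] == 0 && b[3] == 0) return "UTF-32LE";
            if (b[0] == 0) return "UTF-16BE";
            if (b[1] == 0) return "UTF-16LE";
        }
        return "UTF-8";
    }

    public static void main(String[] args) {
        // Plain JSON starts with '{' (0x7B), so UTF-8 is assumed.
        byte[] json = "{\"a\":1}".getBytes(java.nio.charset.StandardCharsets.UTF_8);
        System.out.println(guessEncoding(json)); // UTF-8

        // A message framed with a 0x00 magic byte plus a 4-byte big-endian
        // schema ID (Confluent's Avro wire format, here schema ID 1) begins
        // with zero bytes, which matches the UTF-32BE pattern above. Jackson
        // then wraps the stream in a UTF32Reader and fails on the payload.
        byte[] framed = {0x00, 0x00, 0x00, 0x00, 0x01, 0x02, 0x0A};
        System.out.println(guessEncoding(framed)); // UTF-32BE
    }
}
```

So the first byte of the message, not the serializer config, decides which parser you get; any binary payload that happens to start with zeros will be misread as UTF-32.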

Josh Goodrich

Jun 1, 2016, 5:09:52 PM
to Confluent Platform
Did you find a solution to this?  I am running into the same error.

Saravanan Tirugnanum

Jun 5, 2016, 11:44:52 PM
to Confluent Platform
This could be because the messages in the topic are Avro-serialized but you are trying to use the JSON deserializer. Check this once.

Regards
Saravanan
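One way to check this programmatically (a sketch; the class and method names here are mine): consume a few records with ByteArrayDeserializer and inspect the raw bytes. Confluent's Avro serializer prefixes every record with a 0x00 magic byte followed by a 4-byte big-endian schema ID:

```java
public class WireFormatCheck {
    private static final byte MAGIC_BYTE = 0x00;

    // Heuristic: Confluent's Avro wire format is 1 magic byte (0x00)
    // + 4-byte big-endian schema ID + Avro payload, so a record shorter
    // than 6 bytes or not starting with 0x00 cannot be in that format.
    static boolean looksLikeConfluentAvro(byte[] value) {
        return value != null && value.length > 5 && value[0] == MAGIC_BYTE;
    }

    // Reads the big-endian schema ID from bytes 1..4 of the record.
    static int schemaId(byte[] value) {
        return ((value[1] & 0xFF) << 24) | ((value[2] & 0xFF) << 16)
             | ((value[3] & 0xFF) << 8) | (value[4] & 0xFF);
    }

    public static void main(String[] args) {
        byte[] json = "{\"a\":1}".getBytes(java.nio.charset.StandardCharsets.UTF_8);
        byte[] avro = {0x00, 0x00, 0x00, 0x00, 0x2A, 0x02, 0x06}; // schema ID 42
        System.out.println(looksLikeConfluentAvro(json)); // false
        System.out.println(looksLikeConfluentAvro(avro)); // true
        System.out.println(schemaId(avro));               // 42
    }
}
```

If the check matches, you can look the schema ID up in the Schema Registry (GET /schemas/ids/<id>) to confirm what schema produced the data.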

Josh Goodrich

Jun 6, 2016, 2:01:08 PM
to confluent...@googlegroups.com, vtsa...@gmail.com
Thanks, that was what I was thinking. How do I check whether it's Avro, and if it is, how do I get it back to JSON?

--
You received this message because you are subscribed to a topic in the Google Groups "Confluent Platform" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/confluent-platform/t0o-H8k6cWE/unsubscribe.
To unsubscribe from this group and all its topics, send an email to confluent-platf...@googlegroups.com.
To post to this group, send email to confluent...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/65fea2eb-11b1-438b-9696-b619d8772866%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Ben Davison

Jun 6, 2016, 2:04:49 PM
to confluent...@googlegroups.com, vtsa...@gmail.com
If you just want to check, try consuming it with the command line tools?
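For example, assuming a local broker and Schema Registry on the default ports (adjust the host, ports, and topic name for your setup; older Kafka versions may need --zookeeper localhost:2181 instead of --bootstrap-server), plain JSON prints readably with the regular console consumer, while Confluent-framed Avro only renders cleanly through the Avro console consumer:

```shell
# Prints raw record bytes as text: JSON shows up as readable JSON,
# Avro records show leading garbage (the magic byte + schema ID).
kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic my-topic --from-beginning --max-messages 5

# Decodes Confluent-framed Avro via the Schema Registry; this fails
# with a deserialization error if the records are actually plain JSON.
kafka-avro-console-consumer --bootstrap-server localhost:9092 \
  --topic my-topic --from-beginning --max-messages 5 \
  --property schema.registry.url=http://localhost:8081
```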




Josh Goodrich

Jun 6, 2016, 2:13:33 PM
to confluent...@googlegroups.com, vtsa...@gmail.com
When I use the console consumer, the messages look like normal JSON.
