UTF8 / JSON parsing issue in the REST proxy


Guillaume

Feb 4, 2016, 9:41:04 AM
to Confluent Platform
Hi,

I have a weird error where consuming through the REST proxy gives me:
{"error_code":50002,"message":"Kafka error: java.io.CharConversionException: Invalid UTF-32 character 0x15020631(above 10ffff)  at char #1, byte #7)"}

Now I am not sure what happens so here are the steps I am following. Note that I test this under vagrant + puppet, so all changes are tested under a brand new untainted box.
My full test script can be found in a gist at https://gist.github.com/lomignet/2741a54d07ada45afa0a. I give only an outline here, without all the belts, braces and whistles.

Some information:
- server is centos 7
- client is debian

1) uploading schema:
curl -X POST \
    -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data @$AVSC.cf \
    http://$SERVER:8081/subjects/event-value/versions


Where $AVSC.cf is a standard avro schema, wrapped in {"schema": "..."} with the inner " characters escaped (a made-up example follows the output below). Output is:

{"id":21}

2) sending data
curl -X POST \
    -H "Content-Type: application/vnd.kafka.avro.v1+json" \
    --data "$data" \
    http://$SERVER:8082/topics/$TOPIC


Where $data is a valid json record wrapped as "{\"value_schema_id\": $SCHEMAID, \"records\": [{\"value\": ${data}}]}". Output is:

{"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":21}

I do see the offset increasing if I send more events, so it all looks fine.
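
To make the envelope concrete: with schema id 21 and the made-up one-field schema above, the expanded payload would look something like:

{"value_schema_id": 21, "records": [{"value": {"id": "some-string"}}]}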

3) creating a consumer
curl -X POST \
    -H "Content-Type: application/vnd.kafka.v1+json" \
    --data "{\"name\": \"${CONSUMER}_instance\", \"format\": \"json\", \"auto.offset.reset\": \"smallest\"}" \
    http://$SERVER:8082/consumers/$CONSUMER


Nothing specific here, output is:


4) using the consumer:

curl -X GET \
    -H "Accept: application/vnd.kafka.json.v1+json" \
    http://$SERVER:8082/consumers/$CONSUMER/instances/${CONSUMER}_instance/topics/$TOPIC

This is where things go wrong. I get as output:
{"error_code":50002,"message":"Kafka error: java.io.CharConversionException: Invalid UTF-32 character 0x15020631(above 10ffff)  at char #1, byte #7)"}

If I take a look on the server, the full message is:

Feb 04 09:27:50 confluent.wp.local kafka-rest-start[6927]: [2016-02-04 09:27:50,510] INFO 172.28.128.1 - - [04/Feb/2016:09:27:49 +0000] "GET /consumers/vreten/instances/vreten_instance/topics/events HTTP/1.1" 500 150  728 (io.confluent.rest-utils.requests:77)
Feb 04 09:27:50 confluent.wp.local kafka-rest-start[6927]: [2016-02-04 09:27:50,511] ERROR Unexpected exception in consumer read thread: (io.confluent.kafkarest.ConsumerReadTask:153)
Feb 04 09:27:50 confluent.wp.local kafka-rest-start[6927]: org.apache.kafka.common.errors.SerializationException: java.io.CharConversionException: Invalid UTF-32 character 0x15020631(above 10ffff)  at char #1, byte #7)
Feb 04 09:27:50 confluent.wp.local kafka-rest-start[6927]: Caused by: java.io.CharConversionException: Invalid UTF-32 character 0x15020631(above 10ffff)  at char #1, byte #7)
Feb 04 09:27:50 confluent.wp.local kafka-rest-start[6927]: at com.fasterxml.jackson.core.io.UTF32Reader.reportInvalid(UTF32Reader.java:189)
Feb 04 09:27:50 confluent.wp.local kafka-rest-start[6927]: at com.fasterxml.jackson.core.io.UTF32Reader.read(UTF32Reader.java:150)
Feb 04 09:27:50 confluent.wp.local kafka-rest-start[6927]: at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.loadMore(ReaderBasedJsonParser.java:153)
Feb 04 09:27:50 confluent.wp.local kafka-rest-start[6927]: at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWSOrEnd(ReaderBasedJsonParser.java:1854)
Feb 04 09:27:50 confluent.wp.local kafka-rest-start[6927]: at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:571)
Feb 04 09:27:50 confluent.wp.local kafka-rest-start[6927]: at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3604)
Feb 04 09:27:50 confluent.wp.local kafka-rest-start[6927]: at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3549)
Feb 04 09:27:50 confluent.wp.local kafka-rest-start[6927]: at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2673)
Feb 04 09:27:50 confluent.wp.local kafka-rest-start[6927]: at io.confluent.kafkarest.JsonConsumerState.deserialize(JsonConsumerState.java:76)
Feb 04 09:27:50 confluent.wp.local kafka-rest-start[6927]: at io.confluent.kafkarest.JsonConsumerState.createConsumerRecord(JsonConsumerState.java:66)
Feb 04 09:27:50 confluent.wp.local kafka-rest-start[6927]: at io.confluent.kafkarest.ConsumerReadTask.doPartialRead(ConsumerReadTask.java:118)
Feb 04 09:27:50 confluent.wp.local kafka-rest-start[6927]: at io.confluent.kafkarest.ConsumerWorker.run(ConsumerWorker.java:90)


I did try different variations of LC_ALL and LANG (namely the 4 combinations of C and en_US.UTF-8) on the confluent server and my client, but it did not make any difference.

The schema and data file I use are completely fine when used with the avro tools:

java -jar ./avro-tools-1.7.7.jar fromjson --schema-file $AVSC $DATA > $DATA.avro
java -jar ./avro-tools-1.7.7.jar tojson $DATA.avro

This outputs the data back properly, no matter what LC_ALL is set to.

As far as I can tell, the data I send is correct, pure ASCII, so I do not understand what is going on. I could not find any help in my searches either.

Any hint would be appreciated!

Guillaume


Ewen Cheslack-Postava

Feb 5, 2016, 2:58:38 AM
to Confluent Platform
Guillaume,

That definitely looks odd. Not sure why it's trying to decode as UTF-32, and it looks like you've already tried the standard LC_ALL and LANG settings to control this. Jackson should pick up encoding settings when deserializing automatically, e.g., see http://stackoverflow.com/questions/10004241/jackson-objectmapper-with-utf-8-encoding so I suspect the original *encoding* might be off. There seem to be some other system properties, like file.encoding, that could affect this -- maybe those would be worth checking?
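
For example, one blunt way to force that for any JVM you start (just a sketch; where the flag really belongs depends on your start scripts, and the properties path below assumes the packaged default) would be:

# JAVA_TOOL_OPTIONS is picked up by any JVM at startup
export JAVA_TOOL_OPTIONS="-Dfile.encoding=UTF-8"
kafka-rest-start /etc/kafka-rest/kafka-rest.properties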

You also provided great detail on how to reproduce and mentioned it fails under your Vagrant setup, but which Vagrant box image are you using? You mention "server is centos 7" and "client is debian", but could you be more specific about the details to reproduce?

Thanks,
-Ewen


Guillaume

Feb 5, 2016, 4:58:04 AM
to Confluent Platform
Hi Ewen,

Thanks for looking into this.

The client is an unholy mix of debian mint + some jessie repos + some ubuntu repos, but as my test script, as shown in the gist, is pure bash, I am not sure it is very relevant anyway. To remove some potential issues, I also tried running my script from the confluent server itself, and I get the same error.

The server is an official base centos 7 image from hashicorp: https://atlas.hashicorp.com/centos/boxes/7

To make it easily reproducible, I added a few files to the gist:
- a Vagrantfile
- the systemd start/stop scripts we use
- a helper script telling systemd if a service is up or not based on open port

The Vagrantfile will:
- load the default centos7 box from hashicorp
- set up the systemd start/stop scripts by getting them from the gist
- update the kafka config to remove the broker.id line (to only have auto-generated ids)
- get the few files I use for testing from the gist as well

This should give an easy way to reproduce the problem and see what actually happens:

# prerequisite: have vagrant installed
mkdir utf32
cd utf32
vagrant up --provider virtualbox
# wait a minute or three for everything to be automagically downloaded and installed
vagrant ssh

# then, inside the vagrant machine, run:
./confluent.sh


You can then see what is going wrong. As the services are started via systemd, you can get the logs (as root) with journalctl, e.g.:
journalctl -u kafka-rest
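
If it helps narrow things down, you can also dump the raw bytes of the first message in the topic (untested sketch; option names may vary with your kafka version):

# print the first message as hex to inspect its leading bytes
kafka-console-consumer --zookeeper localhost:2181 --topic events \
    --from-beginning --max-messages 1 | xxd | head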

Cheers,

Guillaume

Feb 5, 2016, 8:17:20 AM
to Confluent Platform
I updated the test script a bit, nothing major, just to make it a tad easier to use. You can get the latest version by fetching a new Vagrantfile instead of the one in my previous email:

Guillaume

Feb 10, 2016, 4:18:59 AM
to Confluent Platform
The sad root cause behind this error: if you send avro data and read it back with a JSON consumer, the rest proxy is not happy. In hindsight the error even makes sense: the avro wire format the proxy writes starts with a 0x00 magic byte followed by a 4-byte schema id, and presumably those leading null bytes make Jackson's encoding auto-detection guess UTF-32, hence the weird CharConversionException.
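
For anyone finding this later: the fix on my side was simply to ask the proxy for avro when consuming, i.e. create the consumer with "format": "avro" and read with the avro Accept header, roughly:

curl -X POST \
    -H "Content-Type: application/vnd.kafka.v1+json" \
    --data "{\"name\": \"${CONSUMER}_instance\", \"format\": \"avro\", \"auto.offset.reset\": \"smallest\"}" \
    http://$SERVER:8082/consumers/$CONSUMER

curl -X GET \
    -H "Accept: application/vnd.kafka.avro.v1+json" \
    http://$SERVER:8082/consumers/$CONSUMER/instances/${CONSUMER}_instance/topics/$TOPIC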

Lesson learnt!

Rohit Sardesai

May 17, 2016, 5:29:07 AM
to Confluent Platform
Hello,

I am creating a KafkaProducer instance with KafkaJsonSerializer as the key and value serializer. On the KafkaConsumer side, I am using the KafkaJsonDeserializer for both the key and value deserialization. I am able to produce messages to the topic. However, while consuming messages, I am getting a similar exception:

[2016-05-17 17:25:34,180/GMT+08:00][ConsumerWorker-0][ERROR] Uncaught error in request completion: (org.apache.kafka.clients.NetworkClient:290)
org.apache.kafka.common.errors.SerializationException: java.io.CharConversionException: Invalid UTF-32 character 0x2e899230(above 10ffff)  at char #2, byte #11)
Caused by: java.io.CharConversionException: Invalid UTF-32 character 0x2e899230(above 10ffff)  at char #2, byte #11)
at com.fasterxml.jackson.core.io.UTF32Reader.reportInvalid(UTF32Reader.java:189)
at com.fasterxml.jackson.core.io.UTF32Reader.read(UTF32Reader.java:150)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.loadMore(ReaderBasedJsonParser.java:153)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWSOrEnd(ReaderBasedJsonParser.java:1854)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:571)
at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3604)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3549)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2673)
at io.confluent.kafka.serializers.KafkaJsonDeserializer.deserialize(KafkaJsonDeserializer.java:69)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:595)
at org.apache.kafka.clients.consumer.internals.Fetcher.handleFetchResponse(Fetcher.java:539)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$000(Fetcher.java:67)
at org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:137)
at org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:134)

I am not using Avro for sending the data; this is a JSON request. Any thoughts on what could be wrong?