I am creating a KafkaProducer instance with KafkaJsonSerializer as both the key and value serializer. On the KafkaConsumer side, I am using KafkaJsonDeserializer for both key and value deserialization. I am able to produce messages to the topic. However, while consuming messages, I get the following exception:
org.apache.kafka.common.errors.SerializationException: java.io.CharConversionException: Invalid UTF-32 character 0x2e899230(above 10ffff) at char #2, byte #11)
Caused by: java.io.CharConversionException: Invalid UTF-32 character 0x2e899230(above 10ffff) at char #2, byte #11)
at com.fasterxml.jackson.core.io.UTF32Reader.reportInvalid(UTF32Reader.java:189)
at com.fasterxml.jackson.core.io.UTF32Reader.read(UTF32Reader.java:150)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.loadMore(ReaderBasedJsonParser.java:153)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWSOrEnd(ReaderBasedJsonParser.java:1854)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:571)
at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3604)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3549)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2673)
at io.confluent.kafka.serializers.KafkaJsonDeserializer.deserialize(KafkaJsonDeserializer.java:69)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:595)
at org.apache.kafka.clients.consumer.internals.Fetcher.handleFetchResponse(Fetcher.java:539)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$000(Fetcher.java:67)
at org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:137)
at org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:134)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:350)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:288)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:303)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:197)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:187)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:877)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:829)
at io.confluent.kafkarest.ConsumerReadTask.doPartialRead(ConsumerReadTask.java:171)
at io.confluent.kafkarest.ConsumerWorker.run(ConsumerWorker.java:131)
I am not using Avro to send the data; this is a JSON request. Any thoughts on what could be wrong? The producer is created as follows:
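// keySerializer is created the same way, with configure(props, true)
KafkaJsonSerializer<V> valueSerializer = new KafkaJsonSerializer<>();
// false = this serializer is used for record values, not keys
valueSerializer.configure(props, false);
KafkaProducer<K, V> producer = new KafkaProducer<K, V>(props, keySerializer, valueSerializer);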
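
The UTF32Reader frames in the trace suggest that Jackson guessed UTF-32 from the leading bytes of the record. That usually means the bytes on the topic are not plain UTF-8 JSON, for example because some records were written by a different serializer. A minimal diagnostic sketch, assuming the raw record bytes can be obtained separately (for instance by consuming the topic with ByteArrayDeserializer): the class PayloadInspector and its methods are hypothetical helpers, not part of Kafka or Confluent, and the check is only a rough heuristic.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for eyeballing raw Kafka record bytes; not a Kafka/Confluent API.
public class PayloadInspector {

    // Render the first `limit` bytes as hex so a non-text prefix is easy to spot.
    public static String hexPrefix(byte[] payload, int limit) {
        StringBuilder sb = new StringBuilder();
        int n = Math.min(limit, payload.length);
        for (int i = 0; i < n; i++) {
            if (i > 0) {
                sb.append(' ');
            }
            sb.append(String.format("%02x", payload[i] & 0xff));
        }
        return sb.toString();
    }

    // Rough heuristic: after optional ASCII whitespace, does the payload begin
    // with a byte that can start a JSON value in UTF-8?
    public static boolean looksLikeUtf8Json(byte[] payload) {
        for (byte b : payload) {
            char c = (char) (b & 0xff);
            if (c == ' ' || c == '\t' || c == '\n' || c == '\r') {
                continue; // skip leading whitespace
            }
            return c == '{' || c == '[' || c == '"' || c == '-'
                    || (c >= '0' && c <= '9')
                    || c == 't' || c == 'f' || c == 'n';
        }
        return false; // empty or whitespace-only payload
    }

    public static void main(String[] args) {
        // Sample inputs are made up for illustration; real bytes would come from the topic.
        byte[] json = "{\"id\":1}".getBytes(StandardCharsets.UTF_8);
        byte[] binary = {0x00, 0x00, 0x00, 0x00, 0x01, 0x2e, (byte) 0x89, (byte) 0x92, 0x30};
        System.out.println(hexPrefix(json, 8) + " -> " + looksLikeUtf8Json(json));
        System.out.println(hexPrefix(binary, 8) + " -> " + looksLikeUtf8Json(binary));
    }
}
```

If the records on the topic fail this check, the problem is more likely in what was written to the topic than in the consumer configuration.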