Hi,
I'm new to Kafka and the Confluent Platform. I am using Confluent 2.0.0 and kafka-clients 0.9.0.1.
I created a consumer to read Avro messages from Kafka, and I provided the schema registry URL to the Avro deserializer.
One of my test cases is to catch an error/exception during message deserialization when a message pushed to the corresponding topic has an invalid format.
I managed to send a malformed message, "abc", to my topic using the console producer, which does not check the schema.
When I started consuming messages, no exception was thrown; instead, the same error log showed up again and again for the duration of the poll.
    for (ClientResponse response : responses) {
        if (response.request().hasCallback()) {
            try {
                response.request().callback().onComplete(response);
            } catch (Exception e) {
                log.error("Uncaught error in request completion:", e);
            }
        }
    }
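To make sure I am reading this loop correctly, here is a self-contained sketch (plain Java with a hypothetical `Callback` interface, not the real Kafka classes) showing that an exception thrown by one callback is swallowed and the loop simply continues:

```java
import java.util.List;

public class SwallowedCallbackDemo {
    // Hypothetical stand-in for the request callback in the snippet above.
    interface Callback {
        void onComplete(String response);
    }

    public static void main(String[] args) {
        // Two callbacks: the first throws (like a deserialization failure),
        // the second runs normally, showing the loop is never interrupted.
        List<Callback> callbacks = List.of(
                r -> { throw new RuntimeException("boom: " + r); },
                r -> System.out.println("handled " + r)
        );
        for (Callback cb : callbacks) {
            try {
                cb.onComplete("response");
            } catch (Exception e) {
                // Mirrors log.error(...): the exception is only logged,
                // so the caller never sees it.
                System.out.println("logged: " + e.getMessage());
            }
        }
        System.out.println("loop finished");
    }
}
```

Running this prints the "logged" line for the failing callback, then "handled response" and "loop finished", which matches what I see: errors are logged but never propagate out of the poll loop.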
What I observed is that a java.nio.BufferUnderflowException occurs inside the `onComplete` call, and the error log is then printed periodically.
I am trying to understand this snippet: it seems that even when an exception occurs, it is only logged as an error so that the consumer is not stopped.
But why not rethrow the exception and let the user handle it? In my case, I want to catch the BufferUnderflowException raised during deserialization for testing.
That does not seem to be possible.
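For context, here is a minimal sketch (plain Java, no Kafka classes) of why I believe "abc" underflows: assuming the Confluent wire format, the deserializer expects a magic byte followed by a 4-byte schema id before the Avro payload, i.e. at least 5 bytes, while my message is only 3 bytes long:

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;

public class WireFormatUnderflow {
    public static void main(String[] args) {
        byte[] malformed = "abc".getBytes(); // only 3 bytes
        ByteBuffer buffer = ByteBuffer.wrap(malformed);
        try {
            byte magic = buffer.get();      // succeeds: 1 byte read, 2 remain
            int schemaId = buffer.getInt(); // fails: needs 4 bytes, only 2 left
            System.out.println("magic=" + magic + " schema id=" + schemaId);
        } catch (BufferUnderflowException e) {
            // This is the exception I want to surface to my test code,
            // but the consumer only logs it internally.
            System.out.println("caught BufferUnderflowException");
        }
    }
}
```

This reproduces the exact exception type from the logs, which is why I suspect the deserializer hits it while parsing the message header.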
Any help is highly appreciated.
Thank you
Hao