Handling of DataException thrown by AvroConverter toConnectData()


Lucas Ariel Martinez

Sep 14, 2016, 7:01:41 AM
to Confluent Platform

Hi!

I am using a SinkTask to consume Avro serialized messages from Kafka.

The issue I have is that if a message which is not Avro-encoded is inserted into the topic, the Task dies.

e.g.:


ERROR Task justone-kafka-sink-pg-json-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
org.apache.kafka.connect.errors.DataException: Failed to deserialize data to Avro:
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:109)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:358)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:227)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:171)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
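For context on the "Unknown magic byte!" error: the Confluent Avro serializer frames every message with a magic byte 0x0 followed by a 4-byte schema registry id, and the deserializer rejects any payload that does not start that way (which is why it reports id -1). A minimal, self-contained sketch of that framing check, using only the plain JDK; the class and method names here are made up for illustration, not part of any Confluent API:

```java
import java.nio.ByteBuffer;

public class WireFormatCheck {
    private static final byte MAGIC_BYTE = 0x0;

    /** Returns true if the raw bytes look like Confluent-framed Avro:
     *  a 0x0 magic byte followed by a 4-byte schema registry id and a payload. */
    public static boolean looksLikeConfluentAvro(byte[] payload) {
        return payload != null && payload.length > 5 && payload[0] == MAGIC_BYTE;
    }

    /** Extracts the big-endian schema registry id from a framed message. */
    public static int schemaId(byte[] payload) {
        return ByteBuffer.wrap(payload, 1, 4).getInt();
    }

    public static void main(String[] args) {
        byte[] framed = {0x0, 0, 0, 0, 42, 1, 2};    // magic byte + schema id 42 + payload
        byte[] plainText = "hello".getBytes();       // not Avro-framed: first byte is 'h'
        System.out.println(looksLikeConfluentAvro(framed));    // true
        System.out.println(schemaId(framed));                  // 42
        System.out.println(looksLikeConfluentAvro(plainText)); // false
    }
}
```

A plain-text message like the one above fails this check on its very first byte, which is exactly the point where AvroConverter.toConnectData() throws the DataException.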


Presumably, this bad message will be handed to another Task (or to the same Task after a manual restart), but it will never be handled successfully and will keep killing the Tasks.


Why was it assumed that no bad messages would ever be written to the topic?

Is there any way to simply discard these bad messages or route them somewhere else? What should be done in this case?


Thanks!!!


Jeetu Bethina

Nov 17, 2016, 5:49:19 PM
to Confluent Platform
Hi,

I am seeing a similar issue where a Task is killed due to a bad message in the topic. 

Is there any config value that would let us discard (and log) bad messages but continue to process the rest of the topic?

Thank you.

Ewen Cheslack-Postava

Nov 21, 2016, 1:45:51 PM
to Confluent Platform
I wouldn't say this is an assumption we're making, but agreed that it is not something the framework currently provides good support for handling. In practice there will probably only be a few useful policies (discard, send to a dead letter queue, or fail), so it would be great to have these available at the framework level.
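The three policies mentioned above could be sketched like this. This is a toy, self-contained simulation of what such a framework-level hook might do, not a real Kafka Connect API; a plain RuntimeException stands in for org.apache.kafka.connect.errors.DataException, and every name here is hypothetical:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class BadRecordPolicyDemo {
    public enum Policy { FAIL, DISCARD, DEAD_LETTER }

    /** Applies the converter to each raw record, handling failures per policy. */
    public static List<String> convertAll(List<byte[]> records,
                                          Function<byte[], String> converter,
                                          Policy policy,
                                          List<byte[]> deadLetters) {
        List<String> out = new ArrayList<>();
        for (byte[] raw : records) {
            try {
                out.add(converter.apply(raw));
            } catch (RuntimeException e) {        // stands in for DataException
                switch (policy) {
                    case FAIL:        throw e;               // current behavior: task dies
                    case DISCARD:     break;                 // log and skip the record
                    case DEAD_LETTER: deadLetters.add(raw);  // route elsewhere for inspection
                                      break;
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Toy converter: accepts only records starting with the 0x0 magic byte.
        Function<byte[], String> converter = raw -> {
            if (raw.length == 0 || raw[0] != 0x0) throw new RuntimeException("Unknown magic byte!");
            return "record(" + raw.length + " bytes)";
        };
        List<byte[]> topic = Arrays.asList(new byte[]{0, 1}, "garbage".getBytes(), new byte[]{0, 2, 3});
        List<byte[]> dlq = new ArrayList<>();
        List<String> ok = convertAll(topic, converter, Policy.DEAD_LETTER, dlq);
        System.out.println(ok.size());   // 2 records converted
        System.out.println(dlq.size());  // 1 bad record routed to the dead letter queue
    }
}
```

With Policy.FAIL the bad record rethrows and stops everything (today's behavior); with Policy.DISCARD it is silently dropped; with Policy.DEAD_LETTER it is captured for later inspection while processing continues.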

Note that this becomes much less of an issue if you validate compatibility of data before ever producing it to Kafka (e.g. with the schema registry and Avro serializers).

-Ewen



