Exception Handling in Kafka Streams - Write to different topic

3,772 views
Skip to first unread message

Lavanya Manickam

unread,
Aug 29, 2018, 6:06:05 AM8/29/18
to Confluent Platform
How do I achieve the following using Kafka streams?

For every message in my stream, I have a processing logic inside try/catch block.

Whenever any processing exception / serialization exception occurs, I want to catch it and publish the particular message to a different topic ( say, failed_messages_topic) and continue streaming.

I do not have any condition on which I can use  kafkaStream#branch. Any message that gives me any exception need to go to the failed topic. How do I do this?  I am using 1.0.1 version. 

Thanks,
Lavanya M

Sriram KS

unread,
Aug 29, 2018, 12:24:48 PM8/29/18
to confluent...@googlegroups.com
You can try the following two handlers for serialization and deserialization exceptions
/**
* {@code default.deserialization.exception.handler}
*/
public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.deserialization.exception.handler";
private static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.DeserializationExceptionHandler</code> interface.";

/**
* {@code default.production.exception.handler}
*/
private static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.production.exception.handler";
private static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.ProductionExceptionHandler</code> interface.";

For a stream exception you can use
streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
public uncaughtException(Thread t, throwable e) {
// here you should examine the exception and perform an appropriate action!
}
);

Hope this helps

Regards
Sriram



--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.
To post to this group, send email to confluent...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/47cf40c7-7e2e-4a43-a21e-209fb88afc31%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Lavanya Manickam

unread,
Aug 30, 2018, 2:42:30 AM8/30/18
to confluent...@googlegroups.com
Thanks for the reply Sriram. Please correct me if I'm wrong as I am new to Kafka Streams. setUncaughtExceptionHandler() is called when the stream is terminated by an exception. That is not the case I am trying to handle. It can be any processing logic exception and it is not necessary for the stream to be terminated. I want to post the particular message that caused the error to a different topic and continue with streaming the next messages. I am not aware if this is possible with Kafka Streams.

Thanks,
Lavanya M

Sriram KS

unread,
Aug 30, 2018, 11:42:35 AM8/30/18
to confluent...@googlegroups.com
If your case is to continue processing and log your message to a separate topic i would advice to use 
I personally dont like the name deserialization in the naming of the handler, but this serves as a handler for all errors happening within a stream


The above documentation clearly points out that you can use the handler for 
corrupt data, incorrect serialization logic, or unhandled record types

Hope this helps

Regards
Sriram

Sriram KS

unread,
Aug 31, 2018, 1:59:09 PM8/31/18
to confluent...@googlegroups.com
My bad, i tried this but its not capturing all Exceptions.

I see in StreamThread.java all uncaughtException are captured and finally is called to shutdown

I agree, that global exception handler is not available for a running stream.

While thinking through found some options

1. Catching exceptions in each flow and handling them locally
2. catching exceptions by wrapping the methods with an aspect and handling them globally

Regards
Sriram

Matthias J. Sax

unread,
Sep 3, 2018, 2:53:32 PM9/3/18
to confluent...@googlegroups.com

> but this serves as a handler for all errors happening
>> within a stream

This is not correct. The `default.deserialization.exception.handler` is
only used when an exception occurs during deserialization (as the name
indicates).

To handled deserialization or serialization errors, you might want to
provide a custom handler. Cf.
https://docs.confluent.io/current/streams/faq.html#option-3-quarantine-corrupted-records-dead-letter-queue

Using try-catch in your processing logic does make sense to handle
exception and avoid dying. Using the DSL, you will need some flag to use
branch() though. It should be possible to wrap the actual data with a
simple POJO type with two fields: (1) holding the actual data and (2)
indicating success/error during processing. After branching you can
remove the wrapper and extract the actual data again.

For the uncaught exception handler: it is called after the thread died;
ie, it's to late to continue processing, as the handler is called after
the fact.

Hope this helps.


-Matthias



On 8/31/18 10:58 AM, Sriram KS wrote:
> My bad, i tried this but its not capturing all Exceptions.
>
> I see in StreamThread.java all uncaughtException are captured and
> finally is called to shutdown
>
> I agree, that global exception handler is not available for a running
> stream.
>
> While thinking through found some options
>
> 1. Catching exceptions in each flow and handling them locally
> 2. catching exceptions by wrapping the methods with an aspect and
> handling them globally
>
> Regards
> Sriram
>
> On Thu, Aug 30, 2018 at 10:42 AM Sriram KS <srira...@gmail.com
> <mailto:srira...@gmail.com>> wrote:
>
> If your case is to continue processing and log your message to a
> separate topic i would advice to use 
>
>
> default.deserialization.exception.handler
> <https://kafka.apache.org/20/documentation/streams/developer-guide/config-streams#id7>
>
> I personally dont like the name deserialization in the naming of the
> handler, but this serves as a handler for all errors happening
> within a stream
>
> https://kafka.apache.org/20/documentation/streams/developer-guide/config-streams#default-deserialization-exception-handler
>
> The above documentation clearly points out that you can use the
> handler for 
> /*_corrupt data, incorrect serialization logic, or unhandled record
> types_*/
> /*_
> _*/
> <mailto:confluent-platf...@googlegroups.com>.
> To post to this group, send email to
> confluent...@googlegroups.com
> <mailto:confluent...@googlegroups.com>.
> <https://groups.google.com/d/msgid/confluent-platform/47cf40c7-7e2e-4a43-a21e-209fb88afc31%40googlegroups.com?utm_medium=email&utm_source=footer>.
> For more options, visit https://groups.google.com/d/optout.
>
> --
> You received this message because you are subscribed to the
> Google Groups "Confluent Platform" group.
> To unsubscribe from this group and stop receiving emails
> from it, send an email to
> confluent-platf...@googlegroups.com
> <mailto:confluent-platf...@googlegroups.com>.
> To post to this group, send email to
> confluent...@googlegroups.com
> <mailto:confluent...@googlegroups.com>.
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/confluent-platform/CAGRGRQ4RfsUgD1Ti9X%3DeAJDLL-vx7rPDHhLhCWbk6-CTMTZDUw%40mail.gmail.com
> <https://groups.google.com/d/msgid/confluent-platform/CAGRGRQ4RfsUgD1Ti9X%3DeAJDLL-vx7rPDHhLhCWbk6-CTMTZDUw%40mail.gmail.com?utm_medium=email&utm_source=footer>.
> For more options, visit https://groups.google.com/d/optout.
>
> --
> You received this message because you are subscribed to the
> Google Groups "Confluent Platform" group.
> To unsubscribe from this group and stop receiving emails from
> it, send an email to
> confluent-platf...@googlegroups.com
> <mailto:confluent-platf...@googlegroups.com>.
> To post to this group, send email to
> confluent...@googlegroups.com
> <mailto:confluent...@googlegroups.com>.
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/confluent-platform/CAOJy%2B2L1PHY5gutYo3WdvCGDGantqbTdqV2Ukxy71ha%2BCpDnig%40mail.gmail.com
> <https://groups.google.com/d/msgid/confluent-platform/CAOJy%2B2L1PHY5gutYo3WdvCGDGantqbTdqV2Ukxy71ha%2BCpDnig%40mail.gmail.com?utm_medium=email&utm_source=footer>.
> For more options, visit https://groups.google.com/d/optout.
>
> --
> You received this message because you are subscribed to the Google
> Groups "Confluent Platform" group.
> To unsubscribe from this group and stop receiving emails from it, send
> an email to confluent-platf...@googlegroups.com
> <mailto:confluent-platf...@googlegroups.com>.
> To post to this group, send email to confluent...@googlegroups.com
> <mailto:confluent...@googlegroups.com>.
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/confluent-platform/CAGRGRQ6cNw4VxqPU%3DeOdTKo7P7eLUEiLPKdXb8w-Vz3GLWeGmA%40mail.gmail.com
> <https://groups.google.com/d/msgid/confluent-platform/CAGRGRQ6cNw4VxqPU%3DeOdTKo7P7eLUEiLPKdXb8w-Vz3GLWeGmA%40mail.gmail.com?utm_medium=email&utm_source=footer>.
signature.asc
Reply all
Reply to author
Forward
0 new messages