Kafka Streams: how to handle exceptions [Java 8]


Francesco Nobilia

Nov 28, 2016, 4:55:02 AM
to Confluent Platform
Hi there,

I'm implementing a Kafka Streams application with multiple streams, based on Java 8. It works fine, but it makes some assumptions about the data format. If at least one of these assumptions is not satisfied, my streams will fail by raising exceptions. I have in mind two alternatives to sort out this situation:
  • checking whether the incoming data respects the expected format inside a filter() function
  • embedding my stream inside a try/catch block; in this case, how can I restart my streams skipping the offset that caused the error?
Are these solutions correct? Or is there a better way to handle this situation?

More generally, I'm looking for a way to handle exceptions. I checked the Confluent guide for Kafka Streams, but I didn't find anything about exception handling. How should I manage exceptions inside a Kafka Streams application?

Thank you in advance for your help.

Kind Regards,

Francesco

Damian Guy

Nov 28, 2016, 5:14:00 AM
to Confluent Platform
Hi Francesco,

KafkaStreams provides a method that enables you to shut down or react to any uncaught exceptions, i.e., KafkaStreams.setUncaughtExceptionHandler(..) - but this handler will only be called when a StreamThread is terminating due to an exception. Depending on how many threads you have running, the most obvious thing to do on such an exception is to exit the application.
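
For illustration, registering the handler looks roughly like this (a minimal sketch against the 0.10.x API; the application id, bootstrap servers, and topic names are placeholders):

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

KStreamBuilder builder = new KStreamBuilder();
builder.stream("my-input-topic").to("my-output-topic");  // placeholder topology

KafkaStreams streams = new KafkaStreams(builder, props);
streams.setUncaughtExceptionHandler((t, e) -> {
    // Invoked only once the StreamThread is already terminating.
    System.err.println("Stream thread " + t.getName() + " died: " + e);
    System.exit(1);  // with a single thread, exiting is the simplest reaction
});
streams.start();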

For other exceptions that you want to handle and/or retry, it is left up to the developer.
When you say the format is wrong, do you mean the data won't deserialize? Or is it in your logic that the exceptions are happening?
If it is in your program logic, then using filter() to remove the unwanted messages is a reasonable thing to do.

If it is not in your logic and it is failing to deserialize the messages, then you'll need to use Serdes.ByteArray() and manually deserialize the messages in your streams application. For example, you might want to use KStream.transform(..) to turn the byte[] messages into the expected type - there you could add any logic to skip the messages that are not in the correct format, as sketched below.
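
A minimal sketch of that pattern, using mapValues() plus filter() rather than transform() for brevity; MyEvent and MyEvent.fromBytes(..) are hypothetical stand-ins for your own type and parser:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

KStreamBuilder builder = new KStreamBuilder();

// Consume raw bytes so a malformed record can never break deserialization.
KStream<byte[], byte[]> raw =
    builder.stream(Serdes.ByteArray(), Serdes.ByteArray(), "my-input-topic");

KStream<byte[], MyEvent> parsed = raw
    .mapValues(bytes -> {
        try {
            return MyEvent.fromBytes(bytes);  // hypothetical parser
        } catch (RuntimeException e) {
            return null;                      // flag records that fail to parse
        }
    })
    .filter((key, value) -> value != null);   // silently drop the bad ones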

Thanks,
Damian


Francesco Nobilia

Nov 28, 2016, 5:20:42 AM
to Confluent Platform
Thank you, Damian, for your prompt reply.

Is it possible to start a new stream instance inside KafkaStreams.setUncaughtExceptionHandler(..), skipping the offset that raised the error?

For example (pseudocode):
streamTypeA
  .filter(..)
  ...
  .setUncaughtExceptionHandler( if (error) then restart streamTypeA one offset ahead of the current one )


Damian Guy

Nov 28, 2016, 6:09:43 AM
to Confluent Platform
There is no easy and convenient way of doing that. The offsets for the topics and partitions are tracked internally, but not exposed outside of the streams library. There is also no way to tell streams to advance the offset or skip the message.


Jay Prakash Meena

Oct 24, 2017, 8:35:40 AM
to Confluent Platform
Hi Damian/Francesco,

I have the same doubt. Did we ever find a workaround for this?

Is it possible to start a new stream instance inside KafkaStreams.setUncaughtExceptionHandler(..), skipping the offset that raised the error?

Thanks

Matthias J. Sax

Oct 24, 2017, 5:35:22 PM
to confluent...@googlegroups.com
It's not possible to restart the instance within the handler. You would rather just set a flag and check that flag in the main thread regularly. If the flag is set, stop the currently running instance, create a new one, and start it, as in the sketch below.
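
A minimal sketch of that pattern; buildStreams() is a hypothetical helper that assembles your topology and config, and the loop is assumed to run in main(String[] args) throws InterruptedException:

import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.streams.KafkaStreams;

AtomicBoolean failed = new AtomicBoolean(false);
KafkaStreams streams = null;

// Main-thread loop: the handler only raises the flag; the restart happens here.
while (true) {
    if (streams == null || failed.getAndSet(false)) {
        if (streams != null) {
            streams.close();  // stop the instance whose thread died
        }
        streams = buildStreams();  // hypothetical factory for your topology
        streams.setUncaughtExceptionHandler((t, e) -> failed.set(true));
        streams.start();
    }
    Thread.sleep(1000);  // poll the flag regularly
}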

Skipping over a record is not easily possible. It also depends on the error type. If you need to skip over messages, you can insert a transform() as the first operator and configure the Transformer with the offset you want to skip. The transformer can access the record's offset via the ProcessorContext provided to init(), and thus skip over the bad record.
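
A minimal sketch of such a Transformer against the pre-2.0 API (where Transformer still declares punctuate()); the bad partition and offset are assumed to be known, e.g. from the failure's log output:

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

class SkipOffsetTransformer<K, V> implements Transformer<K, V, KeyValue<K, V>> {
    private final int badPartition;
    private final long badOffset;
    private ProcessorContext context;

    SkipOffsetTransformer(int badPartition, long badOffset) {
        this.badPartition = badPartition;
        this.badOffset = badOffset;
    }

    @Override
    public void init(ProcessorContext context) {
        this.context = context;  // exposes partition() and offset() per record
    }

    @Override
    public KeyValue<K, V> transform(K key, V value) {
        if (context.partition() == badPartition && context.offset() == badOffset) {
            return null;  // returning null forwards nothing, so the record is skipped
        }
        return KeyValue.pair(key, value);
    }

    @Override
    public KeyValue<K, V> punctuate(long timestamp) {
        return null;  // no scheduled work
    }

    @Override
    public void close() {}
}

It would be applied as the first operator, e.g. stream.transform(() -> new SkipOffsetTransformer<>(0, 1234L)).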


Hope this helps


-Matthias
