Kafka Streams threads dying and not being resurrected


Emiliano Capoccia

Aug 4, 2016, 4:51:20 PM
to Confluent Platform
Hello,

My team and I have been investigating Kafka Streams for a new application we're developing.

Our topology looks like source (input topic) -> processor (business logic) -> sink (output topic).

Now, we are facing an issue when an exception is thrown in the stream processing thread and is not caught.
In this scenario, the thread dies, and the application needs to be restarted.
The exception is thrown outside of the application code itself -- for instance, a failure to deserialize an input message due to a wrong format, such as when old 0.9 producers are publishing to a 0.10 cluster.

Now, having to restart the application whenever somebody messes up the input topics can turn into a problem for the ops team.

The question is, am I missing something in the API? Can I somehow hook into the uncaught exception handler?
I can see that the implementation uses a StreamThread array; I wonder if there is any way of having the infrastructure handle such failures, similarly to a fixed-size thread pool.

Emiliano

Matthias J. Sax

unread,
Aug 4, 2016, 5:32:37 PM8/4/16
to confluent...@googlegroups.com
Yes you can. :)

> KafkaStreams#setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh)
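
A minimal sketch of wiring this up (topic names and config values below are hypothetical; the handler here only logs, but it is the hook where you can react to a dying StreamThread):

import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class HandlerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // hypothetical
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical

        // Simple pass-through topology: source -> sink (business logic omitted).
        KStreamBuilder builder = new KStreamBuilder();
        builder.stream("input-topic").to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder, props);

        // Invoked whenever a StreamThread dies with an uncaught exception.
        streams.setUncaughtExceptionHandler((thread, throwable) ->
                System.err.println("Stream thread " + thread.getName() + " died: " + throwable));

        streams.start();
    }
}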


-Matthias

Emiliano Capoccia

Aug 5, 2016, 3:32:00 AM
to Confluent Platform
Hello Matthias, thanks for your reply.

I see that I can set an uncaught exception handler; however, I don't understand how that helps me keep the stream thread pool at a constant size.
I mean, other than doing some logging, the thread that invokes the handler is dying anyway, and I can't access it from the application code.
Am I missing something?

Emiliano

user c

Aug 5, 2016, 4:06:47 AM
to Confluent Platform
I haven't used the API extensively enough to suggest something within the API itself. But could your application have a thread pool at the language level that controls the Kafka stream thread? That way, even if the stream reader dies, it won't kill your application, and it would just be a matter of creating another thread.

Emiliano Capoccia

Aug 5, 2016, 8:21:48 AM
to Confluent Platform
Hello, thanks for your answer.

So basically you're suggesting to build a sort of watchdog around the internal thread pool of the KafkaStreams class to keep it at the prescribed size; essentially doing what a fixed-size thread pool does, only rolled out in-house. Or else detecting from the outside that the KStream is in a doomed state and restarting it somehow. Am I wrong?

I can see two issues with that, linked to one another.

The first is that I cannot access the thread pool inside the KafkaStreams class. The pool is created during construction as a private StreamThread[] array (StreamThread being a subclass of Thread) and started in the start() method of the class.
Nor can I query the class itself to check the status of the internal pool. So, given that you're essentially new-ing the KafkaStreams instance somewhere (in my case, in the Spring context), detecting a failure and taking action is very hard.

The second, even if I could access the thread pool internal to the KafkaStreams class, is that if I resurrect the thread "as is", without taking extra care to advance the offset in the input topic, the crash is going to happen again.

Of course there are a number of ways around it, e.g. exploiting the uncaught exception handler to record that some thread died, then attempting a shutdown followed by starting a new instance from some watchdog thread.

However, it did not feel completely right to me to code such a thing at the application level; if you think about it, this is support that should be offered at the framework level. At the end of the day, keeping a thread pool at size and skipping poison messages aren't business concerns.
To me, such a solution feels more like a hack that will need to be removed once this support is offered by the framework.

What I was looking for was some form of callback in the framework allowing me to take action in case somebody (by mistake or maliciously) puts a poison pill in the input topic, but I couldn't find one.

I'm keen to hear more on the subject, 

Emiliano

Damian Guy

Aug 5, 2016, 9:28:51 AM
to Confluent Platform
Hi,

You could shut down the app in the exception handler and have it automatically restarted. However, these sorts of failures are likely to result in it constantly restarting, as the offset won't have moved forward and the bad/corrupt/invalid messages will be processed over and over, unless you move the offset forward prior to restarting.
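
A rough sketch of that pattern (names are hypothetical, and it assumes an external supervisor such as systemd or a container orchestrator restarts the process after it exits); as noted above, the same bad record will be hit again unless the offsets are moved first:

import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class FailFastStarter {

    // Start the topology and turn any uncaught StreamThread failure into a process exit.
    public static void startFailFast(KStreamBuilder builder, Properties props) {
        final KafkaStreams streams = new KafkaStreams(builder, props);

        streams.setUncaughtExceptionHandler((thread, throwable) -> {
            System.err.println("Stream thread " + thread.getName() + " died: " + throwable);
            // Close from a separate thread: closing from the dying StreamThread itself may block.
            new Thread(() -> {
                streams.close();
                System.exit(1); // non-zero exit code so the supervisor restarts the process
            }).start();
        });

        streams.start();
    }
}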

Have you checked to see why the messages are invalid?

Thanks,
Damian


Michael Noll

Aug 5, 2016, 11:49:00 AM
to confluent...@googlegroups.com
Emiliano,

you may also want to check whether the filter() or branch() (in the DSL) could help you address your problem.
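
For example, if corrupted records can be mapped to something recognizable (say, a null value) before they reach your processor, a filter() in the DSL can drop them. Topic names and serdes below are hypothetical:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class FilterExample {

    public static KStreamBuilder buildTopology() {
        KStreamBuilder builder = new KStreamBuilder();

        KStream<String, String> input =
                builder.stream(Serdes.String(), Serdes.String(), "input-topic");

        // Keep only records that survived deserialization; everything else is dropped
        // before the business-logic processor ever sees it.
        input.filter((key, value) -> value != null)
             .to(Serdes.String(), Serdes.String(), "output-topic");

        return builder;
    }
}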

-Michael




Emiliano Capoccia

Aug 6, 2016, 4:03:29 AM
to Confluent Platform
Hi Michael,
thanks for the interesting link.

Actually, managing the errors at the deserializer level makes sense.
The filter/branch approach is not applicable here, as the topic contains homogeneous data; it's just that some of the messages are corrupted.

On further investigation, we found that the case which is hurting us is a bit more subtle: a message produced by a 0.9 client will appear valid to a 0.10 consumer and processor, but will then fail to send in the producer.
I guess this is a separate issue, though; for this discussion, I think the recommendation to handle errors at the deserializer level is enough.
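
A sketch of what handling it at the deserializer level could look like. TolerantDeserializer is a hypothetical wrapper (not part of the Kafka API) that delegates to a real deserializer and maps any failure to null, so that a downstream filter() can drop the record instead of the exception killing the StreamThread:

import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;

public class TolerantDeserializer<T> implements Deserializer<T> {

    private final Deserializer<T> inner;

    public TolerantDeserializer(Deserializer<T> inner) {
        this.inner = inner;
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        inner.configure(configs, isKey);
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        try {
            return inner.deserialize(topic, data);
        } catch (Exception e) {
            // Poison pill: log and skip rather than letting the exception bubble up.
            System.err.println("Skipping undeserializable record on topic " + topic + ": " + e);
            return null;
        }
    }

    @Override
    public void close() {
        inner.close();
    }
}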

Emiliano


Emiliano Capoccia

Aug 6, 2016, 4:10:00 AM
to Confluent Platform
Hi Damian, thanks for your answer.

The messages we're receiving are invalid because they are produced with an incompatible version of the Kafka producer (0.9), and since this happens entirely at the framework level, there is little you can do to avoid the crash.
However, we've been investigating the subject a bit further and think that the correct approach in the general case is not to let the exception bubble up from the deserializer; please see my answers to Michael.

Emiliano


Michael Noll

Aug 8, 2016, 9:01:04 AM
to confluent...@googlegroups.com
Emiliano,

another thing that came to my mind:

Perhaps you could consider implementing a custom timestamp extractor.  Here, the extractor would first attempt to extract an embedded Kafka timestamp (which requires the message to be sent from a Kafka 0.10+ producer); this would yield event-time semantics for a given record.  If extracting this timestamp fails, the extractor could either (a) inspect the payload of the message to extract a timestamp (assuming your upstream 0.9 producer does somehow include such timestamps) or (b) fall back to returning the current processing time.

In other words, a combination of the default ConsumerRecordTimestampExtractor [1] and the WallclockTimestampExtractor [2].  Of course, the drawback is that you'd be mixing event-time (for 0.10-produced messages) and processing-time semantics (for 0.9-produced messages) if your 0.9 producers don't include any timestamp information in the message payload.  You can of course also update your 0.9 producer applications to include such timestamps manually if they are not already in the payload.

This approach could allow you to extract proper timestamps for Kafka topics into which both 0.9 and 0.10 producers would write data.
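
A sketch of that combined extractor, assuming the single-argument TimestampExtractor interface from 0.10.0 and treating a negative record timestamp as "no embedded timestamp":

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class MixedTimestampExtractor implements TimestampExtractor {

    @Override
    public long extract(ConsumerRecord<Object, Object> record) {
        long embedded = record.timestamp();
        if (embedded >= 0) {
            // Written by a 0.10+ producer: use the embedded timestamp (event-time semantics).
            return embedded;
        }
        // Written by a 0.9 producer: no embedded timestamp. Either parse one out of the
        // payload here (if your producers embed it), or fall back to wall-clock time
        // (processing-time semantics).
        return System.currentTimeMillis();
    }
}

The extractor would then be registered through the timestamp.extractor setting in the streams configuration.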

Just thinking out loud,
Michael








--
Michael G. Noll
Product Manager | Confluent
Follow us: Twitter | Blog

Emiliano Capoccia

Aug 8, 2016, 2:06:25 PM
to Confluent Platform
Hi Michael,

thanks for your interesting reply and considerations.
They actually make a lot of sense for ensuring interoperability of 0.9 and 0.10 clients with the same 0.10 cluster, and I would surely go for that approach if I had to offer such interoperability.

On a further read of KIP-32 (which adds timestamps to Kafka messages), however, I think the underlying reasons for adding the timestamp to the messages are very valid, and something we cannot dispense with.
Therefore, for our application, we've taken the more drastic approach of mandating 0.10 for all clients.

This, combined with handling the exception in the deserializer, should offer the resiliency required to take the application to production.

Emiliano


Michael Noll

Aug 8, 2016, 3:14:20 PM
to confluent...@googlegroups.com
> Therefore, for our application, we've taken the more drastic approach of mandating 0.10 for all clients.

Great to hear that this is actually possible in your situation.  Other users are often forced to stick with existing producer applications for quite some time to ensure legacy support. ;-)






--
Michael G. Noll
Product Manager | Confluent
Follow us: Twitter | Blog
