Kafka Stream error

213 views
Skip to first unread message

s...@etcc.com

unread,
Sep 1, 2017, 6:01:58 PM9/1/17
to Confluent Platform
HI,

I have an application with four streams running, today i saw error in one of the streams



2017/09/01 11:55:17.245[ERROR][](StreamThread.java): stream-thread [StreamThread-2] Failed while executing StreamTask 0_0 duet to commit consumer offsets:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
 at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:600) ~[kafka-clients-0.10.1.1.jar!/:?]
 at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:498) ~[kafka-clients-0.10.1.1.jar!/:?]
 at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1104) ~[kafka-clients-0.10.1.1.jar!/:?]
 at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:289) ~[kafka-streams-0.10.1.1.jar!/:?]
 at org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:359) ~[kafka-streams-0.10.1.1.jar!/:?]
 at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:328) [kafka-streams-0.10.1.1.jar!/:?]
 at org.apache.kafka.streams.processor.internals.StreamThread.commitOffsets(StreamThread.java:355) [kafka-streams-0.10.1.1.jar!/:?]
 at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:297) [kafka-streams-0.10.1.1.jar!/:?]
 at org.apache.kafka.streams.processor.internals.StreamThread.access$900(StreamThread.java:69) [kafka-streams-0.10.1.1.jar!/:?]
 at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsRevoked(StreamThread.java:143) [kafka-streams-0.10.1.1.jar!/:?]
 at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:336) [kafka-clients-0.10.1.1.jar!/:?]
 at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:303) [kafka-clients-0.10.1.1.jar!/:?]
 at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277) [kafka-clients-0.10.1.1.jar!/:?]
 at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259) [kafka-clients-0.10.1.1.jar!/:?]
 at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013) [kafka-clients-0.10.1.1.jar!/:?]
 at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979) [kafka-clients-0.10.1.1.jar!/:?]
 at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407) [kafka-streams-0.10.1.1.jar!/:?]
 at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) [kafka-streams-0.10.1.1.jar!/:?]


after that i saw the close() was called, and the stream just stopped.


So should i change the configs as the log suggested? once i restarted the app it is good.


How can i improve the app so that it can handle this or can somehow restart the stream?


Thanks
Shannon


Matthias J. Sax

unread,
Sep 1, 2017, 6:49:42 PM9/1/17
to confluent...@googlegroups.com
Yes, increasing max.poll.interval.ms should help. We changed the default
of this parameter to `Integer.MAX_VALUE` already in later releases.

For handling the error better, you should register an uncaught exception
handler. This allows you to close the current KafkaStreams instance,
create a new one and (re)start.

Cf.
http://docs.confluent.io/current/streams/developer-guide.html#using-kafka-streams-within-your-application-code


-Matthias
> --
> You received this message because you are subscribed to the Google
> Groups "Confluent Platform" group.
> To unsubscribe from this group and stop receiving emails from it, send
> an email to confluent-platf...@googlegroups.com
> <mailto:confluent-platf...@googlegroups.com>.
> To post to this group, send email to confluent...@googlegroups.com
> <mailto:confluent...@googlegroups.com>.
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/confluent-platform/6495905d-523c-4bdc-8edd-74d168e4e245%40googlegroups.com
> <https://groups.google.com/d/msgid/confluent-platform/6495905d-523c-4bdc-8edd-74d168e4e245%40googlegroups.com?utm_medium=email&utm_source=footer>.
> For more options, visit https://groups.google.com/d/optout.

signature.asc
Reply all
Reply to author
Forward
0 new messages