I have an application with four streams running. Today I saw an error in one of the streams:
2017/09/01 11:55:17.245[ERROR][](StreamThread.java): stream-thread [StreamThread-2] Failed while executing StreamTask 0_0 due to commit consumer offsets:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured
max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:600) ~[kafka-clients-0.10.1.1.jar!/:?]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:498) ~[kafka-clients-0.10.1.1.jar!/:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1104) ~[kafka-clients-0.10.1.1.jar!/:?]
at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:289) ~[kafka-streams-0.10.1.1.jar!/:?]
at org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:359) ~[kafka-streams-0.10.1.1.jar!/:?]
at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:328) [kafka-streams-0.10.1.1.jar!/:?]
at org.apache.kafka.streams.processor.internals.StreamThread.commitOffsets(StreamThread.java:355) [kafka-streams-0.10.1.1.jar!/:?]
at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:297) [kafka-streams-0.10.1.1.jar!/:?]
at org.apache.kafka.streams.processor.internals.StreamThread.access$900(StreamThread.java:69) [kafka-streams-0.10.1.1.jar!/:?]
at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsRevoked(StreamThread.java:143) [kafka-streams-0.10.1.1.jar!/:?]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:336) [kafka-clients-0.10.1.1.jar!/:?]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:303) [kafka-clients-0.10.1.1.jar!/:?]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277) [kafka-clients-0.10.1.1.jar!/:?]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259) [kafka-clients-0.10.1.1.jar!/:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013) [kafka-clients-0.10.1.1.jar!/:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979) [kafka-clients-0.10.1.1.jar!/:?]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407) [kafka-streams-0.10.1.1.jar!/:?]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) [kafka-streams-0.10.1.1.jar!/:?]
After that I saw that close() was called, and the stream just stopped.
So should I change the configs as the log suggests? Once I restarted the app, it ran fine again.
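If changing the configs is the right fix, I'm guessing it would look something like this (the values are guesses for my workload, builder stands for my existing topology, and I haven't tested this yet):

    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // hypothetical app id
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // allow more time between poll() calls before the consumer is considered
    // failed and the group rebalances (0.10.1 consumer default is 300000 = 5 min)
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
    // return smaller batches from poll() so each loop iteration finishes sooner
    // (default is 500)
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
    KafkaStreams streams = new KafkaStreams(builder, props);             // builder = my existing topology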
How can I improve the app so that it can handle this error, or somehow restart the stream?
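For the restart part, the only approach I can think of is registering an uncaught exception handler on the KafkaStreams instance and rebuilding it from scratch when a thread dies. Here is a rough, untested sketch of the idea — buildStreams() and the topic names are placeholders for my real topology:

    import java.util.Properties;

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class SelfRestartingStreams {

        private volatile KafkaStreams streams;

        public synchronized void start() {
            streams = buildStreams();
            streams.setUncaughtExceptionHandler((thread, e) ->
                // close() joins the stream threads, so it must not run on the
                // dying thread itself -- hand teardown/restart to a new thread
                new Thread(() -> {
                    streams.close();
                    start();   // a closed KafkaStreams can't be restarted, so build a fresh one
                }).start()
            );
            streams.start();
        }

        // placeholder for my real topology/config code
        private KafkaStreams buildStreams() {
            KStreamBuilder builder = new KStreamBuilder();
            builder.stream("input-topic").to("output-topic");   // hypothetical topics
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            return new KafkaStreams(builder, props);
        }
    }

(This deliberately ignores the case where several threads die at once and trigger overlapping restarts.) Is something like this reasonable, or is there a built-in way to do it?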