Kafka Streams VS broker starts/stops: "This server is not the leader for that topic-partition"


Nicolas Fouché

Mar 6, 2017, 12:33:04 PM
to confluent...@googlegroups.com
Hi,

Are Kafka Streams processes supposed to stop when a broker is shut down (properly or not) or when a new broker appears? Is there a plan to make them recover automatically?
That's the case for me: adding or removing a broker produces the same exception, which triggers a global stop:
"org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition."


Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_23, processor=KSTREAM-SOURCE-0000000000, topic=abc, partition=23, offset=388592
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_23] exception caught when producing
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:79)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
at org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:43)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
at org.apache.kafka.streams.kstream.internals.KStreamPassThrough$KStreamPassThroughProcessor.process(KStreamPassThrough.java:34)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:97)
at org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:46)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
at org.apache.kafka.streams.kstream.internals.KStreamTransform$KStreamTransformProcessor.process(KStreamTransform.java:60)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
... 2 more
Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.

Thanks.

- Nicolas

nfo...@onfocus.io

Mar 6, 2017, 12:45:49 PM
to Confluent Platform
I might add that my Kafka setup is: 3 brokers with a replication factor of 3.

Eno Thereska

Mar 7, 2017, 4:51:37 AM
to Confluent Platform
Hi Nicolas,

This looks like a bug. Mind filing a JIRA at https://issues.apache.org/jira/browse/KAFKA with the above info?

Thanks
Eno

nfo...@onfocus.io

Mar 9, 2017, 10:32:55 AM
to Confluent Platform
Hi Eno,

I can't reproduce it. We restarted all our brokers one by one this morning, and all our Streams processes ran fine.

- Nicolas

Bogdan Istrate

Apr 12, 2017, 4:10:15 PM
to Confluent Platform
Hi Eno,

I seem to be experiencing the exact same issue, right down to the exact same stack trace.
I think I've found a way to reproduce it, which I submitted as an SO question with some more details.
Please let me know if I should file a JIRA ticket for this.

Thanks,
Bogdan

Eno Thereska

Apr 12, 2017, 4:42:24 PM
to Confluent Platform
Hi there,

This is now fixed in 0.10.2.1. If you can't pick that up (it's currently being voted on), make sure you have these two parameters set as follows in your Streams config:

final Properties props = new Properties();
// ...
props.put(ProducerConfig.RETRIES_CONFIG, 10); // increase to 10 from the default of 0
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE)); // effectively infinite; default is 300 s (300000 ms)

Thanks
Eno
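[Editor's note: for reference, the two overrides above fit into a complete Streams configuration roughly as sketched below. The application id, bootstrap servers, and class name are placeholders, not values from this thread.]

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsRetryConfig {

    public static Properties buildConfig() {
        final Properties props = new Properties();
        // Placeholder values; substitute your own app id and broker list.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Retry produce requests on transient errors (e.g. a leader change
        // during a broker restart) instead of failing the stream task.
        props.put(ProducerConfig.RETRIES_CONFIG, 10);
        // Give the stream thread effectively unlimited time between poll()
        // calls, so long recovery pauses don't trigger a rebalance.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
                  Integer.toString(Integer.MAX_VALUE));
        return props;
    }
}
```

The properties would then be passed to the KafkaStreams constructor along with the topology, as usual.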

Bogdan Istrate

Apr 13, 2017, 9:45:11 AM
to Confluent Platform
Hi Eno,

Thanks for your response!
I'm not able to use 0.10.2.1 at the moment, so I changed the configs you suggested, and I'm no longer getting the NotLeaderForPartitionException.
However, I'm now getting the following error when I follow the reproduction steps I described in the SO question:

Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-1] Failed to rebalance
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:612)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
Caused by: org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-1] failed to suspend stream tasks
        at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:488)
        at org.apache.kafka.streams.processor.internals.StreamThread.access$1200(StreamThread.java:69)
        at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsRevoked(StreamThread.java:259)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:396)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:329)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
        ... 1 more
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:577)
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1125)
        at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
        at org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:535)
        at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:503)
        at org.apache.kafka.streams.processor.internals.StreamThread.commitOffsets(StreamThread.java:531)
        at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:480)
        ... 10 more

Is this also fixed in 0.10.2.1?

Thanks,
Bogdan

Eno Thereska

Apr 13, 2017, 12:10:04 PM
to Confluent Platform
Hi Bogdan,

Could you try increasing the consumer configuration value "session.timeout.ms" via
StreamsConfig? The default value is 10000 ms. Can you let us know if that solves your problem? We haven't changed that default in 0.10.2.1.

Thanks
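[Editor's note: a minimal sketch of the override Eno suggests. In Streams versions of this era, consumer settings can be passed directly in the Streams properties; the app id, broker list, and the 20000 ms value here are illustrative.]

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class SessionTimeoutConfig {

    public static Properties buildConfig() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");     // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        // Give the group coordinator more time before declaring this
        // consumer dead during a broker restart; default is 10000 ms.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "20000");
        return props;
    }
}
```

Note that session.timeout.ms must stay within the broker-side bound set by group.max.session.timeout.ms, or the consumer will be rejected when joining the group.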

Bogdan Istrate

Apr 13, 2017, 2:33:06 PM
to Confluent Platform
Hi Eno, 

I increased the value to 20000 and it seems to have solved the problem!

Thanks very much!
Bogdan