org.apache.kafka.streams.errors.StreamsException: Failed to rebalance

Thijs Cadier

Aug 4, 2016, 11:24:27 AM
to Confluent Platform
Hi,

We've been testing Streams for the last few weeks and are running into what looks like a bug. Once in a while we see a StreamsException with the message "Failed to rebalance". At first we thought this was caused by the locking issue on the temp directory that will be fixed by KIP-62.

After changing poll.ms to 500, the same StreamsException still occurs occasionally, but the cause is now: java.lang.IllegalStateException: Log end offset should not change while restoring.
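For reference, a minimal sketch of what that configuration change looks like when building the Streams config as plain `Properties`. It assumes the 0.10.0.x `StreamsConfig` key names (`application.id`, `bootstrap.servers`, `poll.ms`); the application id and broker address are hypothetical placeholders.

```java
import java.util.Properties;

public class StreamsPollConfig {
    // Builds the Streams configuration as plain Properties.
    // Key names are assumed to match the 0.10.0.x StreamsConfig constants.
    static Properties buildConfig() {
        Properties props = new Properties();
        props.put("application.id", "example-processor");  // hypothetical application id
        props.put("bootstrap.servers", "localhost:9092");  // hypothetical broker address
        // Time in ms each StreamThread blocks in poll() waiting for input (default 100).
        props.put("poll.ms", "500");
        return props;
    }

    public static void main(String[] args) {
        System.out.println("poll.ms = " + buildConfig().getProperty("poll.ms"));
    }
}
```

In 0.10.0.x these properties would then be handed to the `KafkaStreams(builder, props)` constructor along with the topology.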

Is there something we might have misconfigured? Is this a known issue?

Thijs

Guozhang Wang

Aug 4, 2016, 7:20:12 PM
Hi Thijs,

We have seen this issue "Log end offset should not change while restoring" with an LRU memory store. We have fixed it, and the fix will be included in the upcoming 0.10.0.1 / CP 3.0.1 release (very, very soon).
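To make the error message concrete, here is a deliberately simplified, hypothetical model of the invariant behind it; it is not Kafka's actual `ProcessorStateManager.restoreActiveState` code. Our reading is that the restoring thread snapshots the changelog's end offset, replays records up to it, and expects nobody else to append meanwhile, since the changelog is normally written only by the task that owns the store. The `Changelog` class and the `concurrentWriter` hook are illustrative inventions.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of the restore check; not Kafka's implementation.
public class RestoreCheck {
    // Stand-in for a changelog partition: an append-only list of records.
    static final class Changelog {
        private final List<String> records = new ArrayList<>();
        void append(String value) { records.add(value); }
        long endOffset() { return records.size(); }
        String read(long offset) { return records.get((int) offset); }
    }

    // Replays the changelog into a fresh "store". The end offset is captured once,
    // up front; concurrentWriter simulates another instance writing during restore.
    static List<String> restore(Changelog log, Runnable concurrentWriter) {
        long endOffset = log.endOffset();
        List<String> store = new ArrayList<>();
        for (long position = 0; position < endOffset; position++) {
            store.add(log.read(position));
            concurrentWriter.run();  // may append while we are restoring
        }
        if (log.endOffset() != endOffset) {
            throw new IllegalStateException("Log end offset should not change while restoring");
        }
        return store;
    }

    public static void main(String[] args) {
        Changelog log = new Changelog();
        log.append("a");
        log.append("b");
        System.out.println(restore(log, () -> {}));  // prints [a, b]
        try {
            restore(log, () -> log.append("late"));  // a second writer appends mid-restore
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model the exception fires only when something other than the restoring thread writes to the changelog during restoration, for example a task that has not yet been released by its previous owner.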


Could you take a look and see whether yours is the same issue or a different one?

Guozhang

Thijs Cadier

Aug 5, 2016, 6:59:09 AM
Hi Guozhang,

The symptoms definitely look similar. I'm pretty sure we don't use an in-memory table, though. A windowed table is always backed by RocksDB, right?

Thijs

Thijs Cadier

Sep 13, 2016, 10:04:31 AM
We're still seeing this issue with 3.0.1. Any recommendations?

Thijs Cadier

Sep 13, 2016, 10:16:57 AM
We're seeing the following stack trace. When this occurs, the processor endlessly tries to rebalance.

Exception in thread "StreamThread-2" org.apache.kafka.streams.errors.StreamsException: Failed to rebalance
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:299)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
Caused by: java.lang.IllegalStateException: Log end offset should not change while restoring
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:257)
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:212)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:116)
        at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:184)
        at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:66)
        at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
        at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:115)
        at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
        at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
        at org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
        at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:234)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$2.onSuccess(AbstractCoordinator.java:255)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$2.onSuccess(AbstractCoordinator.java:250)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:459)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:445)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:702)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:681)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:266)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:366)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:975)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
        ... 1 more
Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Failed to rebalance
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:299)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error while creating the state manager
        at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:71)
        at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:86)
        at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
        at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
        at org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
        at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:234)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$2.onSuccess(AbstractCoordinator.java:255)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$2.onSuccess(AbstractCoordinator.java:250)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:459)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:445)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:702)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:681)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:266)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:366)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:975)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
        ... 1 more
Caused by: java.io.IOException: Failed to lock the state directory: /tmp/kafka-streams/appsignal-processor/0_2
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:95)
        at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69)
        ... 32 more
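The second trace bottoms out in "Failed to lock the state directory". As a rough illustration of what per-task directory locking involves, here is a simplified sketch using `java.nio` file locks. The `.lock` file name and the single-attempt, no-retry behavior are assumptions for illustration, not necessarily what Kafka does. Note that within one JVM (e.g. two StreamThreads) `FileChannel.tryLock` throws `OverlappingFileLockException` instead of returning `null`.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;

public class StateDirLock {
    // Tries once to lock a hypothetical ".lock" file inside a task's state directory.
    // Returns null if the lock is held by another process (tryLock returns null)
    // or by another channel in this JVM (tryLock throws OverlappingFileLockException).
    static FileLock tryLockTaskDir(File taskDir) throws IOException {
        File lockFile = new File(taskDir, ".lock");
        FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel();
        FileLock lock;
        try {
            lock = channel.tryLock();
        } catch (OverlappingFileLockException e) {
            lock = null;
        }
        if (lock == null) {
            channel.close();  // don't leak the channel when we didn't get the lock
        }
        return lock;
    }

    public static void main(String[] args) throws IOException {
        // Temp directory standing in for a task directory such as .../0_2
        File taskDir = Files.createTempDirectory("task-0_2").toFile();
        FileLock first = tryLockTaskDir(taskDir);
        FileLock second = tryLockTaskDir(taskDir);  // e.g. a second thread in the same JVM
        System.out.println("first acquired: " + (first != null));
        System.out.println("second acquired: " + (second != null));
        first.channel().close();  // closing the channel releases its lock
        FileLock third = tryLockTaskDir(taskDir);
        System.out.println("after release: " + (third != null));
        if (third != null) {
            third.channel().close();
        }
    }
}
```

In this sketch a task whose directory is still locked by its previous owner simply fails to acquire the lock; a real implementation would typically retry with a backoff before giving up.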

Guozhang Wang

Sep 13, 2016, 4:03:17 PM
Thijs,

We are seeing similar issues on the latest trunk in our nightly test builds as well, and we are actively investigating. The plan is to fix it before the 0.10.1.0 (CP 3.1.0) code freeze; we will keep you posted.

Guozhang

Thijs Cadier

Sep 13, 2016, 4:09:10 PM
That's good to hear!

We have a system we'd like to move to production in the near future, but we're stalled on this issue. Do you have any ideas on the timeline for this?

Guozhang Wang

Sep 14, 2016, 7:50:18 PM
There is already a PR for the fix, and we plan to merge it into trunk this week:


This means that if you are currently testing against trunk, you will get it by then. If you are running a released version, it will be available in the next release, 0.10.1.0 / CP 3.1.0, in October.


Guozhang