stream shut down due to no locks for state store


Shannon Ma

Nov 26, 2016, 11:49:08 AM
to Confluent Platform
Hi,

After my stream runs for a while, I get this error and the stream shuts down. How can I check what the issue is?


04:25| ERROR | StreamThread.java 666 | stream-thread [StreamThread-1] Failed to create an active task %s:
org.apache.kafka.streams.errors.ProcessorStateException: Error opening store txnwindowstore18000000-201611251500 at location /tmp/kafka-streams/rt_txn_dup_w/1_0/txnwindowstore18000000/txnwindowstore18000000-201611251500
        at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:196)
        at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:158)
        at org.apache.kafka.streams.state.internals.RocksDBWindowStore$Segment.openDB(RocksDBWindowStore.java:72)
        at org.apache.kafka.streams.state.internals.RocksDBWindowStore.getOrCreateSegment(RocksDBWindowStore.java:402)
        at org.apache.kafka.streams.state.internals.RocksDBWindowStore.putInternal(RocksDBWindowStore.java:333)
        at org.apache.kafka.streams.state.internals.RocksDBWindowStore.access$100(RocksDBWindowStore.java:51)
        at org.apache.kafka.streams.state.internals.RocksDBWindowStore$2.restore(RocksDBWindowStore.java:212)
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:235)
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:198)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
        at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:206)
        at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:66)
        at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:64)
        at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
        at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:120)
        at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
        at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)
        at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)
        at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
Caused by: org.rocksdb.RocksDBException: IO error: lock /tmp/kafka-streams/rt_txn_dup_w/1_0/txnwindowstore18000000/txnwindowstore18000000-201611251500/LOCK: No locks available
        at org.rocksdb.RocksDB.open(Native Method)
        at org.rocksdb.RocksDB.open(RocksDB.java:184)
        at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:189)
        ... 26 more
04:25| ERROR | ConsumerCoordinator.java 232 | User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group rt_txn_dup_w failed on partition assignment
org.apache.kafka.streams.errors.ProcessorStateException: Error opening store txnwindowstore18000000-201611251500 at location /tmp/kafka-streams/rt_txn_dup_w/1_0/txnwindowstore18000000/txnwindowstore18000000-201611251500
        at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:196)
        at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:158)
        at org.apache.kafka.streams.state.internals.RocksDBWindowStore$Segment.openDB(RocksDBWindowStore.java:72)
        at org.apache.kafka.streams.state.internals.RocksDBWindowStore.getOrCreateSegment(RocksDBWindowStore.java:402)
        at org.apache.kafka.streams.state.internals.RocksDBWindowStore.putInternal(RocksDBWindowStore.java:333)
        at org.apache.kafka.streams.state.internals.RocksDBWindowStore.access$100(RocksDBWindowStore.java:51)
        at org.apache.kafka.streams.state.internals.RocksDBWindowStore$2.restore(RocksDBWindowStore.java:212)
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:235)
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:198)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
        at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:206)
        at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:66)
        at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:64)
        at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
        at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:120)
        at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
        at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)
        at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)
        at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
Caused by: org.rocksdb.RocksDBException: IO error: lock /tmp/kafka-streams/rt_txn_dup_w/1_0/txnwindowstore18000000/txnwindowstore18000000-201611251500/LOCK: No locks available
        at org.rocksdb.RocksDB.open(Native Method)
        at org.rocksdb.RocksDB.open(RocksDB.java:184)
        at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:189)
        ... 26 more

Damian Guy

Nov 26, 2016, 1:04:24 PM
to Confluent Platform
Hi Shannon,

Do you have more logs available?
How many StreamThreads are you running with?
Are you running multiple instances? On the same host with the same state directory?

Thanks,
Damian
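[As background, not from the thread: the RocksDB LOCK file in the trace above lives under the Streams state directory, so two instances of the same application on one host must point StreamsConfig.STATE_DIR_CONFIG at different paths. A minimal sketch; the directory path and broker address are illustrative assumptions:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class InstanceConfig {
    // Give every instance on the same host its own state directory so they
    // do not compete for the same RocksDB LOCK files.
    public static Properties forInstance(int instanceId) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "rt_txn_dup_w");      // app id seen in the logs
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams/instance-" + instanceId);
        return props;
    }
}]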



Shannon Ma

Nov 26, 2016, 5:29:15 PM
to Confluent Platform
Here are some log lines from before the exception. I am running one stream instance.

How many StreamThreads --> how do I check that?


04:25| INFO | StreamTask.java 119 | task [1_0] Initializing state stores
04:25| DEBUG | ProcessorStateManager.java 127 | task [1_0] Registering state store txnwindowstore18000000 to its state manager
04:25| DEBUG | KafkaConsumer.java 930 | Subscribed to partition(s): rt_txn_dup_w-txnwindowstore18000000-changelog-0
04:25| DEBUG | KafkaConsumer.java 1214 | Seeking to end of partition rt_txn_dup_w-txnwindowstore18000000-changelog-0
04:25| DEBUG | Fetcher.java 341 | Resetting offset for partition rt_txn_dup_w-txnwindowstore18000000-changelog-0 to latest offset.
04:25| DEBUG | Fetcher.java 577 | Fetched {timestamp=-1, offset=391310} for partition rt_txn_dup_w-txnwindowstore18000000-changelog-0
04:25| DEBUG | KafkaConsumer.java 1196 | Seeking to beginning of partition rt_txn_dup_w-txnwindowstore18000000-changelog-0
04:25| DEBUG | Fetcher.java 341 | Resetting offset for partition rt_txn_dup_w-txnwindowstore18000000-changelog-0 to earliest offset.
04:25| DEBUG | Fetcher.java 577 | Fetched {timestamp=-1, offset=0} for partition rt_txn_dup_w-txnwindowstore18000000-changelog-0
04:25| DEBUG | AbstractCoordinator.java 694 | Received successful heartbeat response for group rt_txn_dup_w
04:25| DEBUG | ClientCnxn.java 742 | Got ping response for sessionid: 0x1588ecee04c001b after 8ms
04:25| DEBUG | AbstractCoordinator.java 694 | Received successful heartbeat response for group rt_txn_dup_w
04:25| DEBUG | KafkaConsumer.java 885 | Unsubscribed all topics or patterns and assigned partitions

Damian Guy

Nov 28, 2016, 4:57:50 AM
to Confluent Platform
Hi Shannon,

If you haven't set StreamsConfig.NUM_STREAM_THREADS_CONFIG, then there will just be 1.

Thanks,
Damian
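[For reference, a minimal sketch, not from the thread, of setting the thread count explicitly; the broker address is an assumption:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class ThreadCountConfig {
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "rt_txn_dup_w");      // app id seen in the logs
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
        // Defaults to 1 when unset, which matches the single [StreamThread-1] in the logs above.
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
        return props;
    }
}]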


Shannon Ma

Nov 28, 2016, 10:13:10 AM
to Confluent Platform
Is there a close/release method I should call?

Damian Guy

Nov 28, 2016, 10:28:29 AM
to Confluent Platform
Hi Shannon,

In this case, no.
I'm still trying to understand how this could have happened. The exception usually indicates that we are trying to open the same RocksDB instance twice - which shouldn't happen given that you are running single-threaded.

Which version of Kafka Streams are you using?

Thanks,
Damian


Davor Poldrugo

Nov 28, 2016, 3:39:36 PM
to Confluent Platform
I had the same problem for months. A few days ago I started investigating, and the whole theory is in KAFKA-4455, together with a possible solution and code for what I think is a bug.
Bugfix commit: "BUGFIX: When commit fails during rebalance - release resources."

I have been running this fork in production for 3 days and the error hasn't come up.

Davor
https://dpoldrugo.github.io/



Eno Thereska

Nov 29, 2016, 4:31:45 AM
to Confluent Platform
Hi Davor,

Would it be possible to open a PR with your code (that way you are acknowledged appropriately if the code makes it in, etc.)? If you're busy we can open a PR; let us know.

Thanks
Eno

Davor Poldrugo

Nov 29, 2016, 4:41:47 AM
to Confluent Platform
Hi Eno!
Sure, no problem. Do I make the PR against trunk or against the 0.10.1 branch?

Davor
https://dpoldrugo.github.io

Damian Guy

Nov 29, 2016, 5:11:33 AM
to Confluent Platform
Hi Davor,

You can raise the PR against trunk.

Thanks,
Damian


Davor Poldrugo

Nov 29, 2016, 8:18:33 AM
to Confluent Platform

Damian Guy

Nov 29, 2016, 9:42:28 AM
to Confluent Platform
Thanks!


Shannon Ma

Dec 2, 2016, 12:04:57 PM
to Confluent Platform
Hi Davor,

Would you mind attaching the jar with your fix so I can swap it in and try?

Thanks
Shannon

Shannon Ma

Dec 2, 2016, 12:37:54 PM
to Confluent Platform
It is the kafka-streams-0.10.2.0-SNAPSHOT.jar, right?

Thanks
Shannon

Matthias J. Sax

Dec 2, 2016, 1:15:21 PM
to confluent...@googlegroups.com
This PR is not merged yet, and thus not contained in 0.10.2.0-SNAPSHOT yet.

If you want to try it right now, you need to check out the PR branch and
build it locally yourself.


-Matthias

Shannon Ma

Dec 2, 2016, 2:32:12 PM
to Confluent Platform
Yes, I built it locally, thanks.

Shannon Ma

Dec 6, 2016, 9:39:40 AM
to Confluent Platform
With the new jar I still have the issue; however, I see an exception before it happens:



23:08| ERROR | StreamThread.java 330 | stream-thread [StreamThread-1] Failed while executing StreamTask 1_0 duet to commit consumer offsets:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:600)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:498)
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1104)
        at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:297)
        at org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:359)
        at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:328)
        at org.apache.kafka.streams.processor.internals.StreamThread.commitOffsets(StreamThread.java:355)
        at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:297)
        at org.apache.kafka.streams.processor.internals.StreamThread.access$900(StreamThread.java:69)
        at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsRevoked(StreamThread.java:143)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:336)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:303)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)




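[The exception text itself names the relevant consumer settings. A hedged sketch of raising them; the values are illustrative, and it assumes that in this Kafka version plain consumer configs placed in the streams properties are forwarded to the embedded consumer:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class PollTuningConfig {
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "rt_txn_dup_w");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);              // fewer records per poll()
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);       // more time between poll() calls
        return props;
    }
}]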

Eno Thereska

Dec 6, 2016, 11:40:46 AM
to Confluent Platform
Shannon, which version of Kafka Streams are you using? I ask because the line numbers in your stack dump do not match the latest trunk, e.g. in StreamThread.commitOffsets or StreamTask.commitOffsets.

Eno

Shannon Ma

Dec 6, 2016, 12:59:02 PM
to Confluent Platform
I am using confluent-3.1.1, and replaced only the kafka-streams-xx.jar with the one from Davor above.

Davor

Dec 7, 2016, 6:59:33 AM
to confluent...@googlegroups.com
Shannon, do you have custom processors that use state stores?
If yes, do you close the state stores in your implementation of org.apache.kafka.streams.processor.Processor#close?
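[For illustration, a minimal sketch of the kind of custom processor Davor means; the store name "txn-store" and the String types are made-up assumptions:

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class TxnProcessor implements Processor<String, String> {

    private KeyValueStore<String, String> store;

    @SuppressWarnings("unchecked")
    @Override
    public void init(ProcessorContext context) {
        // Fetch the store that was attached to this processor in the topology.
        store = (KeyValueStore<String, String>) context.getStateStore("txn-store");
    }

    @Override
    public void process(String key, String value) {
        store.put(key, value); // whatever the processor does with the store
    }

    @Override
    public void punctuate(long timestamp) { }

    @Override
    public void close() {
        // Davor's question: is the store released here? Note the framework
        // also closes framework-managed stores on shutdown, so whether to do
        // this by hand only matters for the old versions debugged in this thread.
        if (store != null) {
            store.close();
        }
    }
}]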


Shannon Ma

Dec 7, 2016, 10:18:27 AM
to Confluent Platform
I have a first Kafka Streams application that stores the data; then my second Kafka Streams application uses the store in my flatMapValues() method. I don't see a close method there.

Shannon Ma

Dec 7, 2016, 10:19:55 AM
to Confluent Platform
"Close the state stores" --> how do I close them?

Shannon Ma

Dec 8, 2016, 2:07:08 PM
to Confluent Platform
Another thing: my streaming data arrives during the day, but from the log the lock issue happens after hours (at night or in the early morning), when no messages go to the topic. Not sure if that is relevant here.

Shannon Ma

Dec 10, 2016, 10:17:59 AM
to Confluent Platform
I see this error before the lock error:


22:04| WARN | StreamThread.java 579 | stream-thread [StreamThread-2] Failed to commit StreamTask 0_0 state:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:600)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:498)
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1104)
        at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:297)
        at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:281)
        at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:576)
        at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:562)
        at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:538)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:456)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
22:04| WARN | StreamThread.java 579 | stream-thread [StreamThread-1] Failed to commit StreamTask 1_0 state:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:600)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:498)
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1104)
        at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:297)
        at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:281)
        at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:576)
        at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:562)
        at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:538)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:456)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)

Shannon Ma

Dec 14, 2016, 9:36:34 PM
to Confluent Platform
Does anyone have any ideas or suggestions?

Thanks
Shannon

Matthias J. Sax

Dec 15, 2016, 4:06:29 AM
to confluent...@googlegroups.com
Please read this SO question; it seems your application times out:

http://stackoverflow.com/questions/39232395/kafka-kstreams-processing-timeouts/39237089#39237089

I guess it's not related to the state store locks.

Btw: this email thread is quite long and contains many different
questions. It is better to have a single thread per topic, so other users
don't get lost when they search for solutions to similar problems.


-Matthias

Shannon Ma

Dec 15, 2016, 9:59:06 AM
to Confluent Platform
Thanks, I will take a look. What killed the stream in the end is the lock; what I am not sure about is whether the timeout issue causes the lock problem that eventually kills the stream. The main topic is still that the stream got killed by the lock issue: even with the timeout issue, shouldn't the stream handle the lock so it can continue?

Shannon

Matthias J. Sax

Dec 15, 2016, 1:00:12 PM
to confluent...@googlegroups.com
Locks should eventually get released... We fixed a couple of bugs
recently -- and there are still some bug-fix PRs that are not merged yet.

If you run on trunk and still see lock problems, we might need to
investigate -- there might be one more bug. But please make sure you run
with the latest trunk and that you have ruled out other potential causes
for your problem.


-Matthias

Shannon Ma

Dec 16, 2016, 12:26:15 PM
to Confluent Platform
Okay, thanks, I will try the latest.

Shannon



Rodrigo Monteiro

Feb 1, 2017, 1:18:24 PM
to Confluent Platform
Hi!

Sorry to reactivate this thread.

I am having the same problem (using CP 3.1.1 / Kafka Streams 0.10.1.0-cp2).

I saw that the fix was made (https://issues.apache.org/jira/browse/KAFKA-4455). Will it be available in version 0.10.2.0? Is there a date for the release?

Thanks!

Damian Guy

Feb 1, 2017, 1:22:06 PM
to Confluent Platform
Hi,

Yes, that will be available in 0.10.2.0. The release date has not been confirmed, but the first release candidate should be out this week.

Thanks,
Damian


Rodrigo Monteiro

Feb 1, 2017, 1:28:21 PM
to Confluent Platform
Thanks Damian


pederpansen

Feb 22, 2017, 9:41:41 AM
to Confluent Platform
Davor, in the referenced ticket you talk about a workaround that uses an UncaughtExceptionHandler to manually restart the Kafka Streams topology. Do you have example code for how to achieve this? I am having trouble figuring out how to do it, since I can't do the restart from within the thread that has the exception handler registered, am I right?
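[For what it's worth, a hedged sketch of one way such a restart could be wired up -- not Davor's actual code. The handler only hands off to a separate single-threaded executor, which closes the failed instance and starts a brand-new KafkaStreams, because a closed instance cannot be restarted and close() must not be called from the dying stream thread itself. buildTopology() is a placeholder for your own topology:

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class SelfRestartingStreams {

    private final ExecutorService restarter = Executors.newSingleThreadExecutor();
    private final Properties config;
    private KafkaStreams current;

    public SelfRestartingStreams(Properties config) {
        this.config = config;
    }

    private KStreamBuilder buildTopology() {
        KStreamBuilder builder = new KStreamBuilder();
        // ... define your topology on `builder` here ...
        return builder;
    }

    public synchronized void start() {
        current = new KafkaStreams(buildTopology(), config);
        // Do not close() inside the handler -- only schedule the restart.
        current.setUncaughtExceptionHandler(
                (thread, throwable) -> restarter.submit(this::restart));
        current.start();
    }

    private synchronized void restart() {
        current.close(); // safe here: we run on the restarter thread, not a stream thread
        start();         // build and start a completely fresh instance
    }
}]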