Kafka Streams app instances and state stores


Tianxiang Xiong

Feb 16, 2017, 2:59:51 AM
to Confluent Platform
When running multiple instances of a Kafka Streams application (let's call it MyApp) on the same broker, should each instance of MyApp have a unique state dir?

If two instances both use the default (say /var/lib/kafka-streams/MyApp), wouldn't there be locking issues between them?

I've observed some locking issues which I suspect are similar to those discussed in this thread. In particular, I'm trying to:
  1. Bring up an instance of MyApp with embedded Kafka (for local testing)
  2. Shut down the KafkaStreams object with the close method
  3. Delete the state dir
  4. Start up another instance of MyApp to run different tests
Note that this doesn't run instances in parallel; it runs them sequentially.

When I try to do so, I get the following (on a MacBook, so the directory paths differ):

org.apache.kafka.streams.errors.StreamsException: Failed to rebalance
 at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:299)
 at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error opening store account-balance-updated-1 at location /private/tmp/kafka-streams/MyApp/1_0/rocksdb/account-balance-updated-1
 at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:222)
 at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:166)
 at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:171)
 at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:85)
 at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
 at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:115)
 at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
 at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
 at org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
 at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
 at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:234)
 at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
 at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
 at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
 at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
 at org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
 at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
 at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
 at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
 at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
 at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
 at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
 at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
 at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
 at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
 at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
 at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
 at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
 at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
 at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
 at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
 at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:366)
 at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:978)
 at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
 at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
 ... 1 more
Caused by: org.rocksdb.RocksDBException: IO error: lock /private/tmp/kafka-streams/MyApp/1_0/rocksdb/account-balance-updated-1/LOCK: No locks available
 at org.rocksdb.RocksDB.open(Native Method)
 at org.rocksdb.RocksDB.open(RocksDB.java:183)
 at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:215)
 ... 36 more


The state store for account-balance-updated-1 is from a reduceByKey in the topology.

Interestingly, I don't get this issue when starting up MyApp, running the first test, shutting down the process, then starting it up again to run the second test. This issue only seems to occur when I run the two tests sequentially in the same process.

It seems that a lock on the RocksDB store is held even though close was called on the first MyApp instance and the state store itself is deleted after the close. Does this sound possible?
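One plausible reading of the "No locks available" error (not confirmed in this thread) is that the first instance still holds the lock on the store's LOCK file inside the same JVM, while a fresh process starts with no locks held, which would fit the observation that restarting the process avoids the problem. The sketch below is a JDK-only analogy using java.nio FileLock, not RocksDB's native locking: a second lock attempt on the same file from the same process fails until the first lock is released. The class and method names are illustrative.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class InProcessLockDemo {

    /** Locks a file from one channel, then tries again from a second channel in the same JVM. */
    static String demo() {
        try {
            Path lockFile = Files.createTempDirectory("store-").resolve("LOCK");
            StringBuilder log = new StringBuilder();
            // Two channels stand in for two store instances opened by the same process.
            try (FileChannel first = FileChannel.open(lockFile,
                         StandardOpenOption.CREATE, StandardOpenOption.WRITE);
                 FileChannel second = FileChannel.open(lockFile,
                         StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
                FileLock held = first.lock(); // the "first instance" takes the lock
                try {
                    FileLock unexpected = second.tryLock();
                    log.append("acquired while held; ");
                    if (unexpected != null) {
                        unexpected.release();
                    }
                } catch (OverlappingFileLockException e) {
                    // The JVM tracks the locks it holds, so a second lock from the same process fails.
                    log.append("blocked while held; ");
                }
                held.release(); // what a complete close() is expected to do
                FileLock retry = second.tryLock();
                log.append(retry != null ? "acquired after release" : "still blocked");
                if (retry != null) {
                    retry.release();
                }
            }
            return log.toString();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // blocked while held; acquired after release
    }
}
```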

Eno Thereska

Feb 16, 2017, 6:04:11 AM
to Confluent Platform
You will need to use different state directories if running multiple instances on the same machine.
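A minimal sketch of giving each instance on the same machine its own state directory. It only builds java.util.Properties, so it runs without Kafka on the classpath; "application.id" and "state.dir" are the Kafka Streams config keys (StreamsConfig.APPLICATION_ID_CONFIG and StreamsConfig.STATE_DIR_CONFIG). The storePath helper assumes the on-disk layout visible in the first error message (<state.dir>/<application.id>/<task id>/rocksdb/<store name>), and the instance names are hypothetical.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;

final class PerInstanceStateDir {

    /** Same application.id for every instance, but a distinct state.dir per instance. */
    static Properties configFor(String applicationId, Path baseDir, String instanceName) {
        Properties props = new Properties();
        props.put("application.id", applicationId);                        // StreamsConfig.APPLICATION_ID_CONFIG
        props.put("state.dir", baseDir.resolve(instanceName).toString());  // StreamsConfig.STATE_DIR_CONFIG
        return props;
    }

    /** Assumed layout: <state.dir>/<application.id>/<task id>/rocksdb/<store name>. */
    static Path storePath(Properties props, String taskId, String storeName) {
        return Paths.get(props.getProperty("state.dir"),
                props.getProperty("application.id"), taskId, "rocksdb", storeName);
    }

    public static void main(String[] args) {
        Path base = Paths.get("/tmp/kafka-streams");
        Properties a = configFor("MyApp", base, "instance-a");
        Properties b = configFor("MyApp", base, "instance-b");
        // Same task and store, but different directories (and so different RocksDB LOCK files).
        System.out.println(storePath(a, "1_0", "account-balance-updated-1"));
        System.out.println(storePath(b, "1_0", "account-balance-updated-1"));
    }
}
```

For the sequential-test scenario in the first post, a fresh directory from Files.createTempDirectory per test run would likely serve the same purpose.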

Thanks
Eno

Tianxiang Xiong

Feb 16, 2017, 12:21:32 PM
to Confluent Platform
OK, that makes sense.

What about the case of closing a KafkaStreams object, cleaning out the state store, then starting another instance? I would expect all locks to be released, but it seems that's not the case based on the error above.

Matthias J. Sax

Feb 16, 2017, 1:20:03 PM
to confluent...@googlegroups.com
Hi,

Actually, it's not required to have a different state.dir if you run two
instances on the same machine. From a state.dir point of view, running
two instances is the same thing as running two StreamThreads within a
single instance.

However, using a different state.dir can resolve the issues you are seeing
(I guess you are running 0.10.1?). For the new release (which should be out
in the next few weeks), we fixed a couple of these state.dir locking bugs,
so for 0.10.2 / CP 3.2 this workaround of different state.dirs should
no longer be required.

Furthermore, I want to add that it is neither required nor recommended
to run your Streams instances on the same machines your brokers are running on!

See this SO question:
http://stackoverflow.com/questions/41844253/scaling-kafka-stream-application-across-multiple-users/41845327#41845327


-Matthias

Sabarish Sasidharan

Feb 16, 2017, 1:30:09 PM
to confluent...@googlegroups.com
Not sure about 0.10.2 but in 0.10.0.1 task directories within the state directory get deleted if they are assigned to the state thread. I think the ProcessorStateManager does this.

So we need 2 different state dirs.

Regards
Sab

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/7a3c7e52-caec-752c-fb64-3478ccdcfbbd%40confluent.io.
For more options, visit https://groups.google.com/d/optout.

Matthias J. Sax

Feb 16, 2017, 1:36:58 PM
to confluent...@googlegroups.com
Can you explain how this relates?

Even if a task dir within state.dir gets deleted, this should not be an
issue, because two different threads get different tasks assigned (or
there is another bug...)


-Matthias


Tianxiang Xiong

Feb 16, 2017, 4:46:13 PM
to Confluent Platform
The app is currently running 0.10.0.0, but I will upgrade to 0.10.1.1 soon and see if that resolves some of the issues we're seeing.

The problem I described is actually not about running two instances in parallel, but about starting an instance, closing it, deleting the state dir, and then starting another instance. In this scenario, the second instance has a locking problem on the state dir (which is the same for both instances). I would not expect this to happen.

Eno Thereska

Feb 16, 2017, 5:10:49 PM
to Confluent Platform
This should be fixed in 0.10.2 (which is coming up hopefully this week or early next).

Eno

Tianxiang Xiong

Feb 16, 2017, 5:12:00 PM
to Confluent Platform
OK, I'll try that after we upgrade to 0.10.2.

Matthias J. Sax

Feb 16, 2017, 5:12:53 PM
to confluent...@googlegroups.com
How many threads do you use? Kafka 0.10.0 and 0.10.1 do have some issues
with multi-threading... The recommended workaround is to run single-threaded
and use multiple instances (if on the same machine, then with
different state.dirs).

We fixed those problems for 0.10.2, which is going to be released next week
(or the week after), so you might want to upgrade to 0.10.2 directly.


-Matthias



Tianxiang Xiong

Feb 16, 2017, 5:18:59 PM
to Confluent Platform
I believe we're using the default, which is 1 thread.

ChanK

Oct 19, 2017, 4:36:15 PM
to Confluent Platform
Hello, 

I am using Kafka 0.11. I am running both Kafka and Kafka Streams on the same machine, and my Kafka Streams app is a single-threaded instance. I am having a similar state.dir lock issue when I try to call cleanUp() or stop the KafkaStreams instance.
2017-10-19 15:32:01,289 myApp, - INFO 7152 --- [nio-8082-exec-3] o.a.k.s.p.internals.StateDirectory       : stream-thread [cleanup] Deleting obsolete state directory 0_0 for task 0_0 as cleanup delay of 0 ms has passed
2017-10-19 15:32:01,315 myApp, -ERROR 7152 --- [nio-8082-exec-3] o.a.k.s.p.internals.StateDirectory       : stream-thread [cleanup] Failed to lock the state directory due to an unexpected exception

java.nio.file.DirectoryNotEmptyException: \tmp\kafka-streams\png-receiver\0_0
at sun.nio.fs.WindowsFileSystemProvider.implDelete(Unknown Source) ~[na:1.8.0_144]
at sun.nio.fs.AbstractFileSystemProvider.delete(Unknown Source) ~[na:1.8.0_144]
at java.nio.file.Files.delete(Unknown Source) ~[na:1.8.0_144]
at org.apache.kafka.common.utils.Utils$2.postVisitDirectory(Utils.java:597) ~[kafka-clients-0.11.0.0.jar:na]
at org.apache.kafka.common.utils.Utils$2.postVisitDirectory(Utils.java:580) ~[kafka-clients-0.11.0.0.jar:na]
at java.nio.file.Files.walkFileTree(Unknown Source) ~[na:1.8.0_144]
at java.nio.file.Files.walkFileTree(Unknown Source) ~[na:1.8.0_144]
at org.apache.kafka.common.utils.Utils.delete(Utils.java:580) ~[kafka-clients-0.11.0.0.jar:na]
at org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:221) ~[kafka-streams-0.11.0.0.jar:na]
at org.apache.kafka.streams.KafkaStreams.cleanUp(KafkaStreams.java:590) [kafka-streams-0.11.0.0.jar:na]
at org.springframework.kafka.core.KStreamBuilderFactoryBean.stop(KStreamBuilderFactoryBean.java:136) [spring-kafka-1.2.2.RELEASE.jar:na]
at com.panera.notification.receiver.kafka.consumer.geofence.GeofenceReceiver.restartStream(GeofenceReceiver.java:90) [classes/:na]
at com.panera.notification.receiver.kafka.consumer.geofence.GeofenceReceiver$$FastClassBySpringCGLIB$$7228cf93.invoke(<generated>) [classes/:na]
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204) [spring-core-4.3.8.RELEASE.jar:4.3.8.RELEASE]
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:738) [spring-aop-4.3.8.RELEASE.jar:4.3.8.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157) [spring-aop-4.3.8.RELEASE.jar:4.3.8.RELEASE]
at org.springframework.security.access.intercept.aopalliance.MethodSecurityInterceptor.invoke(MethodSecurityInterceptor.java:69) [spring-security-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) [spring-aop-4.3.8.RELEASE.jar:4.3.8.RELEASE]
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:673) [spring-aop-4.3.8.RELEASE.jar:4.3.8.RELEASE]
at com.panera.notification.receiver.kafka.consumer.geofence.GeofenceReceiver$$EnhancerBySpringCGLIB$$d4c0fb5.restartStream(<generated>) [classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_144]
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_144]

Can you identify what is causing this exception?

Damian Guy

Oct 20, 2017, 4:37:54 AM
to Confluent Platform
Are you sure the streams instance has stopped before you call `cleanUp()` ?
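A minimal sketch of checking that an instance has actually stopped before calling cleanUp(). The state is read through a Supplier<String> so the example runs without Kafka; with a real instance one would pass something like () -> streams.state().name(), assuming your version exposes KafkaStreams#state() and reports NOT_RUNNING once stopped (check both against the release you run). The awaitState helper is hypothetical.

```java
import java.time.Duration;
import java.util.function.Supplier;

final class AwaitStopped {

    /**
     * Polls state until it equals target or the timeout elapses.
     * Returns true if target was observed, false on timeout or interruption.
     */
    static boolean awaitState(Supplier<String> state, String target, Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (target.equals(state.get())) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        // Stand-in for an instance that finishes shutting down about 200 ms after close().
        long stoppedAt = System.nanoTime() + Duration.ofMillis(200).toNanos();
        Supplier<String> fakeState = () ->
                System.nanoTime() - stoppedAt >= 0 ? "NOT_RUNNING" : "PENDING_SHUTDOWN";
        if (awaitState(fakeState, "NOT_RUNNING", Duration.ofSeconds(5))) {
            System.out.println("stopped; safe to call cleanUp()");
        } else {
            System.out.println("still running; do not call cleanUp()");
        }
    }
}
```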


ChanK

Oct 20, 2017, 9:37:54 AM
to Confluent Platform
Yes, I logged the state before calling cleanUp() to make sure.

Matthias J. Sax

Oct 24, 2017, 12:52:59 AM
to confluent...@googlegroups.com
If you have only a single thread, this is pretty weird, and it's hard to
say what could go wrong.

Are you sure that the StreamThread is not running anymore? Are you also
sure that there is no second instance of the same application that is
still running?


On cleanup, we traverse the whole state directory and delete everything
file-by-file. Each time we step out of a directory, we delete that
(now empty) directory. The exception indicates that the directory is not
empty when we try to delete it; however, this is only possible if some
other thread writes into the directory concurrently...
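The traversal described above can be sketched with the JDK's Files.walkFileTree: files are deleted in visitFile, and each directory is deleted in postVisitDirectory once its contents are gone. The stack traces in this thread show Kafka's Utils.delete going through walkFileTree/postVisitDirectory as well, but this is an illustrative re-implementation, not Kafka's code; createSampleTree and the file names in it are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;

final class RecursiveDelete {

    /** Deletes root and everything below it: files first, then each directory once emptied. */
    static void deleteTree(Path root) {
        try {
            Files.walkFileTree(root, new SimpleFileVisitor<Path>() {
                @Override
                public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                    Files.delete(file); // file-by-file
                    return FileVisitResult.CONTINUE;
                }

                @Override
                public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
                    if (exc != null) {
                        throw exc;
                    }
                    // Stepping out of dir: it should be empty now. If another thread wrote
                    // into it meanwhile, this throws DirectoryNotEmptyException.
                    Files.delete(dir);
                    return FileVisitResult.CONTINUE;
                }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Hypothetical state-dir layout: root/0_0/rocksdb/store/data.sst (names are placeholders). */
    static Path createSampleTree() {
        try {
            Path root = Files.createTempDirectory("state-dir-");
            Path store = Files.createDirectories(root.resolve("0_0").resolve("rocksdb").resolve("store"));
            Files.write(store.resolve("data.sst"), new byte[] {1, 2, 3});
            return root;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path root = createSampleTree();
        deleteTree(root);
        System.out.println(Files.exists(root)); // false
    }
}
```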



-Matthias

christia...@bytro.com

Jan 2, 2018, 12:42:19 PM
to Confluent Platform
Hello,

I'm posting in this discussion, because I have a similar issue as ChanK. When I call org.apache.kafka.streams.KafkaStreams#cleanUp on a non-running stream, I get the following error:

02 17:58:02.369 [main] ERROR o.a.k.s.p.i.StateDirectory - stream-thread [main] Failed to lock the state directory due to an unexpected exception
java.nio.file.DirectoryNotEmptyException: tmp\{application.id}\0_4
 at sun.nio.fs.WindowsFileSystemProvider.implDelete(WindowsFileSystemProvider.java:266)
 at sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103)
 at java.nio.file.Files.delete(Files.java:1126)
 at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:636)
 at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:619)
 at java.nio.file.Files.walkFileTree(Files.java:2688)
 at java.nio.file.Files.walkFileTree(Files.java:2742)
 at org.apache.kafka.common.utils.Utils.delete(Utils.java:619)
 at org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:245)
 at org.apache.kafka.streams.KafkaStreams.cleanUp(KafkaStreams.java:912)
 at com.bytro.firefly.stream.StreamService.startStreaming(StreamService.java:239)
 at com.bytro.firefly.stream.StreamService.start(StreamService.java:129)
 at com.bytro.firefly.Launcher.startStream(Launcher.java:61)
 at com.bytro.firefly.Launcher.run(Launcher.java:55)
 at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:732)
 at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:716)
 at org.springframework.boot.SpringApplication.afterRefresh(SpringApplication.java:703)
 at org.springframework.boot.SpringApplication.run(SpringApplication.java:304)
 at org.springframework.boot.SpringApplication.run(SpringApplication.java:1118)
 at org.springframework.boot.SpringApplication.run(SpringApplication.java:1107)
 at com.bytro.firefly.Launcher.main(Launcher.java:44)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at org.springframework.boot.maven.AbstractRunMojo$LaunchRunner.run(AbstractRunMojo.java:527)
 at java.lang.Thread.run(Thread.java:745)


After stepping into the code, I found the main deletion logic in org.apache.kafka.streams.processor.internals.StateDirectory#cleanRemovedTasks. As far as I understand, this is what happens:
- The application gets a lock on tmp\{application.id}\0_4 by creating a .lock file inside the directory
- While traversing the directory recursively, all files except the .lock file get deleted
- tmp\{application.id}\0_4 is deleted while the .lock file still exists in the directory
- The aforementioned exception occurs
- The lock is released, the .lock file is deleted, and the empty directory remains
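The sequence above can be reproduced with the JDK alone. This sketch (not Kafka's StateDirectory code; the class, method, and placeholder file names are hypothetical) holds a lock on a .lock file inside a task directory, deletes everything else, and then deletes the directory. With the lock file still present, the directory delete fails with DirectoryNotEmptyException; releasing the lock and removing .lock first lets it succeed.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

final class LockedTaskDirCleanup {

    /** Hypothetical task directory containing one placeholder data file. */
    static Path sampleTaskDir() {
        try {
            Path dir = Files.createTempDirectory("task-0_4-");
            Files.write(dir.resolve("placeholder.dat"), new byte[] {0});
            return dir;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Locks taskDir/.lock, deletes every other entry, then deletes taskDir.
     * If releaseFirst is true, the lock is released and .lock removed before the final delete.
     * Returns true if taskDir was deleted.
     */
    static boolean cleanTaskDir(Path taskDir, boolean releaseFirst) {
        Path lockFile = taskDir.resolve(".lock");
        try {
            FileChannel channel = FileChannel.open(lockFile,
                    StandardOpenOption.CREATE, StandardOpenOption.WRITE);
            try {
                FileLock lock = channel.lock();
                // Collect everything except the lock file, then delete it (plain files only, for brevity).
                List<Path> others = new ArrayList<>();
                try (DirectoryStream<Path> entries = Files.newDirectoryStream(taskDir)) {
                    for (Path entry : entries) {
                        if (!entry.getFileName().toString().equals(".lock")) {
                            others.add(entry);
                        }
                    }
                }
                for (Path p : others) {
                    Files.delete(p);
                }
                if (releaseFirst) {
                    lock.release();
                    channel.close();
                    Files.delete(lockFile);
                }
                Files.delete(taskDir);
                return true;
            } catch (DirectoryNotEmptyException e) {
                return false; // .lock was still inside taskDir
            } finally {
                channel.close(); // releases the lock if still held; no-op if already closed
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(cleanTaskDir(sampleTaskDir(), false)); // false
        System.out.println(cleanTaskDir(sampleTaskDir(), true));  // true
    }
}
```

The sample directories live under the system temp directory; the one from the failing first attempt is left behind with only its .lock file.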

Now I'm asking myself: shouldn't the lock be released before tmp\{application.id}\0_4 gets deleted at the end? Or am I missing something here?

Kind regards,
Christian Häckh