RocksDB error with KStream


Subhra Prakash Banerjee

Nov 8, 2017, 8:05:38 PM
to Confluent Platform
Hi all,

I have been struggling with this issue for the past couple of days and couldn't find much in any forum. This is my issue.

Kafka Streams version = 0.10.2.1

I have defined a POJO class 'Record' and a corresponding 'recordSerde'.
Record has a field called 'count', and it also has a method 'reduce' that accumulates the count.
This is how I am defining the KTable:

long windowTimeInterval = TimeUnit.MINUTES.toMillis(1);
KTable<Windowed<String>, Record> recordKTable = recordStream
    .groupBy((k, record) -> record.getId(), stringSerde, recordSerde)
    .reduce(Record::reduce, TimeWindows.of(windowTimeInterval), "myStore");
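
For context, a minimal sketch of what such a Record POJO might look like; only 'count', 'reduce', and getId() appear in the thread, so everything else here is an assumption:

public class Record {

    private String id;   // inferred from record.getId() in the groupBy above
    private long count;

    public String getId() { return id; }
    public long getCount() { return count; }

    // Record::reduce satisfies Reducer<Record> as an unbound instance-method
    // reference: the first argument becomes the receiver.
    public Record reduce(Record other) {
        Record result = new Record();
        result.id = this.id;
        result.count = this.count + other.count; // accumulate the count
        return result;
    }
}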


I am getting these errors while reading from the stream:

java.lang.UnsatisfiedLinkError: /tmp/librocksdbjni2084489902535366388.so: libstdc++.so.6: cannot open shared object file: No such file or directory
at java.lang.ClassLoader$NativeLibrary.load(Native Method)
at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)

...

[2017-11-09 00:50:30,408] ERROR [StreamThread-3] Could not initialize class org.rocksdb.Options
java.lang.NoClassDefFoundError: Could not initialize class org.rocksdb.Options
at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:117)
at org.apache.kafka.streams.state.internals.Segment.openDB(Segment.java:38)
at org.apache.kafka.streams.state.internals.Segments.getOrCreateSegment(Segments.java:76)

Any insight would be appreciated.


Thanks
Subhra

Matthias J. Sax

Nov 9, 2017, 5:19:31 AM
to confluent...@googlegroups.com
This is a RocksDB issue. What platform do you run on? We are aware of
some problems on non-Linux platforms.

Maybe you can try to upgrade to 0.11.0.1 or 1.0.0 -- both contain
critical bug fixes and major improvements. You don't need to upgrade
your brokers for this.

-Matthias


Subhra Prakash Banerjee

Nov 9, 2017, 3:05:08 PM
to Confluent Platform
Hi Matthias,

Thank you for your response.

I upgraded to 1.0.0 with a new application id, but I am still seeing the exact same RocksDB error. We are running the KStream app in a Kubernetes environment.

The stream starts, reads the data from the source topic successfully, aggregates it, and sends it to the destination topic. But then it throws this error and the stream crashes. When the application restarts, the stream publishes the same messages to the destination topic again and again. (I am guessing the offsets are not committed for the internal topics.)

Since you mentioned it is a RocksDB issue, is there any way, or are there any examples, to do either of the following?
1) Use an in-memory 'Windowed' key-value store. (I think the persistent windowed store always uses RocksDB.)
2) Instead of a windowed store, use an in-memory store along with the low-level ValueTransformer class, so that we can use the punctuate method for tumbling windowing.

Thanks
Subhra


Hyun Joon Seol

Nov 9, 2017, 3:44:33 PM
to Confluent Platform
We don't use a windowed store, but we do use an in-memory one:
import scala.collection.JavaConverters._
import org.apache.kafka.streams.processor.StateStoreSupplier
import org.apache.kafka.streams.state.{KeyValueStore, Stores}

val storeSupplier =
  Stores
    .create("foo")
    .withKeys(Serdes.keySerde)     // our own key serde
    .withValues(Serdes.valueSerde) // our own value serde
    .inMemory()
    .enableLogging(Map[String, String]().asJava) // changelog topic, default config
    .build()
    .asInstanceOf[StateStoreSupplier[KeyValueStore[_, _]]]
Supply this to the method that accepts store suppliers. I think the API is a bit different for 1.0.0 though.
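
For what it's worth, a sketch of the rough 1.0.0 equivalent; the store name and serdes here are placeholders, not from the thread:

import java.util.Collections;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

// 1.0.0 replaced StateStoreSupplier with StoreBuilder (KIP-182).
StoreBuilder<KeyValueStore<String, Record>> storeBuilder =
    Stores.keyValueStoreBuilder(
            Stores.inMemoryKeyValueStore("foo"),
            keySerde,    // placeholder: your key serde
            valueSerde)  // placeholder: your value serde
        .withLoggingEnabled(Collections.emptyMap()); // changelog, default config

This is then registered via StreamsBuilder#addStateStore rather than passed as a supplier.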

Subhra Prakash Banerjee

Nov 9, 2017, 8:40:45 PM
to Confluent Platform
Hi all,

I got my code working using the low-level API and an in-memory key-value store, and used the punctuate method for windowing.
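
A minimal sketch of how this approach can look with the 1.0.0 Processor API; the actual code is not shown in the thread, so the class name, store name ("countStore"), and one-minute interval are assumptions:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

// Emulates a one-minute tumbling window: accumulate per-id records in an
// in-memory store, then flush the whole store via a wall-clock punctuator.
public class TumblingWindowTransformer
        implements Transformer<String, Record, KeyValue<String, Record>> {

    private ProcessorContext context;
    private KeyValueStore<String, Record> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        // "countStore" must be registered as an in-memory store on the topology.
        this.store = (KeyValueStore<String, Record>) context.getStateStore("countStore");
        context.schedule(TimeUnit.MINUTES.toMillis(1),
                PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            // Forward every accumulated value downstream, then clear the store.
            List<String> flushed = new ArrayList<>();
            try (KeyValueIterator<String, Record> it = store.all()) {
                while (it.hasNext()) {
                    KeyValue<String, Record> entry = it.next();
                    context.forward(entry.key, entry.value);
                    flushed.add(entry.key);
                }
            }
            flushed.forEach(store::delete); // delete after iterating, not during
        });
    }

    @Override
    public KeyValue<String, Record> transform(String key, Record value) {
        Record current = store.get(value.getId());
        store.put(value.getId(), current == null ? value : current.reduce(value));
        return null; // output is emitted by the punctuator, not per record
    }

    @Override
    public KeyValue<String, Record> punctuate(long timestamp) {
        return null; // deprecated in 1.0.0; scheduling happens in init()
    }

    @Override
    public void close() {}
}

Wired up with something like recordStream.transform(TumblingWindowTransformer::new, "countStore") after registering the store with StreamsBuilder#addStateStore.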

Thanks
Subhra

Matthias J. Sax

Nov 10, 2017, 4:59:49 PM
to confluent...@googlegroups.com
Glad it works!

Btw: with 1.0.0 it is also pretty easy to replace a store with an in-memory
store in the DSL. We will have detailed docs after CP 4.0.0 is released. For
now, you can look into the original design doc:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-182%3A+Reduce+Streams+DSL+overloads+and+allow+easier+use+of+custom+storage+engines
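
As a rough sketch of what that looks like (reusing the serdes and names from earlier in this thread as assumptions; note that 1.0.0 only ships an in-memory supplier for non-windowed key-value stores):

import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;

// Back a non-windowed DSL reduce with an in-memory store instead of RocksDB.
KeyValueBytesStoreSupplier supplier = Stores.inMemoryKeyValueStore("myStore");
KTable<String, Record> recordKTable = recordStream
    .groupBy((k, record) -> record.getId(), Serialized.with(stringSerde, recordSerde))
    .reduce(Record::reduce,
            Materialized.<String, Record>as(supplier)
                .withKeySerde(stringSerde)
                .withValueSerde(recordSerde));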


-Matthias

Vishnu Viswanath

Jul 27, 2018, 2:13:50 PM
to Confluent Platform
Hi, 

I also faced a similar issue yesterday while running a Kafka Streams application with an "aggregate" function in it. I am running via Docker (Alpine Linux); in the Dockerfile I created a temp directory with write permission and passed it as "-Djava.io.tmpdir". If I log into the instance, I can see that the librocksdbjni2084489902535366388.so file is created in the tmpdir, but the application still fails with the UnsatisfiedLinkError.

I could solve it by changing to an in-memory store, but I am skeptical that this will become an issue later if the aggregate function uses a lot of memory. Is there any other suggestion for solving this problem (i.e., using RocksDB as the state store while running within a Docker container)?

Thanks in advance,
Vishnu

