Kafka Streams: Way to iterate session store as key-value store?


akesle20...@gmail.com

May 3, 2017, 1:50:06 PM
to Confluent Platform
Hello,


I appreciate any help on this.

I would like to iterate over a local session store, set up by a Kafka Streams DSL topology definition, as if it were a key-value store.

The short rationale is being able to traverse the local state store periodically by a non-Streams topology thread and evaluate timeouts for ongoing transactions.

I have attempted to open the session store from a KafkaStreams object as a key-value store, but the Kafka code prevents this usage with the following check:

    private abstract static class QueryableStoreTypeMatcher<T> implements QueryableStoreType<T> {
        private final Class matchTo;

        QueryableStoreTypeMatcher(Class matchTo) {
            this.matchTo = matchTo;
        }

        public boolean accepts(StateStore stateStore) {
            return this.matchTo.isAssignableFrom(stateStore.getClass());
        }
    }

Since, by definition, everything in RocksDB is a key-value pair, including the session information, it would make sense to be able to treat the sessions as key-value pairs.

So, is there another way to traverse the session store's key-value tuples in RocksDB using the Kafka Streams API, without programming directly against RocksDB?

P.S. Right now, the session store implementation requires the user to know the keys of the tuples that he or she wants to fetch and traverse. I, on the other hand, need to traverse all the tuples without storing the keys somewhere else. I am using the Confluent 3.2.0 / Kafka 0.10.2.0 libraries.


Thanks in advance!

- Alex

Eno Thereska

May 6, 2017, 10:20:43 AM
to Confluent Platform
Hi Alex,


You can use the Interactive Queries API to get a handle on the session store; you don't need to create your own way of accessing it.

Eno

Alex Kesler

May 8, 2017, 10:30:10 AM
to confluent...@googlegroups.com
Thanks, Eno. However, the code segment that you reference is exactly the issue that I pointed out.

If you look at the ReadOnlySessionStore interface, you have to know ALL the keys in advance in order to use the Interactive Queries API:

public interface ReadOnlySessionStore<K, AGG> {

    KeyValueIterator<Windowed<K>, AGG> fetch(final K key);
}

In other words, I cannot just iterate all the current sessions, without inherent knowledge of the keys.

Instead, I was seeking an interface to the session information akin to ReadOnlyKeyValueStore:

public interface ReadOnlyKeyValueStore<K, V> {

    V get(K key);

    KeyValueIterator<K, V> range(K from, K to);

    KeyValueIterator<K, V> all();

    long approximateNumEntries();
}

Specifically, I was seeking the all() method above to get an iterator to all the current session-based key-value pairs for the ReadOnlySessionStore.
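
To make the limitation concrete, here is a minimal sketch of what a fetch-by-key-only interface forces on the caller: a separately maintained key set that is walked to emulate all(). It does not use the Kafka Streams API. `FetchByKeyStore`, `recordKey`, and `scanAll` are hypothetical stand-ins invented for illustration, and sessions are represented as plain strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class KnownKeySessionScan {

    /** Hypothetical stand-in for a store that only supports lookup by key. */
    interface FetchByKeyStore<K, V> {
        List<V> fetch(K key);
    }

    /** Keys seen so far; safe to update from one thread and read from another. */
    private final Set<String> knownKeys = ConcurrentHashMap.newKeySet();

    /** Must be called for every key that enters the store (the extra bookkeeping). */
    void recordKey(String key) {
        knownKeys.add(key);
    }

    /** Emulates all(): fetches each known key and concatenates the results. */
    <V> List<V> scanAll(FetchByKeyStore<String, V> store) {
        List<V> out = new ArrayList<>();
        for (String key : knownKeys) {
            out.addAll(store.fetch(key));
        }
        return out;
    }

    public static void main(String[] args) {
        // Stand-in data: each key maps to a list of session descriptions.
        Map<String, List<String>> data = new HashMap<>();
        data.put("a", Arrays.asList("a@0-10", "a@50-60"));
        data.put("b", Arrays.asList("b@5-7"));

        FetchByKeyStore<String, String> store =
                key -> data.getOrDefault(key, Collections.<String>emptyList());

        KnownKeySessionScan scan = new KnownKeySessionScan();
        scan.recordKey("a");
        scan.recordKey("b");

        List<String> sessions = scan.scanAll(store);
        System.out.println(sessions.size()); // prints 3
    }
}
```

Any key that is never passed to `recordKey` is invisible to `scanAll`, which is exactly the bookkeeping an all() method on the store would make unnecessary.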

- Alex


Guozhang Wang

May 9, 2017, 6:01:43 PM
to Confluent Platform
Hello Alex,

Before we consider adding a new API into ReadOnlySessionStore interface, I'd like to clarify your use case about "traverse the local state store periodically by a non-Streams topology thread and evaluate timeouts for ongoing transactions". My understanding is that:

Periodically you'd like to iterate over all the current open session windows, i.e. windows that are not closed yet, and then return a value as:


     ((Windowed<K>) windowedKey).window().start() - System.currentTimeMillis()


Is that right? If yes, I agree that we currently do not have a direct way of retrieving all existing windows (i.e. returning not an iterator over `Windowed<K>` keys for a given key, but an iterator over every `Window`), and maybe it would be better to add one to ReadOnlySessionStore/ReadOnlyWindowStore.
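
As a rough illustration of the periodic check described above, here is a minimal sketch that flags sessions whose start time is older than a timeout. It assumes the open sessions have already been obtained somehow and uses plain stand-in types rather than the Kafka Streams API: `SessionWindow`, `timedOutKeys`, and the key names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SessionTimeoutCheck {

    /** Hypothetical stand-in for a session window: start and end timestamps in ms. */
    static final class SessionWindow {
        final long startMs;
        final long endMs;

        SessionWindow(long startMs, long endMs) {
            this.startMs = startMs;
            this.endMs = endMs;
        }
    }

    /** Returns the keys whose session started more than timeoutMs before nowMs. */
    static <K> List<K> timedOutKeys(Map<K, SessionWindow> openSessions,
                                    long nowMs, long timeoutMs) {
        List<K> result = new ArrayList<>();
        for (Map.Entry<K, SessionWindow> entry : openSessions.entrySet()) {
            long ageMs = nowMs - entry.getValue().startMs;
            if (ageMs > timeoutMs) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, SessionWindow> open = new LinkedHashMap<>();
        open.put("txn-1", new SessionWindow(1_000L, 2_000L));
        open.put("txn-2", new SessionWindow(9_000L, 9_500L));

        // With "now" at 10,000 ms and a 5,000 ms timeout, only txn-1 is too old.
        System.out.println(timedOutKeys(open, 10_000L, 5_000L)); // prints [txn-1]
    }
}
```

In a real application `nowMs` would come from `System.currentTimeMillis()`; it is passed in here so the check stays deterministic.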

For now, one workaround I can think of, which is a bit complicated, is to introduce a "dummy" start-transaction record in the source topic that you would always put into the session window first, so that you know all existing sessions will at least have this key in their window. Of course, this requires you to be able to control the input source topics.

Guozhang


Eno Thereska

May 10, 2017, 3:55:43 AM
to Confluent Platform
Alex, if the above suggestion from Guozhang doesn't work, it might be good to write a KIP to propose this to the community: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals

Eno