Kafka Streams tasks and state stores


Tianxiang Xiong

Dec 15, 2016, 7:24:46 PM
to Confluent Platform
How do state stores interact with Kafka Streams tasks?

Here's a contrived example:

Suppose I wanted to track the number of clicks per user. There is a KStream whose records are of the form {user-id : num-clicks}, partitioned on user-id. We could use countByKey, but for the sake of argument let's use process instead, and update a state store in the processor.

We create a state store supplier and register it with a KStreamBuilder:

StateStoreSupplier countStore = Stores.create("Counts")
    .withKeys(Serdes.String())
    .withValues(Serdes.Long())
    .persistent()
    .build();

builder.addStateStore(countStore);

If there are n stream tasks, does each task get an independent state store?

If so, two questions via examples:

Let's say there are 3 tasks, t1, t2, t3, to which user IDs are partitioned, e.g. users 11, 12 to t1, 21, 22 to t2, and 31, 32 to t3. Then t1 has the counts for 11, 12, but not for 21, 22, 31, or 32. t2 would have the counts for 21, 22, etc.
  • Suppose we have some front-end that displays the clicks per user. We'd need to consume from all state stores for all tasks. What's the best way to do that? 
    • Is there a code example?
  • What if, while counting clicks for user 11, we wanted to compare their clicks with those of user 21? Since their clicks are counted in different tasks, the state store for t1 won't have the right click counts for user 21. 
    • In other words, is there a way we can share global state across tasks, with the ability to update that state from any task in such a way that other tasks will see that updated state before processing their next records?

Eno Thereska

Dec 16, 2016, 4:03:19 AM
to Confluent Platform
Hi there,

Yes, each task gets its own state store. The best way to query the state stores directly is through the Interactive Queries API. There is a blog post here (with example code at the end):

https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/

Interactive Queries can query across tasks and even across instances (on different machines).
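
To make the "one store per task" point concrete, here is a toy model in plain Java. It is not Kafka Streams code: ordinary maps stand in for the per-task stores, the class name PartitionedCounts is made up, and the hashCode-based partitioner is an assumption for illustration only. It shows that a task sees only its own keys, while a composite read that consults every store can answer for any key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: plain Java maps stand in for per-task state stores.
class PartitionedCounts {
    private final List<Map<String, Long>> taskStores = new ArrayList<>();

    PartitionedCounts(int numTasks) {
        for (int i = 0; i < numTasks; i++) {
            taskStores.add(new HashMap<>());
        }
    }

    // Hypothetical partitioner: hash of the key modulo the task count.
    int taskFor(String userId) {
        return Math.floorMod(userId.hashCode(), taskStores.size());
    }

    // A record is processed only by the task that owns its key.
    void recordClicks(String userId, long clicks) {
        taskStores.get(taskFor(userId)).merge(userId, clicks, Long::sum);
    }

    // What a single task can see: its own store and nothing else.
    Long localGet(int task, String userId) {
        return taskStores.get(task).get(userId);
    }

    // Composite read: consult every task's store until the key is found.
    Long get(String userId) {
        for (Map<String, Long> store : taskStores) {
            Long count = store.get(userId);
            if (count != null) {
                return count;
            }
        }
        return null;
    }
}
```

In this model, localGet on a task that does not own the key returns null, whereas get finds the count wherever it lives. That is roughly the role the query layer plays on top of the individual stores.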

Thanks
Eno

Maria Abramiuc

Jan 5, 2018, 3:38:05 AM
to Confluent Platform
Hi,

I have one additional question about your answer. I know that the user can configure the number of stream threads that the topology starts, and that the number of tasks is determined by the assignment from the server. Is there any way to configure just one task per thread, so that the application has just one store and access to all the data that the thread processes?

Is there a better way to access all the stores from the current application (host)? As far as I understand, the Queries API is best suited to querying all instances. Should it be used for just one instance as well?

There is also this option:

KafkaStreams streaming = new KafkaStreams(...);
streaming.start();

// Use the public ReadOnlyKeyValueStore interface rather than casting to the
// internal CompositeReadOnlyKeyValueStore class.
// The String/Long key and value types are assumed here for illustration.
ReadOnlyKeyValueStore<String, Long> keyValueStore =
    streaming.store("mystore", QueryableStoreTypes.<String, Long>keyValueStore());
KeyValueIterator<String, Long> it = keyValueStore.all();

Thank you,
Maria Abramiuc

Saïd Bouras

Jan 5, 2018, 1:43:27 PM
to confluent...@googlegroups.com
Hello, 

You can configure the number of threads per KafkaStreams app, but the number of tasks is determined by the highest partition count among your input topics.
Let's say your app reads three topics A, B and C with 3, 7 and 5 partitions respectively. You will then have 7 tasks distributed among X threads (where X is a value you choose).
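
The arithmetic in that example can be sketched in a few lines of plain Java. The class and method names are made up, and the round-robin spread is only an assumption to show how 7 tasks might land on X threads; the actual assignment done by the library may look different.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TaskLayout {
    // Task count as described above: the largest partition count among the input topics.
    static int taskCount(int... partitionsPerTopic) {
        return Arrays.stream(partitionsPerTopic).max().orElse(0);
    }

    // Hypothetical round-robin spread of task ids over threads.
    static List<List<Integer>> spread(int tasks, int threads) {
        List<List<Integer>> perThread = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            perThread.add(new ArrayList<>());
        }
        for (int task = 0; task < tasks; task++) {
            perThread.get(task % threads).add(task);
        }
        return perThread;
    }
}
```

With topics of 3, 7 and 5 partitions, taskCount(3, 7, 5) gives 7, and spreading those over 3 threads leaves one thread with three tasks and the other two with two each.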

Hope that helps, regards


Matthias J. Sax

Jan 5, 2018, 4:22:21 PM
to confluent...@googlegroups.com
I'm not sure I follow.

If you want a single task per thread, then you can configure as many
threads as there are tasks, can't you? Of course, you need to know how
many tasks you are going to get, and that depends on the number of
partitions. I guess you can do a "trial run" and observe the logs: the
created tasks are logged, so you can figure it out.
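
As a rough sketch of that suggestion: once the task count is known from a trial run, the thread count can be set to match through the num.stream.threads setting. The helper class, the application id and the broker address below are placeholders, not values from this thread.

```java
import java.util.Properties;

class StreamsThreadConfig {
    // Builds a config that asks for one stream thread per expected task.
    static Properties forTaskCount(int expectedTasks) {
        Properties props = new Properties();
        props.setProperty("application.id", "click-counter");      // hypothetical name
        props.setProperty("bootstrap.servers", "localhost:9092");  // hypothetical address
        props.setProperty("num.stream.threads", Integer.toString(expectedTasks));
        return props;
    }
}
```

This assumes a single running instance of the application. If more instances are started, the tasks are shared among them and some threads would presumably sit idle.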

For IQ: you can of course use IQ to access just the local data, too.


-Matthias

Maria Abramiuc

Jan 10, 2018, 7:16:35 AM
to Confluent Platform
I just wanted to configure the number of tasks to 1 so I get just one instance of the state store (since it's one instance per task), but I will go with accessing all instances of the store using the IQ API.
Thank you,
Maria

Matthias J. Sax

Jan 11, 2018, 2:58:28 AM
to confluent...@googlegroups.com
If you want to have a single task, you need to use a topic with a single
partition (the number of tasks is not configurable).

An alternative might be to use a globalStore/GlobalKTable instead of a
regular store/table, but this gives you a full copy of the state on
_each_ application instance, so it might not be what you want.
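
To illustrate the trade-off, here is a toy model in plain Java (again not Kafka Streams code, and GlobalStoreModel is a made-up name). Every update is applied to every replica, so any replica can answer for any key, at the cost of storing the full data set once per replica. The model applies updates synchronously for simplicity; as far as I understand, a real global store is populated from its topic in the background, so it should not be relied on to show an update before the next record is processed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: each map stands in for one full copy of a global store.
class GlobalStoreModel {
    private final List<Map<String, Long>> replicas = new ArrayList<>();

    GlobalStoreModel(int numReplicas) {
        for (int i = 0; i < numReplicas; i++) {
            replicas.add(new HashMap<>());
        }
    }

    // Every update reaches every replica, so each one holds the full data set.
    void publish(String userId, long count) {
        for (Map<String, Long> replica : replicas) {
            replica.put(userId, count);
        }
    }

    // Any replica can answer for any key.
    Long read(int replica, String userId) {
        return replicas.get(replica).get(userId);
    }

    // Total stored entries across replicas: the storage cost of full replication.
    int totalEntries() {
        int total = 0;
        for (Map<String, Long> replica : replicas) {
            total += replica.size();
        }
        return total;
    }
}
```

With 3 replicas and 2 users, totalEntries() is 6 rather than 2, which is the storage cost of full replication.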


-Matthias