Query a global store inside KStream

Thomas Autret

Oct 27, 2017, 6:40:23 AM
to Confluent Platform
Hi,

Context
I have a scenario with two input topics, "FilesTopic" (3 partitions) and "DataEnrichmentTopic" (1 partition). The latter is updated every hour with a simple String value.

What I need to do
Every file received in FilesTopic must be enriched with the latest value available in DataEnrichmentTopic.
As I understand it, I need a global table/store to share the values from DataEnrichmentTopic with all partitions of FilesTopic.
But I want to enrich the files without any join, because there is no matching key.

I tried to implement my scenario with KStreamBuilder using the globalTable method, like this:
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> filesStream = builder.stream("FilesTopic");
builder.globalTable("DataEnrichmentTopic", "data-enrichment-store");

Then I need to query my global table/store inside filesStream, but I don't know how to do that:
filesStream.mapValues(file -> /* enrich file input by calling the table/store : file + last value from data-enrichment-store */)

Is this the right way to implement this scenario, or do I need to use a TopologyBuilder?

Thank you for your help :)

Thomas

Matthias J. Sax

Oct 27, 2017, 7:01:39 AM
to confluent...@googlegroups.com
It seems easier to register a global store and use .transform().

-Matthias

Thomas Autret

Oct 27, 2017, 8:42:51 AM
to Confluent Platform
I understand how to use the transformer: I can retrieve the store from the ProcessorContext, then read it and enrich my input files.

But I don't understand some of the parameters of the addGlobalStore method:
- What values should I use for the sourceName and processorName parameters?
- Does the processorSupplier parameter let me update the global store manually inside the process method?

Thanks.

Thomas

Thomas Autret

Oct 27, 2017, 9:25:48 AM
to Confluent Platform
It doesn't seem to work with addGlobalStore and transform().
I get the exception "StateStore xxx is not added yet".

When addGlobalStore is called, the store is put in the globalStateStores map, but when transform is called, the store is looked up in the stateFactories map.

Damian Guy

Oct 27, 2017, 9:49:13 AM
to confluent...@googlegroups.com
You should be able to just look up the global state store inside your transformer, i.e., via ProcessorContext#getStateStore.


Thomas Autret

Oct 27, 2017, 10:29:10 AM
to Confluent Platform
Maybe, but I get an exception during initialization. I think getStateStore retrieves only local stores, not global ones.
I saw a GlobalProcessorContext whose getStateStore method calls stateManager.getGlobalStore, but I don't know how to access it.

Damian Guy

Oct 27, 2017, 11:05:57 AM
to confluent...@googlegroups.com
When you call `KStream#transform(...)`, don't supply the global store name. It isn't required: because the store is global, all nodes have access to it. Supplying it is likely the cause of the exception.
`ProcessorContext#getStateStore` will find the global store if it exists.
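A minimal sketch of the lookup described above, assuming the Kafka Streams 1.0 API and String keys and values. It also assumes the global store was registered as `data-enrichment-store` and that its update processor keeps the most recent enrichment value under a fixed key; the key `"latest"`, the class name `EnrichingTransformer` and the `file|value` output format are hypothetical. The store is fetched in `init()`, and `transform()` is called with no store names at all.

```java
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

// Enriches each FilesTopic record with the latest value held in the global store.
public class EnrichingTransformer
        implements Transformer<String, String, KeyValue<String, String>> {

    // Hypothetical names: must match how the global store was registered and filled.
    static final String STORE_NAME = "data-enrichment-store";
    static final String LATEST_KEY = "latest";

    private KeyValueStore<String, String> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        // Global stores are visible to every task, so getStateStore() resolves
        // the store here even though no name was passed to transform().
        store = (KeyValueStore<String, String>) context.getStateStore(STORE_NAME);
    }

    @Override
    public KeyValue<String, String> transform(final String key, final String file) {
        return KeyValue.pair(key, enrich(file, store.get(LATEST_KEY)));
    }

    // Required by the 1.0 Transformer interface, where it is deprecated; unused here.
    // Declared without @Override so the class also compiles where it was removed.
    @SuppressWarnings("deprecation")
    public KeyValue<String, String> punctuate(final long timestamp) {
        return null;
    }

    @Override
    public void close() {
        // The store is owned by Kafka Streams; nothing to release.
    }

    // Hypothetical enrichment format "<file>|<latest value>"; the file is passed
    // through unchanged until the first enrichment value has arrived.
    static String enrich(final String file, final String latest) {
        return latest == null ? file : file + "|" + latest;
    }

    // Wiring: no state store name is passed to transform().
    static KStream<String, String> enrichFiles(final KStream<String, String> files) {
        return files.transform(EnrichingTransformer::new);
    }
}
```

Passing the global store's name as the second argument of `transform()` is what makes Kafka Streams look for a regular (non-global) store with that name, which matches the "is not added yet" symptom reported earlier in the thread.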

Matthias J. Sax

Oct 27, 2017, 11:42:06 AM
to confluent...@googlegroups.com
To answer your other questions:

> - what is the value for sourceName and processorName parameters ?

Just provide some unique names. addGlobalStore() is a Processor API
method and requires you to name all processors (adding a global store
adds two processors, and you just need to name both). Usually, at the DSL
level we auto-generate processor names; it might actually be a nice
improvement to generate them if addGlobalStore is used in the DSL. I
created https://issues.apache.org/jira/browse/KAFKA-6138 for this.

> - the processorSupplier parameter allows to update manually the global store inside process method ?

Yes. The processor supplier is responsible for updating the global store.
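A minimal sketch of both answers, assuming the Kafka Streams 1.0 `StreamsBuilder#addGlobalStore` overload that takes `sourceName` and `processorName`, String keys and values, and an in-memory store. The two names are arbitrary unique strings, and the processor returned by the supplier is what writes into the store. The names, the `LatestValueUpdater` class and the fixed key `"latest"` are hypothetical; storing every DataEnrichmentTopic value under one key is just one way to share "the latest value" when the two topics have no matching key.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed; // package used in 1.0
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class GlobalStoreSetup {

    // Hypothetical names; addGlobalStore() only requires the processor names to be unique.
    static final String STORE_NAME = "data-enrichment-store";
    static final String SOURCE_NAME = "data-enrichment-source";
    static final String PROCESSOR_NAME = "data-enrichment-processor";
    static final String LATEST_KEY = "latest";

    // State-update processor: keeps only the most recent DataEnrichmentTopic value.
    static class LatestValueUpdater extends AbstractProcessor<String, String> {
        private KeyValueStore<String, String> store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(final ProcessorContext context) {
            super.init(context);
            store = (KeyValueStore<String, String>) context.getStateStore(STORE_NAME);
        }

        @Override
        public void process(final String key, final String value) {
            // The record key is ignored: every value overwrites the same fixed key.
            store.put(LATEST_KEY, value);
        }
    }

    static void registerGlobalStore(final StreamsBuilder builder) {
        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
                Stores.keyValueStoreBuilder(
                        Stores.inMemoryKeyValueStore(STORE_NAME),
                        Serdes.String(),
                        Serdes.String())
                      .withLoggingDisabled(); // the source topic already plays the changelog role

        builder.addGlobalStore(
                storeBuilder,
                "DataEnrichmentTopic",
                SOURCE_NAME,                                   // name of the added source node
                Consumed.with(Serdes.String(), Serdes.String()),
                PROCESSOR_NAME,                                // name of the added processor node
                LatestValueUpdater::new);                      // supplier: a new updater per call
    }
}
```

Changelogging is disabled on the store builder because a global store is restored from its own source topic, so a separate changelog topic would be redundant.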



-Matthias


Thomas Autret

Oct 29, 2017, 5:39:38 AM
to Confluent Platform
Thanks Matthias and Damian.

Using transform() without supplying global store name works fine :)

Thomas

Thomas Autret

Oct 30, 2017, 5:46:04 AM
to Confluent Platform
For KAFKA-6138, what do you think of adding a ProcessorSupplier parameter to the private method KStreamBuilder#doGlobalTable?
In this method, processorName and sourceName are already generated.

Thomas

Matthias J. Sax

Oct 30, 2017, 7:55:38 PM
to confluent...@googlegroups.com
KStreamBuilder is deprecated in the upcoming 1.0 release, so there is no
need to update anything there.

-Matthias