Kafka Streams - iterate GlobalKTable within a streaming topology


Joe M

Jun 9, 2017, 8:08:27 AM
to Confluent Platform
Hello,

I am trying to figure out how to loop over a set of items for every new record in a KStream.  I can do this if I use a regular Java List:

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        final KStreamBuilder builder = new KStreamBuilder();
        final KStream<String, GenericRecord> source = builder.stream("test-topic");

        final List<Locations> locations = new ArrayList<>();
        // ...add locations to the List here...
        source.foreach((key, value) -> locations.forEach(location -> {
            // ...do some stuff here using value and location...
        }));

        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();

        // Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }

I can put the Locations into a locations-topic (it is a slowly changing set of records, whereas the test-topic receives new records frequently); however, I cannot use a KTable because the two topics cannot be co-partitioned.  I really want to use a GlobalKTable, because every new record in test-topic must be compared against every record of the locations-topic.  Unfortunately, I cannot iterate over a GlobalKTable within a Kafka Streams topology - it can only be used as the right side of a join.  A full outer join might also work, but GlobalKTable does not support outer joins.

Does anyone know how I can accomplish this in a purely Kafka Streams implementation?

Thanks,
Joe

Matthias J. Sax

Jun 9, 2017, 12:33:14 PM
to confluent...@googlegroups.com
I would recommend using the Processor API for this (note that you can
mix and match the DSL and the Processor API).

(1) You can use a global store to read all your locations.

(2) You can access the global store from any other processor, and thus
iterate over the whole store for each input record.

Hope this helps.

-Matthias
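
For later readers, a rough sketch of Matthias's two steps against the 0.10.x API used in the question (KStreamBuilder, which extends TopologyBuilder). This is an untested outline, not code from the thread: the `Location` type, `locationSerde`, and the names "locations-store", "locations-source", and "locations-updater" are all placeholders, while `builder` and `source` are the ones from the original main() above.

```java
// (1) Register a global store fed from locations-topic. Every application
// instance keeps a full copy of the store, so no co-partitioning with
// test-topic is required.
builder.addGlobalStore(
        Stores.create("locations-store")
                .withStringKeys()
                .withValues(locationSerde)   // assumed serde for Location values
                .persistent()
                .disableLogging()            // the source topic already acts as the changelog
                .build(),
        "locations-source",
        Serdes.String().deserializer(),
        locationSerde.deserializer(),
        "locations-topic",
        "locations-updater",
        () -> new AbstractProcessor<String, Location>() {
            private KeyValueStore<String, Location> store;

            @Override
            public void init(ProcessorContext context) {
                super.init(context);
                store = (KeyValueStore<String, Location>) context.getStateStore("locations-store");
            }

            @Override
            public void process(String key, Location value) {
                store.put(key, value);       // keep the global copy in sync with the topic
            }
        });

// (2) Any processor can open the global store by name and iterate over
// all of it for each incoming record.
source.process(() -> new AbstractProcessor<String, GenericRecord>() {
    private KeyValueStore<String, Location> locations;

    @Override
    public void init(ProcessorContext context) {
        super.init(context);
        locations = (KeyValueStore<String, Location>) context.getStateStore("locations-store");
    }

    @Override
    public void process(String key, GenericRecord value) {
        // Iterators over state stores must be closed; try-with-resources handles that.
        try (KeyValueIterator<String, Location> iter = locations.all()) {
            while (iter.hasNext()) {
                KeyValue<String, Location> location = iter.next();
                // ...compare value against location.value here...
            }
        }
    }
});
```

The key point is that the second processor looks up the store by name via context.getStateStore(), so it gets read access to the full set of locations without going through the DSL join that GlobalKTable is otherwise limited to.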



Joe M

Jun 9, 2017, 12:49:38 PM
to Confluent Platform
Hi Matthias,

Thank you for the suggestion and the quick response. I will give this approach a try.

Joe