I am trying to figure out how to loop over a set of items for every new record in a KStream. I can do this if I use a regular Java List:
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        final KStreamBuilder builder = new KStreamBuilder();
        final KStream<String, GenericRecord> source = builder.stream("test-topic");

        final List<Locations> locations = new ArrayList<>();
        // ...add locations to the List here...

        // For every incoming record, visit every location
        source.foreach((key, value) -> locations.forEach(location -> {
            // ...do some stuff here using value and location...
        }));

        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();

        // Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
I can put the Locations into a locations-topic (it is a slowly changing set of records, whereas test-topic receives new records frequently). However, I cannot use a KTable because the two topics cannot be co-partitioned. What I really want is a GlobalKTable, because every new record in test-topic must be compared to every record in locations-topic. Unfortunately, I cannot iterate over a GlobalKTable in a Kafka Streams topology: it can only be used on the right side of a join, and that join matches each stream record against a single table key. A full outer join might also work, but GlobalKTable does not support outer joins.
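To make the mismatch concrete, here is a minimal plain-Java sketch with no Kafka dependency. A `Map` stands in for the GlobalKTable, and the class name, method names, and sample data are all hypothetical. `lookupJoin` models what a key-based join gives me (at most one matching row per record), while `scanAll` models what I actually need (every record paired with every row).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class JoinVersusScan {

    // Models a key-based join: the record is mapped to ONE table key,
    // so it can meet at most one row of the table.
    static List<String> lookupJoin(String recordKey, String value, Map<String, String> table) {
        List<String> out = new ArrayList<>();
        String row = table.get(recordKey);
        if (row != null) {
            out.add(value + "|" + row);
        }
        return out;
    }

    // Models the behaviour I am after: the record is paired with EVERY row.
    static List<String> scanAll(String value, Map<String, String> table) {
        List<String> out = new ArrayList<>();
        for (String row : table.values()) {
            out.add(value + "|" + row);
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for the contents of locations-topic
        Map<String, String> locations = new LinkedHashMap<>();
        locations.put("loc-1", "Berlin");
        locations.put("loc-2", "Paris");

        System.out.println(lookupJoin("loc-1", "event", locations)); // [event|Berlin]
        System.out.println(scanAll("event", locations));             // [event|Berlin, event|Paris]
    }
}
```

The List-based code above behaves like `scanAll`; the question is how to get the same behaviour when the locations live in a topic rather than in an in-memory List.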
Does anyone know how I can accomplish this in a purely Kafka Streams implementation?