/**
 * Entry point that assembles a Kafka Streams topology with a {@code KStreamBuilder} and starts it.
 *
 * <p>The topology reads one source topic, separates records by whether their {@code "endTime"}
 * field is set, joins the records that have an {@code "endTime"} against two branches of the
 * records that do not, and writes both join results to the same target topic.
 */
public class KafkaStreamsMainClass {
/**
 * Builds the topology, starts the {@code KafkaStreams} instance and registers a JVM shutdown
 * hook that closes it.
 *
 * @param args command-line arguments; not referenced in the visible portion of this method
 * @throws Exception declared on the signature; the visible code does not show which call throws
 */
public static void main(final String[] args) throws Exception {
// NOTE(review): elided section. `prop` and `streamsConfiguration`, used below, are not declared
// in the visible code -- presumably they are set up here; confirm.
[...]
KStreamBuilder builder = new KStreamBuilder();
// Source stream; the topic name is read from the "KAFKA_SOURCE_TOPIC" property.
KStream<GenericRecord, GenericRecord> sourceStream = builder.stream(prop.getProperty("KAFKA_SOURCE_TOPIC"));
// NOTE(review): the initializer is elided and `schema` is not used in the visible code.
String schema = ...;
// Records whose "endTime" field is non-null, passed through a value mapping step.
KStream<GenericRecord, GenericRecord> finishedTracesFiltered = sourceStream
.filter((GenericRecord key, GenericRecord value) -> value.get("endTime") != null)
.mapValues((GenericRecord value) -> {
// some mapping operations
return value;
});
// Records whose "endTime" field is null, split by two predicates:
// the first tests "field1" != null, the second tests "field2" != null.
// NOTE(review): per the KStream#branch contract the result array is expected to hold one stream
// per predicate, in the order the predicates are given -- confirm against the Kafka version in use.
KStream<GenericRecord, GenericRecord>[] branchedStreams = sourceStream
.filter((GenericRecord key, GenericRecord value) -> value.get("endTime") == null)
.branch((GenericRecord key, GenericRecord value) -> value.get("field1") != null,
(GenericRecord key, GenericRecord value) -> value.get("field2") != null);
// Windowed join of the "endTime"-set stream with branch 0; the join window is built from
// 2 seconds converted to milliseconds. The joiner returns its first argument.
KStream<GenericRecord, GenericRecord> finishedRequests = finishedTracesFiltered.join(branchedStreams[0],
(GenericRecord value1, GenericRecord value2) -> {
// some operations
return value1;
}, JoinWindows.of(TimeUnit.SECONDS.toMillis(2)));
// Same join as above, but against branch 1.
KStream<GenericRecord, GenericRecord> finishedJobs = finishedTracesFiltered.join(branchedStreams[1],
(GenericRecord value1, GenericRecord value2) -> {
// some operations
return value1;
}, JoinWindows.of(TimeUnit.SECONDS.toMillis(2)));
// Both join results are written to the same topic, named by the "KAFKA_TARGET_TOPIC" property.
finishedRequests.to(prop.getProperty("KAFKA_TARGET_TOPIC"));
finishedJobs.to(prop.getProperty("KAFKA_TARGET_TOPIC"));
KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();
// Close the streams instance when the JVM shuts down.
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
something-KSTREAM-JOINTHIS-0000000009-store-changelog
something-KSTREAM-JOINTHIS-0000000014-store-changelog
something-KSTREAM-JOINOTHER-0000000015-store-changelog
something-KSTREAM-JOINOTHER-0000000010-store-changelog
> an email to confluent-platform+unsub...@googlegroups.com
> <mailto:confluent-platform+unsub...@googlegroups.com>.