// Kafka Streams application setup: left-joins the records of sourceTopic against a
// KTable materialized from targetTopic and writes the joined records back to targetTopic.

// ---- Streams configuration ----
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
props.put(StreamsConfig.STATE_DIR_CONFIG, "/path/to/local_state_dir");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zkHost);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// MyTimestampExtractor just extracts a timestamp field from the Avro record
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyTimestampExtractor.class.getName());
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistry);
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// This is the Serde from the Confluent example code
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);

// ---- Explicit serdes passed to the topology sources below ----
Serde<String> keySerde = Serdes.String();
Serde<GenericRecord> valueSerde = new GenericAvroSerde();
// This serde instance is created by hand, so it is handed the schema registry URL
// explicitly here; the second argument (isKey) is false because it is used for record values.
valueSerde.configure(
        Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistry),
        false);

// ---- Topology ----
KStreamBuilder builder = new KStreamBuilder();
KStream<String, GenericRecord> inputRecordStream = builder.stream(keySerde, valueSerde, sourceTopic);
// Note: the table is read from targetTopic, the same topic the join result is written to.
KTable<String, GenericRecord> localTable = builder.table(keySerde, valueSerde, targetTopic);

// The variable declared above is inputRecordStream (lower-case 'i'); Java identifiers are
// case-sensitive, so the join must be invoked on exactly that name.
inputRecordStream.leftJoin(localTable, new ValueJoiner<GenericRecord, GenericRecord, GenericRecord>() {
    @Override
    public GenericRecord apply(GenericRecord streamValue, GenericRecord tableValue) {
        // if tableValue == null return streamValue, otherwise merge them
        return MyUtils.leftJoin(tableValue, streamValue);
    }
}).to(targetTopic);

// ---- Run ----
// final so the anonymous Runnable below may capture it on pre-Java-8 compilers as well.
final KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();

// Close the streams instance when the JVM shuts down.
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
    @Override
    public void run() {
        streams.close();
    }
}));
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/efe45606-fdd3-4d7e-a7c4-f462cb2f6f8e%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/3405f0bf-a464-42cd-bf16-c991cc71319d%40googlegroups.com.