Kafka Streams left join - RocksDB cannot keep up?


peter...@gmail.com

Sep 7, 2016, 10:02:49 AM
to Confluent Platform

Hi,

I'm writing a simple POC application with Kafka Streams (0.10.0.1).
I have incoming records which I want to check against the local store (RocksDB), and if I see an
already received record, I want to update it with some of the new fields.
For this I'm left-joining a KStream to a KTable.

When I start a single application instance on localhost that reads the input topic, the KTable side of the
left join is always null. Thus I can't update records, and the output topic ends up identical to the input topic.

However, if producers send records to the input topic at a lower rate (e.g. I wait a few hundred ms between published records),
then I see the records in the local store and I'm able to perform the join.

So it seems to me as if the local RocksDB store cannot keep up with the read throughput and needs more time to persist
the records.

Is there any option to tune this, or could anyone please give me some suggestions on how to deal with this issue?

Thanks,
Peter

Eno Thereska

Sep 7, 2016, 3:08:23 PM
to Confluent Platform
Hi Peter,

Would you be willing to share some code? In particular, it would be good to have your Kafka Streams configuration, and ideally the code that constructs the KTable.

Thanks
Eno

peter...@gmail.com

Sep 7, 2016, 3:43:15 PM
to Confluent Platform
Hi Eno,

Thanks for your answer.
I have the following code:

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
props.put(StreamsConfig.STATE_DIR_CONFIG, "/path/to/local_state_dir");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zkHost);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

// MyTimestampExtractor just extracts a timestamp field from the Avro record
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyTimestampExtractor.class.getName());
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistry);
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

// This is the Serde from the Confluent example code
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);

Serde<String> keySerde = Serdes.String();
Serde<GenericRecord> valueSerde = new GenericAvroSerde();
valueSerde.configure(
    Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistry),
    false);

KStreamBuilder builder = new KStreamBuilder();

KStream<String, GenericRecord> inputRecordStream = builder.stream(keySerde, valueSerde, sourceTopic);
KTable<String, GenericRecord> localTable = builder.table(keySerde, valueSerde, targetTopic);

inputRecordStream.leftJoin(localTable, new ValueJoiner<GenericRecord, GenericRecord, GenericRecord>() {
    @Override
    public GenericRecord apply(GenericRecord streamValue, GenericRecord tableValue) {
        // if tableValue == null return streamValue, otherwise merge them
        return MyUtils.leftJoin(tableValue, streamValue);
    }
}).to(targetTopic);

final KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();

Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
    @Override
    public void run() {
        streams.close();
    }
}));
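
For completeness, MyTimestampExtractor is roughly along these lines (simplified sketch; the "timestamp" field name is just illustrative):

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class MyTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record) {
        // pull the event-time field out of the Avro payload
        // ("timestamp" stands in for the actual field name)
        GenericRecord value = (GenericRecord) record.value();
        return (Long) value.get("timestamp");
    }
}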


I populate the sourceTopic from code using the Producer API; the key is a unique ID extracted from the GenericRecord, which is the value.
Both the sourceTopic and targetTopic have 2 partitions with replication factor 1.

Thanks,
Peter

Michael Noll

Sep 8, 2016, 7:46:29 AM
to confluent...@googlegroups.com
Peter,

could your problem be a timing/bootstrapping problem, i.e. are you sending input data to both topics (one for the KStream, one for the KTable) around the same time?  It might be that, at the time a record (record key) in the KStream is being joined against the KTable, the data for the corresponding key is not yet available in the KTable.

To help verify that, you could do the following:

1. First send all the input data to the topic that backs the KTable.
2. Wait a few seconds.
3. Now send the input data to the topic that backs the KStream.

The three steps above ensure that the KTable is fully loaded prior to any joins being performed between the KStream and the KTable.
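
A quick way to test this from code, just as a sketch (topic names, keys, values and the String serializers below are placeholders, not your Avro setup):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class BootstrapOrderTest {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // 1. send all the data for the topic that backs the KTable
        producer.send(new ProducerRecord<>("table-input-topic", "id1", "table-value"));
        producer.flush();

        // 2. wait a few seconds so the Streams app can read and materialize the KTable
        Thread.sleep(5000);

        // 3. now send the data for the topic that backs the KStream
        producer.send(new ProducerRecord<>("stream-input-topic", "id1", "stream-value"));
        producer.flush();
        producer.close();
    }
}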

-Michael






Michael Noll

Sep 8, 2016, 7:48:37 AM
to confluent...@googlegroups.com
I noticed you are using the targetTopic twice:

1. The (input) KTable is read from "targetTopic".
2. The join results between the KStream and the KTable (which is read from targetTopic, see 1) are also written to the "targetTopic".

Are you sure this is correct/desired?
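
If the intent is to keep targetTopic purely as the table's source and write the join results somewhere else, the wiring would look more like this (sketch based on your code; "joinOutputTopic" is a placeholder name):

KStream<String, GenericRecord> inputRecordStream = builder.stream(keySerde, valueSerde, sourceTopic);
KTable<String, GenericRecord> localTable = builder.table(keySerde, valueSerde, targetTopic);

inputRecordStream.leftJoin(localTable, new ValueJoiner<GenericRecord, GenericRecord, GenericRecord>() {
    @Override
    public GenericRecord apply(GenericRecord streamValue, GenericRecord tableValue) {
        return MyUtils.leftJoin(tableValue, streamValue);
    }
}).to(keySerde, valueSerde, joinOutputTopic);   // write to a topic other than targetTopic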




peter...@gmail.com

Sep 8, 2016, 11:46:54 AM
to Confluent Platform

Hi Michael,

I guess so, but I'm not sure. I'd like to mimic a DB here.
For each record coming from the sourceTopic I need to check the local store to see whether I've already encountered a record
with the same ID. If yes, then I need to update the record retrieved from the local store with some fields extracted from the incoming record from the sourceTopic.
The result of this join (merge) needs to be written back to the local store.
I'll check your suggestion, but in my case the local store is empty when I bootstrap the application.
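
Conceptually the merge (MyUtils.leftJoin) does something like the following; this is only a simplified sketch, assuming both records share the same Avro schema:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class MyUtils {
    // if there is no table-side record yet, keep the stream record as-is;
    // otherwise let the stream record's non-null fields override the stored fields
    public static GenericRecord leftJoin(GenericRecord tableValue, GenericRecord streamValue) {
        if (tableValue == null) {
            return streamValue;
        }
        GenericRecord merged = new GenericData.Record(streamValue.getSchema());
        for (Schema.Field field : streamValue.getSchema().getFields()) {
            Object streamField = streamValue.get(field.name());
            merged.put(field.name(), streamField != null ? streamField : tableValue.get(field.name()));
        }
        return merged;
    }
}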

This is what I see in the apply() method during joining when I wait 500ms between two record inserts to the sourceTopic:

left: record_id1_v1
right:  null
-> insert "left" to targetTopic  

left: record_id1_v2
right: record_id1_v1
-> update record_id1_v1 with record_id1_v2 and write to targetTopic

Thus in the targetTopic I'll have:
record_id1_v1
record_id1_updated

Which is correct.

However, if the producer doesn't wait, I'll have:

left: record_id1_v1
right:  null
-> insert "left" to targetTopic  

left: record_id1_v2
right: null
-> insert "left" to targetTopic  

Thus in the targetTopic I'll have:
record_id1_v1
record_id1_v2


The RocksDBStore has a MemoryLRUCache, so I would think that fast subsequent writes with the same ID should not be a problem.
Does the code I sent you reflect what I'm trying to achieve? Am I missing something?

Thanks,
Peter