KTable-KTable join for one-to-many relationship

George @paytm.com

May 11, 2016, 3:56:45 PM
to Confluent Platform
Hi,

I am wondering how KTable join handles one-to-many join.

Consider KTables A and B, where B has a foreign key to A and both have been partitioned by A's PK. To handle a stream join, we need to buffer both sides. What will the local storage of B look like? What kind of k-v pair will it use?

If someone can point me to the code implementation, that would be great. Thanks!

George


Guozhang Wang

May 11, 2016, 6:32:52 PM
to Confluent Platform
Hi George,

This is a good question, and we have seen similar ones in the past two weeks. I have filed https://issues.apache.org/jira/browse/KAFKA-3705 to summarize and track this issue, and I am trying to fix it as soon as possible.

As for the implementation, you can take a look at KTableImpl, which calls classes such as KTableKTableJoin / OuterJoin / LeftJoin / etc. for joining.


Guozhang

George @paytm.com

May 11, 2016, 9:10:09 PM
to Confluent Platform
Hi Guozhang,

Thanks for the reply. The code helps me understand the implementation a lot better. We are looking forward to the Kafka Streams release.

Here is our context/scenario for reference:
1. Our current one-to-one stream join is done in Samza.
2. A new requirement has come in, and we need to consider a one-to-many join. Luckily, our "many" side has an average size of 3-4, so we can afford to store (A, List<B>) in RocksDB.
3. Alternatively, we can construct a new key with value of A as prefix, and use RocksDB's indexed range scan so that we don't have to frequently update List<B>, but this approach is too complex for our specific use case.
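
To make options 2 and 3 concrete, here is a minimal sketch of the two store layouts. It is only an illustration, not the Kafka Streams or Samza implementation: the class name `OneToManyStores`, its methods, and the `|` separator are all hypothetical, a `HashMap` and a `TreeMap` stand in for RocksDB, and keys are assumed never to contain the separator.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of the two layouts; in-memory maps stand in for RocksDB.
class OneToManyStores {
    // Assumed separator between A's key and B's key; keys must not contain it.
    private static final char SEP = '|';

    // Option 2: one entry per A key, whose value is the whole list of B values.
    private final Map<String, List<String>> listStore = new HashMap<>();

    // Option 3: one entry per B record under the composite key "<aKey>|<bKey>".
    // TreeMap keeps keys sorted, loosely mimicking RocksDB's ordered keyspace.
    private final TreeMap<String, String> prefixStore = new TreeMap<>();

    // Option 2 write: a read-modify-write of the full list on every update.
    void putList(String aKey, String bValue) {
        listStore.computeIfAbsent(aKey, k -> new ArrayList<>()).add(bValue);
    }

    // Option 2 read: a single point lookup returns all B values for aKey.
    List<String> getList(String aKey) {
        return listStore.getOrDefault(aKey, new ArrayList<>());
    }

    // Option 3 write: a single put that does not touch the other B records.
    void putPrefixed(String aKey, String bKey, String bValue) {
        prefixStore.put(aKey + SEP + bKey, bValue);
    }

    // Option 3 read: range scan over [aKey + SEP, aKey + (SEP + 1)),
    // which covers exactly the keys that start with "<aKey>|".
    List<String> scanPrefix(String aKey) {
        String from = aKey + SEP;
        String to = aKey + (char) (SEP + 1);
        return new ArrayList<>(prefixStore.subMap(from, to).values());
    }

    public static void main(String[] args) {
        OneToManyStores stores = new OneToManyStores();
        stores.putList("a1", "b1");
        stores.putList("a1", "b2");
        System.out.println(stores.getList("a1"));    // [b1, b2]

        stores.putPrefixed("a1", "k1", "b1");
        stores.putPrefixed("a10", "k2", "b2");       // different A key sharing the "a1" prefix
        stores.putPrefixed("a1", "k3", "b3");
        System.out.println(stores.scanPrefix("a1")); // [b1, b3]
    }
}
```

The trade-off shows up in the write path: option 2 rewrites the whole list on each change to B but reads with one lookup, while option 3 writes one small entry per B record and pays for a range scan on read.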

George

Guozhang Wang

May 12, 2016, 6:33:30 PM
to Confluent Platform
George,

Thanks for your usage scenarios. I'm currently thinking about fixing KAFKA-3705 with an approach similar to your 2) above, since the main concern with 3) is the efficiency of range queries on RocksDB (I remember they allow some prefix range queries, which can be more efficient, but I have not done any performance testing for that). Either way, the idea is that we can allow users to call a KTable-KTable join with a foreign key, and all the implementation details are abstracted away from users, whether we choose to go with 2) or 3).

Guozhang