Thoughts on supporting sophisticated pull queries: SQL to KV mapping


Tim Fox

Nov 23, 2019, 4:00:36 AM
to ksql-dev

Hello folks,


As we start to look at implementing pull queries more complex than simple point lookups, I had some thoughts on how we might do this. (I alluded to some of this on the ksql-dev list yesterday but wanted to post here as it's more sticky!)


Right now we use RocksDB as our state store. RocksDB is a KV store, and the way we currently use it is to store the message in the value. This works well for key lookups, but for more complex WHERE clauses that filter on non-key columns it becomes problematic: we would have to scan the entire table and deserialise each row to do the filtering, which would be very inefficient.


One solution to this would be to create new RocksDB databases to provide secondary indexes over the same message data, but this approach results in a proliferation of databases, and also creates consistency issues between updates to data and indexes.


A different approach would be to provide a mapping between SQL data and the KV store, such that any column value in a SQL table can be mapped to a unique key in the KV store. This approach is not new, and I won’t go into details on it. Please see links at bottom for detailed explanations.


To summarise the approach:


Any value in a SQL table can be mapped to a key such that:

key = table_name$primary_key_value$column_name

the value for that key would be the actual value of the data.


To look up a row with a particular primary key, you ask RocksDB to retrieve all key-value pairs with the key prefix “table_name$primary_key_value” (RocksDB supports prefix lookups). This will return one key-value pair for each column in the table.
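
A minimal sketch of the row mapping and the prefix lookup described above, in Python. The `SortedKV` class is a toy stand-in for a sorted KV store such as RocksDB, not its API, and the `users` table and its columns are made up for illustration. One assumption worth calling out: the sketch appends a trailing `$` to the lookup prefix so that primary key `42` does not also match `421`, and it assumes `$` never occurs inside names or values.

```python
import bisect

SEP = "$"  # separator from the post; assumed never to occur in names or values


def encode_row(table, pk, row):
    """Map one SQL row to one KV pair per column: table$pk$column -> value."""
    return {f"{table}{SEP}{pk}{SEP}{col}": val for col, val in row.items()}


class SortedKV:
    """Toy stand-in for a sorted KV store (hypothetical, not the RocksDB API)."""

    def __init__(self):
        self._keys = []   # keys kept in sorted order
        self._data = {}

    def put(self, key, value):
        if key not in self._data:
            bisect.insort(self._keys, key)
        self._data[key] = value

    def prefix_scan(self, prefix):
        # Seek to the first key >= prefix, then iterate while the prefix matches.
        i = bisect.bisect_left(self._keys, prefix)
        while i < len(self._keys) and self._keys[i].startswith(prefix):
            yield self._keys[i], self._data[self._keys[i]]
            i += 1


def get_row(store, table, pk):
    """Reassemble a row from its per-column KV pairs."""
    prefix = f"{table}{SEP}{pk}{SEP}"  # trailing separator: 42 must not match 421
    return {key[len(prefix):]: value for key, value in store.prefix_scan(prefix)}


store = SortedKV()
for pk, row in [("42", {"name": "alice", "age": 20}),
                ("421", {"name": "bob", "age": 31})]:
    for key, value in encode_row("users", pk, row).items():
        store.put(key, value)

row = get_row(store, "users", "42")
print(row)  # {'age': 20, 'name': 'alice'}
```

A real encoding would also need to escape or length-prefix the components rather than rely on a separator character; the CockroachDB encoding notes linked at the bottom cover that in detail.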


Secondary indexes can also be created in a similar way by creating further keys, e.g.

key = index_name$indexed_column_value$primary_key_value
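
As a rough illustration of the index key layout, here is a small Python sketch. A plain dict plus `sorted()` stands in for the KV store and its prefix scan, and the `users_by_age` index name and the sample rows are hypothetical. It only covers equality lookups; range predicates would additionally need an order-preserving encoding of the indexed value.

```python
SEP = "$"  # separator from the post; assumed never to occur in names or values


def put_row(kv, table, pk, row, indexes):
    """Write one KV pair per column, plus one index entry per indexed column.

    `indexes` maps a (hypothetical) index name to the column it covers.
    """
    for col, val in row.items():
        kv[f"{table}{SEP}{pk}{SEP}{col}"] = val
    for index_name, col in indexes.items():
        # The index entry needs no value: the key already carries the
        # indexed value and the primary key it points to.
        kv[f"{index_name}{SEP}{row[col]}{SEP}{pk}"] = ""


def lookup_pks(kv, index_name, value):
    """Return the primary keys of all rows whose indexed column equals `value`."""
    prefix = f"{index_name}{SEP}{value}{SEP}"
    # sorted() + startswith() stands in for a seek-and-iterate prefix scan.
    return [key[len(prefix):] for key in sorted(kv) if key.startswith(prefix)]


kv = {}
indexes = {"users_by_age": "age"}
put_row(kv, "users", "1", {"name": "alice", "age": 20}, indexes)
put_row(kv, "users", "2", {"name": "bob", "age": 31}, indexes)
put_row(kv, "users", "3", {"name": "carol", "age": 20}, indexes)

matches = lookup_pks(kv, "users_by_age", 20)
print(matches)  # ['1', '3']
```

Each primary key returned by the index can then be turned into a row with a prefix lookup on the table's own key space.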


TL;DR: using this approach would allow us to store all of our SQL tables and indexes in a single RocksDB database. It would allow us to implement complex WHERE clauses efficiently, with minimal serialisation overhead. It would also let us continue using RocksDB as our underlying storage engine - writing an efficient storage layer is no mean feat!


To use state stores from Kafka Streams such that our data is written in this new way, we would have to use the KS Processor API instead of the KS DSL, provide our own state store that knows how to write in this way, and provide our own aggregator implementation. We'd also have to write the query engine to map the query to the KV lookups and do any joins. So it's a large piece of work, but considerably more tractable than writing a DB from the ground up. And if we pull this off, it will really strengthen ksqlDB's offering as a streaming database.


-----


This article has a pretty good explanation of mapping SQL data to KV data: https://medium.com/@wishmithasmendis/from-rdbms-to-key-value-store-data-modeling-techniques-a2874906bc46


CockroachDB uses a similar approach to map SQL structures to KV stores (it also uses RocksDB as its underlying storage): https://www.cockroachlabs.com/blog/sql-in-cockroachdb-mapping-table-data-to-key-value-storage/ and https://github.com/cockroachdb/cockroach/blob/master/docs/tech-notes/encoding.md


Tim Fox

Nov 23, 2019, 4:20:21 AM
to ksql-dev
Sorry for the weird whitespace! I'm pretty sure I didn't type it that way :(

Vinoth Chandar

Nov 25, 2019, 10:19:46 AM
to Tim Fox, ksql-dev
Hi Tim,

Thanks for starting this topic!

> we would have to scan the entire table and deserialise each row to do the filtering operation - this would be very inefficient.
+1. IMO we should not support full table scans out of the box for precisely these reasons. They can thrash other traffic, including the I/O needed for materializing the pull queries themselves.
We were looking at a similar approach for indexing by time, and also drawing some modelling ideas from OpenTSDB: http://opentsdb.net/docs/build/html/user_guide/backends/hbase.html

A few considerations:
- This turns a simple key fetch into a range scan as well, which IIUC voids the use of bloom filters for lookups. This may be undesirable; we need to understand scan performance more closely.
- Our metastore, which hands out the tableID and indexID, needs to be consistent across ksql servers (I think with the recent work on command topic produce, we are there?). For the metrics use case, this was a problem.
- Note that the secondary index thus produced is a "local" secondary index. To get all values for, say, "age=20" we need to issue the same scan lookup against all servers and scatter-gather the results. Do we also need to think about a "global" secondary index, which may be only eventually consistent?
- We can use something like https://github.com/facebook/rocksdb/wiki/Transactions to make the updates to the primary and secondary key spaces transactional in a given state store. But if you consider ksqlDB as a distributed database, the servers may always be inconsistent with each other anyway. We don't have a synchronous write layer like the one CockroachDB may have. So, what are the global consistency guarantees for a ksql table?
- We need to check whether the streams replication/recovery works without many changes, even with a custom state store/data model. Otherwise, it's akin to writing our own database. I would rather start fresh :)
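
To make the "local" secondary index point concrete, here is a hedged Python sketch of a scatter-gather lookup. Each server is modelled as a plain dict holding only its own slice of a hypothetical `users_by_age` index; the thread pool stands in for the network fan-out, and none of the names correspond to ksqlDB code.

```python
from concurrent.futures import ThreadPoolExecutor


def local_index_scan(local_kv, prefix):
    """Prefix scan over one server's local slice of the index."""
    return [key[len(prefix):] for key in sorted(local_kv) if key.startswith(prefix)]


def scatter_gather(servers, index_name, value):
    """Send the same index scan to every server and merge the partial results."""
    prefix = f"{index_name}${value}$"
    with ThreadPoolExecutor(max_workers=len(servers)) as pool:
        parts = list(pool.map(lambda kv: local_index_scan(kv, prefix), servers))
    return sorted(pk for part in parts for pk in part)


# Three servers, each holding the index entries for its own partitions only.
servers = [
    {"users_by_age$20$1": "", "users_by_age$31$2": ""},
    {"users_by_age$20$7": ""},
    {"users_by_age$45$9": ""},
]

result = scatter_gather(servers, "users_by_age", 20)
print(result)  # ['1', '7']
```

The cost is visible even in the toy: every server is queried, including the one that holds no matching entry.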


Ultimately, this is a large effort and we need to prioritize this against other items on the roadmap. 

/thanks/vinoth





--
You received this message because you are subscribed to the Google Groups "ksql-dev" group.
To unsubscribe from this group and stop receiving emails from it, send an email to ksql-dev+u...@googlegroups.com.
To view this discussion on the web, visit https://groups.google.com/d/msgid/ksql-dev/aca36ba1-eb9e-4287-a165-5ab5e31b69e4%40googlegroups.com.

Tim Fox

Nov 25, 2019, 11:40:23 AM
to ksql-dev


> A few considerations:
> - This turns a simple key fetch into a range scan as well, which IIUC voids the use of bloom filters for lookups. This may be undesirable; we need to understand scan performance more closely.

Indeed, keys with a particular prefix may be in different levels, which would lead to read amplification. However, RocksDB has the concept of "prefix bloom filters" to help us here. See https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide and https://www.cockroachlabs.com/blog/cockroachdb-on-rocksd/ (search the page for "prefix bloom filter").
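
For intuition only, here is a toy Python model of the idea behind a prefix bloom filter: each file keeps a small bloom filter over the key prefixes it contains, so a prefix scan can skip files that definitely hold no matching key. The `PrefixBloom` class, the `row_prefix` extractor and the file layout are all assumptions made for this sketch; RocksDB configures this through its own prefix extractor and filter options, which are not shown here.

```python
import hashlib


def row_prefix(key):
    """Prefix extractor (assumed layout): 'table$pk$column' -> 'table$pk$'."""
    table, pk, _column = key.split("$", 2)
    return f"{table}${pk}$"


class PrefixBloom:
    """Tiny bloom filter over key prefixes: no false negatives, some false positives."""

    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = 0  # a Python int used as a bit array

    def _positions(self, prefix):
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{prefix}".encode("utf-8")).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key):
        for pos in self._positions(row_prefix(key)):
            self.bits |= 1 << pos

    def may_contain(self, prefix):
        return all((self.bits >> pos) & 1 for pos in self._positions(prefix))


def build_file(entries):
    """Model one on-disk file as its entries plus a bloom filter over their prefixes."""
    bloom = PrefixBloom()
    for key in entries:
        bloom.add(key)
    return {"entries": entries, "bloom": bloom}


def prefix_scan(files, prefix):
    result = {}
    for f in files:
        if not f["bloom"].may_contain(prefix):
            continue  # definitely no key with this prefix here: skip the read
        for key, value in f["entries"].items():
            if key.startswith(prefix):
                result[key] = value
    return result


files = [
    build_file({"users$1$name": "alice", "users$1$age": 20}),
    build_file({"users$2$name": "bob", "users$2$age": 31}),
]
found = prefix_scan(files, "users$2$")
print(found)  # {'users$2$name': 'bob', 'users$2$age': 31}
```

A false positive only costs an unnecessary file read; the scan result stays correct either way.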
 
> - Our metastore, which hands out the tableID and indexID, needs to be consistent across ksql servers (I think with the recent work on command topic produce, we are there?). For the metrics use case, this was a problem.
> - Note that the secondary index thus produced is a "local" secondary index. To get all values for, say, "age=20" we need to issue the same scan lookup against all servers and scatter-gather the results. Do we also need to think about a "global" secondary index, which may be only eventually consistent?

I don't think we can avoid scatter-gather in all cases, but in some we can by partitioning the data in the right way. I mentioned this on the other recent thread on this group:

>> E.g. imagine there is a shopping basket BASKET_ITEMS table and the user wants to execute a pull query "SELECT * FROM BASKET_ITEMS WHERE USER_ID=?" to retrieve their current basket. The primary key of that table would be a composite (USER_ID, ITEM_ID). It would make sense to partition that table such that all items for a particular user are on the same partition (i.e. partition by USER_ID). That way we only need to execute the query on one node to get the correct results.

 
> - We can use something like https://github.com/facebook/rocksdb/wiki/Transactions to make the updates to the primary and secondary key spaces transactional in a given state store.
 

Yes, we would need to use RocksDB transactions.
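
To show why the row and its index entries need to change together, here is a toy Python sketch. `Store` and `WriteBatch` are hypothetical stand-ins for a store with all-or-nothing commits, not the RocksDB transaction API; the `users` table and `users_by_age` index are made up. The point is that changing an indexed column involves three writes (delete the old index entry, add the new one, update the column) that must become visible as a unit.

```python
class WriteBatch:
    """Collects writes so they can be applied all at once (toy model)."""

    def __init__(self):
        self.ops = []

    def put(self, key, value):
        self.ops.append(("put", key, value))

    def delete(self, key):
        self.ops.append(("delete", key, None))


class Store:
    """Toy KV store; commit() applies a whole batch or nothing."""

    def __init__(self, data=None):
        self.data = dict(data or {})

    def commit(self, batch):
        staged = dict(self.data)  # work on a copy
        for op, key, value in batch.ops:
            if op == "put":
                staged[key] = value
            else:
                staged.pop(key, None)
        self.data = staged  # publish all changes with one reference swap


def update_age(store, pk, new_age):
    """Change an indexed column and keep the index entry in step with it."""
    old_age = store.data[f"users${pk}$age"]
    batch = WriteBatch()
    batch.delete(f"users_by_age${old_age}${pk}")   # stale index entry
    batch.put(f"users_by_age${new_age}${pk}", "")  # new index entry
    batch.put(f"users${pk}$age", new_age)          # the column itself
    store.commit(batch)


store = Store({"users$1$age": 20, "users_by_age$20$1": ""})
update_age(store, "1", 21)
print(store.data)  # {'users$1$age': 21, 'users_by_age$21$1': ''}
```

Without the atomic commit, a reader could briefly find the row under both ages, or under neither.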

 
> But if you consider ksqlDB as a distributed database, the servers may always be inconsistent with each other anyway. We don't have a synchronous write layer like the one CockroachDB may have. So, what are the global consistency guarantees for a ksql table?


For pull queries, I think in many cases the user will just want the current state of the data (whatever that is), so it won't matter. In some cases, though, it might matter: e.g. say they've received an event on a topic and that has triggered them to do a pull query to retrieve some data that is derived from the processing of that event. If they attempt the pull query too early, the data might not be there. We could solve this with some kind of version number in the event. When the pull query is executed, this version number is provided, and if the table hasn't seen this version yet it can delay the results until it does.
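
One way the version-number idea could look, sketched in Python. `VersionedTable` is hypothetical, and the sketch assumes versions increase monotonically (for example a per-partition offset); the timeout value and key names are arbitrary.

```python
import threading


class VersionedTable:
    """Toy table that tracks the highest version it has applied so far."""

    def __init__(self):
        self._cond = threading.Condition()
        self._version = 0
        self._data = {}

    def apply(self, version, key, value):
        """Called by the processing side as each event is applied."""
        with self._cond:
            self._data[key] = value
            self._version = max(self._version, version)
            self._cond.notify_all()  # wake any pull query waiting for this version

    def pull(self, key, min_version, timeout=1.0):
        """Answer only once the table has seen at least `min_version`."""
        with self._cond:
            seen = self._cond.wait_for(
                lambda: self._version >= min_version, timeout)
            if not seen:
                raise TimeoutError(f"table has not reached version {min_version}")
            return self._data.get(key)


table = VersionedTable()

# Simulate the table catching up shortly after the client issues its query.
threading.Timer(0.05, table.apply, args=(5, "basket:alice", ["milk"])).start()

result = table.pull("basket:alice", min_version=5)
print(result)  # ['milk']
```

A query that does not care about freshness can pass `min_version=0` and is answered immediately from whatever state the table currently holds.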

For push queries it also matters less: eventually the data will arrive and be pushed to the user.

 
> - We need to check whether the streams replication/recovery works without many changes, even with a custom state store/data model. Otherwise, it's akin to writing our own database. I would rather start fresh :)
>
> Ultimately, this is a large effort and we need to prioritize this against other items on the roadmap.


Absolutely, it's a big piece of work, but I think it's going to be hard to avoid, in the long term anyway, if we want a solid pull query implementation.

 


Tim Fox

Nov 25, 2019, 11:51:34 AM
to ksql-dev


> I don't think we can avoid scatter-gather in all cases, but in some we can by partitioning the data in the right way. I mentioned this on the other recent thread on this group:
>
> >> E.g. imagine there is a shopping basket BASKET_ITEMS table and the user wants to execute a pull query "SELECT * FROM BASKET_ITEMS WHERE USER_ID=?" to retrieve their current basket. The primary key of that table would be a composite (USER_ID, ITEM_ID). It would make sense to partition that table such that all items for a particular user are on the same partition (i.e. partition by USER_ID). That way we only need to execute the query on one node to get the correct results.


So basically, given knowledge of the partition functions and the where clause, the query engine can figure out which set of nodes the query needs to be sent to. It won't be all nodes for all queries.
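
A minimal Python sketch of that routing decision, under stated assumptions: the WHERE clause is reduced to a dict of equality predicates, `zlib.crc32` stands in for whatever partition function is actually used, and the node names and partition assignment are invented.

```python
import zlib


def partition_for(key, num_partitions):
    """Stand-in partition function: hash the key and take it modulo the count."""
    return zlib.crc32(str(key).encode("utf-8")) % num_partitions


def nodes_for_query(where, partition_column, num_partitions, partition_owner):
    """Return the set of nodes a pull query has to be sent to.

    `where` holds equality predicates as {column: value}; `partition_owner`
    maps each partition number to the node currently hosting it.
    """
    if partition_column in where:
        # The predicate pins the partition, so a single node can answer.
        p = partition_for(where[partition_column], num_partitions)
        return {partition_owner[p]}
    # No predicate on the partitioning column: fall back to scatter-gather.
    return set(partition_owner.values())


owner = {0: "node-a", 1: "node-b", 2: "node-c", 3: "node-a"}

targeted = nodes_for_query({"USER_ID": "alice"}, "USER_ID", 4, owner)
broadcast = nodes_for_query({"ITEM_ID": "sku-1"}, "USER_ID", 4, owner)
print(len(targeted), sorted(broadcast))
```

With BASKET_ITEMS partitioned by USER_ID, the first query goes to exactly one node, while a query that filters only on ITEM_ID still has to visit every node.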

Nick Dearden

Dec 3, 2019, 5:56:42 PM
to ksql-dev
For comparison, and possible inspiration, FoundationDB also has an interesting approach to layering "multi-model" semantics, including relational, on top of KV stores in a (highly) distributed way. Worth reading up on.

Tim Fox

Dec 4, 2019, 5:47:14 AM
to ksql-dev
Thanks for the pointer! Reading the FoundationDB docs, it looks like it uses the same or a similar approach to the one outlined above.