Hello folks,
As we start to look at implementing more complex pull queries than simple point lookups, I had some thoughts on how we might do this. (I alluded to some of this on the ksql-dev list yesterday, but wanted to post here where it's more sticky!)
Right now we use RocksDB as our state store. RocksDB is a KV store, and the way we use it currently is to store the whole message in the value. This works well for key lookups, but for more complex WHERE clauses that filter on non-key columns it becomes problematic: we would have to scan the entire table and deserialise each row to do the filtering, which would be very inefficient.
One solution to this would be to create new RocksDB databases to provide secondary indexes over the same message data, but this approach results in a proliferation of databases, and also creates consistency issues between updates to data and indexes.
A different approach would be to provide a mapping between SQL data and the KV store, such that any column value in a SQL table can be mapped to a unique key in the KV store. This approach is not new, and I won't go into details on it here. Please see the links at the bottom for detailed explanations.
To summarise the approach:
Any value in a SQL table can be mapped to a key such that:
key = table_name$primary_key_value$column_name
the value for that key would be the actual value of the data.
To look up a row with a particular primary key you ask RocksDB to retrieve all entries with key prefix "table_name$primary_key_value" (RocksDB supports prefix lookups). This will return one entry for each column in the table.
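To make this concrete, here's a rough sketch against RocksDB's Java API. The class name, the /tmp path and the naive "$"-concatenated string keys are purely illustrative (a real encoding would need to be order-preserving and escape the delimiter); the point is one entry per column, and a single prefix scan to reassemble the row:

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;

public class ColumnPerKeySketch {

  public static void main(String[] args) throws RocksDBException {
    RocksDB.loadLibrary();
    try (Options options = new Options().setCreateIfMissing(true);
         RocksDB db = RocksDB.open(options, "/tmp/ksql-kv-sketch")) {

      // One KV entry per column: users$42$name -> alice, users$42$age -> 30
      putColumn(db, "users", "42", "name", "alice");
      putColumn(db, "users", "42", "age", "30");

      // Point lookup of the whole row = prefix scan over "users$42$"
      System.out.println(readRow(db, "users", "42")); // {age=30, name=alice}
    }
  }

  static void putColumn(RocksDB db, String table, String pk, String column,
                        String value) throws RocksDBException {
    String key = table + "$" + pk + "$" + column;
    db.put(key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8));
  }

  static Map<String, String> readRow(RocksDB db, String table, String pk) {
    String prefix = table + "$" + pk + "$";
    Map<String, String> columns = new LinkedHashMap<>();
    try (RocksIterator it = db.newIterator()) {
      // Seek to the first key with the prefix, then scan forward until it no longer matches
      for (it.seek(prefix.getBytes(StandardCharsets.UTF_8)); it.isValid(); it.next()) {
        String key = new String(it.key(), StandardCharsets.UTF_8);
        if (!key.startsWith(prefix)) {
          break;
        }
        columns.put(key.substring(prefix.length()),
            new String(it.value(), StandardCharsets.UTF_8));
      }
    }
    return columns;
  }
}
```

This works because RocksDB keeps keys sorted, so all the entries for one row are adjacent and a single seek followed by a short forward scan reads the whole row back.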
Secondary indexes can also be created in a similar way by creating further keys, e.g.
key = index_name$indexed_column_value$primary_key_value
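Sketching the index side in the same (assumed) style - the index name users_by_age and the empty values are just placeholders - a WHERE clause on the indexed column becomes another prefix scan, this time yielding primary keys:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;

public class SecondaryIndexSketch {

  // Index entry: users_by_age$<age>$<user_id> -> (empty). Appending the primary
  // key keeps the index key unique even when many rows share the indexed value.
  static void indexPut(RocksDB db, String indexName, String indexedValue, String pk)
      throws RocksDBException {
    String key = indexName + "$" + indexedValue + "$" + pk;
    db.put(key.getBytes(StandardCharsets.UTF_8), new byte[0]);
  }

  // "WHERE age = 20" becomes a prefix scan over "users_by_age$20$", which yields
  // the primary keys of matching rows; each row is then read back with a
  // "table_name$pk$" prefix scan as in the previous sketch.
  static List<String> lookupPks(RocksDB db, String indexName, String indexedValue) {
    String prefix = indexName + "$" + indexedValue + "$";
    List<String> pks = new ArrayList<>();
    try (RocksIterator it = db.newIterator()) {
      for (it.seek(prefix.getBytes(StandardCharsets.UTF_8)); it.isValid(); it.next()) {
        String key = new String(it.key(), StandardCharsets.UTF_8);
        if (!key.startsWith(prefix)) {
          break;
        }
        pks.add(key.substring(prefix.length()));
      }
    }
    return pks;
  }
}
```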
TLDR, using this approach would allow us to store all of our SQL tables and indexes in a single RocksDB database. It would allow us to efficiently implement complex WHERE clauses with minimal serialisation overhead. It will also enable us to continue using RocksDB as our underlying storage engine - writing an efficient storage layer is no mean feat!
In order to use state stores from Kafka Streams such that our data is written in this new way, we would have to use the KS Processor API instead of the KS DSL, provide our own state store that knows how to write in this way, and provide our own aggregator implementation. We'd also have to write the query engine that maps the query to the KV lookups and does any joins, so it's a large piece of work, but considerably more tractable than writing a DB from the ground up. But if we pull this off it will really strengthen ksqlDB's offering as a streaming database.
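For illustration only, here's roughly what the Processor API wiring might look like. Everything in it is hypothetical - the topic and store names, the toy "col=val" row format, and the use of the built-in persistent store as a stand-in for the custom column-per-key store we'd actually need to write:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class ColumnPerKeyTopology {

  public static Topology build() {
    // Placeholder: a real implementation would supply a custom StoreBuilder whose
    // store writes one RocksDB entry per column; the built-in persistent store
    // just stands in here so the sketch compiles.
    StoreBuilder<KeyValueStore<String, String>> storeBuilder =
        Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("column-per-key-store"),
            Serdes.String(),
            Serdes.String());

    Topology topology = new Topology();
    topology.addSource("source", new StringDeserializer(), new StringDeserializer(),
        "users-topic");
    topology.addProcessor("aggregator", ColumnPerKeyProcessor::new, "source");
    topology.addStateStore(storeBuilder, "aggregator");
    return topology;
  }

  // Custom "aggregator" that decomposes each incoming row into per-column writes.
  static class ColumnPerKeyProcessor implements Processor<String, String> {

    private KeyValueStore<String, String> store;

    @SuppressWarnings("unchecked")
    @Override
    public void init(ProcessorContext context) {
      store = (KeyValueStore<String, String>) context.getStateStore("column-per-key-store");
    }

    @Override
    public void process(String pk, String row) {
      // Toy row format "col1=val1,col2=val2"; the real thing would work on the
      // deserialised row structure.
      for (String col : row.split(",")) {
        String[] kv = col.split("=", 2);
        store.put("users$" + pk + "$" + kv[0], kv[1]);
      }
    }

    @Override
    public void close() {
    }
  }
}
```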
-----
This article has a pretty good explanation of mapping SQL data to KV data: https://medium.com/@wishmithasmendis/from-rdbms-to-key-value-store-data-modeling-techniques-a2874906bc46
CockroachDB uses a similar approach to map SQL structures to KV stores (it also uses RocksDB as its underlying storage): https://www.cockroachlabs.com/blog/sql-in-cockroachdb-mapping-table-data-to-key-value-storage/ and https://github.com/cockroachdb/cockroach/blob/master/docs/tech-notes/encoding.md
A few considerations:
- This makes a simple key fetch also a range scan, which voids the use of bloom filters for lookups IIUC. This may be undesirable; we need to understand scan performance more closely.
- Our metastore that hands out the tableID and indexID needs to be consistent across ksql servers (I think with the recent work on command topic produce, we are there?). (For the metrics use case, this was a problem.)
- Note that the secondary index produced this way is a "local" secondary index. To get all values for, say, "age=20" we need to issue the same scan against all servers and scatter-gather the results. Do we also need to think about a "global" secondary index, which may be only eventually consistent?
- We can use something like https://github.com/facebook/rocksdb/wiki/Transactions to make the updates to the primary and secondary key spaces transactional in a given state store (see the sketch after this list).
But if you consider ksqlDB as a distributed database, the servers may always be inconsistent with each other anyway. We don't have a synchronous write layer like CockroachDB may have. So, what are the global consistency guarantees for a ksql table?
- We need to check whether the streams replication/recovery machinery works without many changes, even with a custom state store/data model. Otherwise it's akin to writing our own database, and I would rather start fresh :)

Ultimately, this is a large effort and we need to prioritize it against other items on the roadmap.
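Rough sketch of the transactional update mentioned above, using RocksDB's Java TransactionDB API - the path and key strings are placeholders, and for this simple two-put case a plain WriteBatch would give the same atomicity:

```java
import java.nio.charset.StandardCharsets;

import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.Transaction;
import org.rocksdb.TransactionDB;
import org.rocksdb.TransactionDBOptions;
import org.rocksdb.WriteOptions;

public class TransactionalIndexUpdateSketch {

  public static void main(String[] args) throws RocksDBException {
    RocksDB.loadLibrary();
    try (Options options = new Options().setCreateIfMissing(true);
         TransactionDBOptions txnDbOptions = new TransactionDBOptions();
         WriteOptions writeOptions = new WriteOptions();
         TransactionDB db = TransactionDB.open(options, txnDbOptions, "/tmp/ksql-txn-sketch")) {

      // Write the row column and its secondary index entry in one transaction, so a
      // reader never sees an index entry without the matching row (or vice versa).
      try (Transaction txn = db.beginTransaction(writeOptions)) {
        txn.put(bytes("users$42$age"), bytes("20"));
        txn.put(bytes("users_by_age$20$42"), new byte[0]);
        txn.commit();
      }
    }
  }

  private static byte[] bytes(String s) {
    return s.getBytes(StandardCharsets.UTF_8);
  }
}
```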
/thanks/vinoth
I don't think we can avoid scatter-gather in all cases, but in some we can, by partitioning the data in the right way. I mentioned this on the other recent thread on this group:

> E.g. imagine there is a shopping basket BASKET_ITEMS table and the user wants to execute a pull query "SELECT * FROM BASKET_ITEMS WHERE USER_ID=?" to retrieve their current basket. The primary key of that table would be a composite (USER_ID, ITEM_ID). It would make sense to partition that table such that all items for a particular user are on the same partition (i.e. partition by USER_ID). That way we only need to execute the query on one node to get the correct results.
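As a sketch of how that routing could sit on top of Kafka Streams' existing metadata API - the store name "basket-items-store" is made up, and it assumes the underlying topic is keyed/partitioned by USER_ID with the default partitioner (otherwise the metadataForKey overload that takes a custom StreamPartitioner would be needed):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.StreamsMetadata;

public class PullQueryRouting {

  // Given a running KafkaStreams instance and a store materialising BASKET_ITEMS,
  // find the single host that owns the partition for this USER_ID so the pull query
  // can be forwarded there rather than scatter-gathered across all servers.
  public static String hostFor(KafkaStreams streams, String userId) {
    StreamsMetadata metadata =
        streams.metadataForKey("basket-items-store", userId, Serdes.String().serializer());
    return metadata.host() + ":" + metadata.port();
  }
}
```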