Ksql Performance

0 views
Skip to first unread message

Derrik Navarro

unread,
Aug 4, 2024, 7:17:20 PM8/4/24
to calneusantai
Oneof the principal benefits of using ksqlDB for stream processing is that itenables you to define your workload declaratively. All ksqlDB stream processinglogic is defined using SQL statements, which are internally transformed intoexecution plans that perform the necessary computations as efficiently aspossible.

Abstracting away internal execution plans underneath a SQL layer enables rapiddevelopment of stream processing workloads, but when you want optimalperformance, you need to know something about the internal system behaviorof ksqlDB.


This document provides an overview of key factors affecting ksqlDB performance,so you can develop high-performance stream processing workloads using ksqlDB.A basic understanding of ksqlDB's key abstractions is assumed, so if you're newto ksqlDB, read through the core concepts documentation beforeproceeding.


These guidelines are recommendations for getting the best performance from yourksqlDB workload. For example, if your workload is unable to keep up with therate at which your input data is being produced, these guidelines may help toincrease your workload's throughput to keep it from falling behind.


Typically, a ksqlDB workload is composed of one or more persistent queries.These queries consume input continuously from Apache Kafka topics and producetheir output back into Kafka topics. Although this document describeshow to define these persistent queries such that they are as efficient aspossible, the number of persistent queries running on a ksqlDB deployment hassystem-wide performance implications, regardless of how simple or complex theymay be.


Combine as much work as possible into as few persistent queries as possible.Each individual persistent query within a ksqlDB workload incurs a significantamount of system resource overhead. As a result, you should structure yourworkload such that all required work is consolidated into as few persistentqueries as possible. A workload consolidated into a single persistent queryrequires significantly less system-resource overhead as the same workloadis spread across two persistent queries.


Minimizing the number of persistent queries that your workload uses is probablythe single most impactful design pattern that you can leverage, so it's arecurring theme throughout this performance guide. You'll learn more about thespecific benefits of running fewer persistent queries in the following sections.


ksqlDB workloadsconsume their input from Kafka topics and produce their output intoKafka topics, but some local state is also maintained on each ksqlDB node.This state effectively serves as a local cache of topic data that improves workloadperformance. Local ksqlDB state is managed by using a pluggable key-value storageinterface. RocksDB is the default storage engine for local state.


In addition to being stored as local key-value stores, each ksqlDB state storeis also backed by a changelog topic in Kafka. Storing a complete log ofall changes made to each local state store enables ksqlDB to recover in theevent of a node failure.


State store sizes are proportional to the size of each key-value record thatthey contain. Smaller state stores reduce the amount of disk read-and-writethroughput performed by ksqlDB, resulting in better overall workload throughput.Also, smaller state stores minimize local and Kafka disk storageconsumption. State store row sizes may be minimized by eliminating unusedfields in your records.


Minimize the number of state stores that your ksqlDB deployment uses. ksqlDBbatches writes made to local state stores, so fewer state stores enables largerbatches of writes to a given state store, which improves disk write throughput.Having fewer state stores minimizes both local and Kafka disk storagerequirements.


Input and output records consumed and produced by ksqlDB workloads areserialized as Kafka messages, so each consumed record must bedeserialized before being processed by ksqlDB. Similarly, each output recordproduced by ksqlDB must be serialized before being written to Kafka.As a result, record serialization is generally the most expensive aspect ofany ksqlDB workload.


Avoid extraneous serialization and deserialization operations. Combine as muchwork as possible into as few persistent queries as your use case allows. Forexample, instead of performing a transformation with one persistent query andfiltering its output with another persistent query, consider combining thetransformation and filter into a single persistent query. In this case,serialization and deserialization overhead would be reduced approximately byhalf.


Avoid unnecessary record complexity and size. Smaller, simpler records are moreefficient to serialize and deserialize. If your workload doesn't requirecertain fields in your input messages, consider removing these fields as earlyas possible to improve serialization performance. In some cases, it may bepreferable to use ksqlDB itself to perform this message simplification byselecting a subset of the messages' fields to prepare them for downstreamprocessing.


A transformation is any reference to a column that is not a bare columnreference. For example, a function call over a column, and an arithmeticexpression involving one or more columns are both transformations. A filteris a Boolean expression invoked via a WHERE clause that excludes any recordfor which that expression evaluates to false. Transformations and filtersare thus evaluated for each input record, and typically incur a negligibleamount of performance overhead. This does not necessarily apply touser-defined functioninvocations.


Consolidate your transformations and filters into as few persistent queries aspossible. Although transformations and filters generally aren't performancebottlenecks, each individual persistent query does incur a significant amountof system-resource overhead. For this reason, you should combine as many ofyour transformations and filters as possible into the fewest number ofpersistent queries that your workload allows. Additionally, for anytransformations that reduce record size, we recommend that you perform thesetransformations as early as possible in your processing pipeline.


For aggregations, ksqlDB must ensure that each node in a deploymentis always guaranteed to process the same subset of aggregation groupings, whichenables each ksqlDB node to accumulate aggregate results independently of othernodes in the deployment.


Whenever possible, key your join input streams and tables on the columns thatwill be used in downstream joins. If input streams and tables are co-partitionedalready, no internal repartitioning is required to join them.


Whenever possible, key your aggregation inputs on the aggregation groupingcriteria. If you're aggregating a stream using a single grouping column,consider keying the input stream on that column, if your workload permits.This avoids an internal repartitioning.


If repartitioning is unavoidable for some aspect of your workload, considersharing any repartitioned data in order to avoid extraneous repartitions. Forexample, if you know that you'll be aggregating a stream over column col1and also joining against col1 somewhere else, keying the stream explicitlyon col1 will avoid two repartitions. This is another instance in whichconsolidating logic into as few persistent queries as possible can benefit yourworkload's efficiency, by reducing repartitioning overhead.


To provide scalability and fault tolerance,ksqlDB parallelizes workloadsacross nodes in a deployment. This parallelism is achieved by leveragingtopic partitions. For a given persistent query, each node in a ksqlDBdeployment processes a subset of the persistent query's input partitions.This is an important consideration to take into account when designing aworkload that leverages all of the available resources within a ksqlDBdeployment.


Spread your workload across your ksqlDB deployment by using an appropriatenumber of partitions. If a persistent query's input stream has two partitions,and your ksqlDB deployment has four nodes, two of these nodes aren't performingany work for the persistent query, and you get an unbalanced workload. For thisreason, input streams and tables usually should have more partitions than thereare nodes in a ksqlDB deployment.


Stateful persistent queries create one state store per partition, so too manypartitions can result in an excessive number of state stores, each with a verythin shard. You need adequate partitions to parallelize your workload completelyacross all nodes in your cluster, but no more than that. Usually, a good balanceis achieved with twice as many partitions as nodes in your cluster.


ksqlDB provides one of the easiest ways to compute aggregationsover your topic data stored in Kafka. These aggregations are defined as persistentqueries whose results are maintained within aksqlDB table,optionally with atime windowas a dimension. Since aggregations are inherently stateful, theyrely heavily on local materialized state stores to accumulateongoing aggregation results efficiently. These state stores maintain a localcache of current aggregate results for the subset of persistent query partitionsthat are assigned to a given node in a deployment.


Frequently, aggregations aren't done in isolation, for example, a SUM call is often accompanied by a COUNT call. Whenever the aggregation grouping criteria, and, potentially, filtering criteria, are equivalent for multiple aggregations,you should combine these aggregate calls into the same persistent query.Aggregation consolidation can potentially avoid duplicate repartitioning incases for which repartitioning is necessary, as well as minimize the number ofstate stores required for a given set of aggregations.


Aggregations that use a WINDOWclause add dimensionality to the aggregation's grouping criteria. Think of theaggregation's WINDOW as an implicit grouping column. Each aggregation grouprequires local materialization, via state stores, of up to one row per groupingcriteria, per window. For example, a window width of one hour will result in 24times more windows than a window width of one day. As a result, the number oflocally materialized rows required for a windowed aggregation is inverselyproportional to the window's width. We recommend using aggregation windows withthe widest width that your use case will allow.

3a8082e126
Reply all
Reply to author
Forward
0 new messages