Ksql Key Format

0 views
Skip to first unread message

Егор Ульянов

unread,
Aug 4, 2024, 4:14:51 PM8/4/24
to zinsreatotip
Oneof my topics has a string-json as a key - "city":"X","id":22.In my ksql statement I want to extract it to 2 different fields not one so I can later filter and join. In the docs it seems to allow me to stick only the whole string into the key instead of allowing to format it as JSON (just like FORMAT_VALUE) see below... Any suggestions for an elegant solution to this?

You should be able to do this using the EXTRACTJSONFIELD UDF on the ROWKEY column. The ROWKEY column is a "column" in your schema that contains the key for a given row. The EXTRACTJSONFIELD UDF lets you extract fields from columns that contain json strings. So, for your example you could run:


The term serialization format refers to the manner in which a record's raw bytesare translated to and from information structures that ksqlDB can understandat runtime. ksqlDB offers several mechanisms for controlling serializationand deserialization.


It's main use is as the KEY_FORMAT of key-less streams, especially where a default key format has been set, via ksql.persistence.default.format.key that supports Schema inference. If thekey format was not overridden, the server would attempt to load the key schema from the Schema Registry.If the schema existed, the key columns would be inferred from the schema, which may not be the intent.If the schema did not exist, the statement would be rejected. In such situations, the key format canbe set to NONE:


If a CREATE TABLE AS or CREATE STREAM AS statement has a source with a key format of NONE, butthe newly created table or stream has key columns, then you may either explicitly define the key format to use in the WITH clause, or the default key format, as set in ksql.persistence.default.format.keywill be used.


The DELIMITED format supports comma-separated values. You can use otherdelimiter characters by specifying the KEY_DELIMITER and/or VALUE_DELIMITER when you useFORMAT='DELIMITED' in a WITH clause. Only a single character is validas a delimiter. The default is the comma character. For space- andtab-delimited values, use the special values SPACE or TAB, not an actualspace or tab character.


There are two JSON formats, JSON and JSON_SR. Both support serializing anddeserializing JSON data. The latter offers integration with the Schema Registry,registering and retrieving JSON schemas while the former does not. These twoformats are not byte compatible (you cannot read data produced by one by theother).


The JSON formats supports all SQL data types.By itself, JSON doesn't support a map type, so ksqlDB serializes MAP types asJSON objects. For this reason, the JSON format supports only MAP objectsthat have STRING keys.


The following keywords aren't supported in the VALUE_SCHEMA_ID field for INSERTINTO and CREATE TABLE AS SELECT statements, and if present in your JSON schemawill cause serialization errors. They are supported in CREATE statements.


By default, ksqlDB-registered schemas have the same name (KsqlDataSourceSchema) and the same namespace (io.confluent.ksql.avro_schemas). You can override this behavior by providing a VALUE_AVRO_SCHEMA_FULL_NAME property in the WITH clause, where you set the VALUE_FORMAT to 'AVRO'. As the name suggests, this property overrides the default name/namespace with the provided one. For example, com.mycompany.MySchema registers a schema with the MySchema name and the com.mycompany namespace.


The table below details the SQL types the format supports, includingdetails of the associated Kafka Java Serializer, Deserializer andConnect Converter classes you would need to use to write the key toKafka, read the key from Kafka, or use to configure Apache Connect towork with the KAFKA format, respectively.


If you integrate ksqlDB withConfluent Schema Registry,and your ksqlDB application uses a compatible value format (Avro, JSON_SR, orProtobuf), you can just supply the key column, and ksqlDB loads the valuecolumns from Schema Registry:


Protobuf handles null values differently than AVRO and JSON. Protobuf doesn'thave the concept of a null value, so the conversion between PROTOBUF and Java(Kafka Connect) objects is undefined. Usually, Protobuf resolves a"missing field" to the default value of its type.


To enable alternative representations for null values in protobuf, protobuf-specific propertiescan be passed to CREATE statements. For example, the following CREATE statement willcreate a protobuf schema that wraps all primitive types into the corresponding standard wrappers (e.g. google.protobuf.StringValue for string).


This way, null can be distinguished from default values. Similarly, when VALUE_PROTOBUF_NULLABLE_REPRESENTATION is set to OPTIONAL, all fields in protobuf will bedeclared optional, also allowing null primitive fields to be distinguished from default values.


ksqlDB assumes that any single key is unwrapped, which means that it's not contained in an outerrecord or object. Conversely, ksqlDB assumes that any key with multiple columns(for example, CREATE STREAM x (K1 INT KEY, K2 INT KEY, C1 INT)) is wrapped, which means that it is a recordwith each column as a field within the key.


To declare a single-column key that's wrapped, specify a STRUCT typewith a single column. For example, K STRUCT KEY. See the next two sections on single values for more information about wrapped and unwrapped data.


By default, ksqlDB expects any value with a single-field schema to havebeen serialized as a named field within a record. However, this is notalways the case. ksqlDB also supports reading data that has beenserialized as an anonymous value.


If your data contains only a single field, and that field is not wrappedwithin a JSON object, or an Avro record is using the AVRO format, thenyou can use the WRAP_SINGLE_VALUE property in the WITH clause ofyour CREATE TABLE orCREATE STREAM statements. Setting theproperty to false tells ksqlDB that the value isn't wrapped, so theexample above would be a JSON number:


If a statement doesn't set the value wrapping explicitly, ksqlDB uses thesystem default, which is defined by ksql.persistence.wrap.single.values.You can change the system default, if the format supports it. For more information, seeksql.persistence.wrap.single.values.


A null value in a table's topic is treated as a tombstone, whichindicates that a row has been removed. If a table's source topic has anunwrapped single-field key schema and the value is null, it's treatedas a tombstone, resulting in any previous value for the key beingremoved from the table.


A null key or value in a stream's topic is ignored when the stream ispart of a join. A null value in a table's topic is treated as atombstone, and a null key is ignored when the table is part of a join.


By default, if the value has only a single field, ksqlDB serializes thesingle field as a named field within a record. However, this doesn'talways match the requirements of downstream consumers, so ksqlDB allowsthe value to be serialized as an anonymous value.


If a statement doesn't set the value wrapping explicitly, ksqlDB uses thesystem default, defined by ksql.persistence.wrap.single.values, if the format supports it. You can change the system default. For more information, seeksql.persistence.wrap.single.values.


Schema Registry enables both schema inference and defining schemas explicitly.You can use the identifiers for specific schemas in your CREATE STREAM andCREATE TABLE statements with KEY_SCHEMA_ID and VALUE_SCHEMA_ID properties.For more information, see Schema Inference With ID.


Format query results as JSON, or export data from SQL Server as JSON, by adding the FOR JSON clause to a SELECT statement. Use the FOR JSON clause to simplify client applications by delegating the formatting of JSON output from the app to SQL Server.


To add a single, top-level element to the JSON output, specify the ROOT option. If you don't specify this option, the JSON output doesn't have a root element. For more info, see Add a Root Node to JSON Output with the ROOT Option (SQL Server).


To include null values in the JSON output, specify the INCLUDE_NULL_VALUES option. If you don't specify this option, the output doesn't include JSON properties for NULL values in the query results. For more info, see Include Null Values in JSON - INCLUDE_NULL_VALUES Option.


To remove the square brackets that surround the JSON output of the FOR JSON clause by default, specify the WITHOUT_ARRAY_WRAPPER option. Use this option to generate a single JSON object as output from a single-row result. If you don't specify this option, the JSON output is formatted as an array - that is, the output is enclosed within square brackets. For more info, see Remove Square Brackets from JSON - WITHOUT_ARRAY_WRAPPER Option.


By default, SQL Server Management Studio (SSMS) concatenates the results into a single row when the output setting is Results to Grid. The SSMS status bar displays the actual row count.


Other client applications might require code to recombine lengthy results into a single, valid JSON string by concatenating the contents of multiple rows. For an example of this code in a C# application, see Use FOR JSON output in a C# client app.


If either the key format or value format is unspecified, then the default formats will be used. Default key and value formats are controlled by the ksql.persistence.default.format.key and ksql.persistence.default.format.value configs, respectively. In a ksqlDB server configuration file, this could be the following:


By default, ksql.persistence.default.format.key is set to KAFKA, so statements that do not specify an explicit key format continue to function as in older ksqlDB versions. The ksql.persistence.default.format.value config has no default. Unless it is set, CREATE STREAM and CREATE TABLE statements that do not specify an explicit value format will be rejected.

3a8082e126
Reply all
Reply to author
Forward
0 new messages