Problem when using KSQL with JSON value format.


Kuldeep Poonia

Sep 2, 2017, 4:02:55 AM
to Confluent Platform
Hi All,

I want to learn the concepts of KSQL and am trying to run some examples with it. I created a stream over a topic with JSON value_format. The topic contains data in JSON format like this:

{"schema":{"type":"struct","fields":[{"type":"string","optional":false,"field":"MANDT"},{"type":"string","optional":false,"field":"SHDEFG"},{"type":"string","optional":false,"field":"VBELN"},{"type":"string","optional":false,"field":"POSNR"},{"type":"string","optional":false,"field":"ZERDAT"},{"type":"string","optional":false,"field":"ERNAM"},{"type":"string","optional":false,"field":"EXTDSN"},{"type":"string","optional":false,"field":"EXTDSD"},{"type":"string","optional":false,"field":"REMDEL"},{"type":"string","optional":false,"field":"REMSHP"},{"type":"string","optional":false,"field":"DELBY"},{"type":"string","optional":false,"field":"DELON"},{"type":"string","optional":false,"field":"BRANCH"},{"type":"string","optional":false,"field":"CONTRACT_NO"},{"type":"string","optional":false,"field":"ZINVRMK"},{"type":"string","optional":false,"field":"ZOLDRSN"},{"type":"string","optional":false,"field":"ZUPDBY"},{"type":"string","optional":false,"field":"ZUPDON"},{"type":"string","optional":false,"field":"ZTIME"}],"optional":false,"name":"ZHIST_DEL_SHPDEL"},"payload":{"MANDT":"400","SHDEFG":"S","VBELN":"1110507050","POSNR":"000000","ZERDAT":"20170808","ERNAM":"CDOUSER","EXTDSN":" ","EXTDSD":"20170808","REMDEL":" ","REMSHP":"02","DELBY":"CDOUSER","DELON":"20170902","BRANCH":"2310","CONTRACT_NO":" ","ZINVRMK":" ","ZOLDRSN":" ","ZUPDBY":" ","ZUPDON":"00000000","ZTIME":"130234"}}

Then I created a stream on this topic:
create stream my_stream (VBELN varchar) with (kafka_topic='test-sqlite-jdbc-ZHIST_DEL_SHPDEL', value_format='JSON');

When I then run the query select * from my_stream; the output looks like this:
1504336766882 | {"schema":null,"payload":null} | null

I don't understand why the payload comes out as null. Can you tell me where I am going wrong?



hoj...@confluent.io

Sep 3, 2017, 4:12:13 PM
to Confluent Platform
Hi Kuldeep,

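KSQL reads only the top-level fields of your JSON. Your message has just two top-level fields, schema and payload, and VBELN is nested inside payload, so the VBELN column comes back as null. For nested JSON you need to use the EXTRACTJSONFIELD UDF to extract the fields you want from the JSON value.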
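To make the "top-level fields only" point concrete, here is a rough Python model of how declared columns are matched against a JSON value. It is an illustration, not KSQL's implementation: the function read_columns is hypothetical, case handling and type conversion are ignored, and the sample message is a trimmed copy of the one in the question.

```python
import json

# Trimmed copy of the message from the question; the row data sits under "payload".
MESSAGE = (
    '{"schema": {"type": "struct", "name": "ZHIST_DEL_SHPDEL"}, '
    '"payload": {"VBELN": "1110507050", "POSNR": "000000"}}'
)


def read_columns(raw_value, columns):
    """Hypothetical model: match each declared column against TOP-LEVEL keys only.

    Case handling and type conversion are deliberately ignored here.
    """
    record = json.loads(raw_value)
    return {col: record.get(col) for col in columns}


# Declaring only VBELN finds no top-level key of that name, so it is None (null).
print(read_columns(MESSAGE, ["VBELN"]))  # -> {'VBELN': None}

# Declaring schema and payload matches the two top-level keys; VBELN is one level down.
row = read_columns(MESSAGE, ["schema", "payload"])
print(row["payload"]["VBELN"])  # -> 1110507050
```

In this model, as in the behaviour described above, declaring the nested field name directly can never match; you have to declare the enclosing top-level field and then dig into it.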
First, you can define your stream as follows:

create stream my_stream (schema varchar, payload MAP<varchar, varchar>) with (kafka_topic='test-sqlite-jdbc-ZHIST_DEL_SHPDEL', value_format='JSON');

and then create a new stream from it using the following statement:

CREATE STREAM my_stream2 AS SELECT EXTRACTJSONFIELD(schema, '$.name') AS schema_name FROM my_stream;
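The EXTRACTJSONFIELD call above takes a JSON string and a path such as '$.name'. The Python sketch below imitates only the simplest form of that path lookup (dotted object keys, no arrays or wildcards) so you can see what '$.name' selects. The helper extract_json_field is hypothetical and is not the UDF's code; returning None for a missing path is this sketch's stand-in for a SQL null.

```python
import json


def extract_json_field(json_text, path):
    """Hypothetical sketch of a JSONPath-style lookup such as '$.name'.

    Only '$' and dotted object paths like '$.a.b' are handled (no arrays or wildcards).
    """
    if path != "$" and not path.startswith("$."):
        raise ValueError("only '$' or '$.key.subkey' paths are supported")
    node = json.loads(json_text)
    for key in path.split(".")[1:]:
        if not isinstance(node, dict) or key not in node:
            return None  # missing path: None stands in for a SQL null
        node = node[key]
    # Nested objects/arrays are returned re-serialized as JSON text; scalars as-is.
    return json.dumps(node) if isinstance(node, (dict, list)) else node


# Hypothetical column values, trimmed from the message in the question.
SCHEMA = '{"type": "struct", "optional": false, "name": "ZHIST_DEL_SHPDEL"}'
PAYLOAD = '{"VBELN": "1110507050", "POSNR": "000000"}'

print(extract_json_field(SCHEMA, "$.name"))      # -> ZHIST_DEL_SHPDEL
print(extract_json_field(PAYLOAD, "$.VBELN"))    # -> 1110507050
print(extract_json_field(PAYLOAD, "$.MISSING"))  # -> None
```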

Kuldeep Poonia

Sep 3, 2017, 11:31:49 PM
to Confluent Platform
Hi Team,

Thanks for your help, I will try that. I am also facing another issue with a join. The scenario is:

1. I created a table: create table my_table (Id varchar, Name varchar) with (kafka_topic='Test2', value_format='JSON');
2. I created a stream: create stream my_stream (Id varchar, Address varchar) with (kafka_topic='Test3', value_format='JSON');

Then I join them like this:
select a.Id, a.Name, b.Address from my_table as a inner join my_stream as b on b.Id = a.Id;

but it shows null and terminates the query.

hoj...@confluent.io

Sep 4, 2017, 6:43:14 PM
to Confluent Platform
Hi Kuldeep,

Currently KSQL only supports LEFT JOIN for stream/table joins, so the INNER JOIN won't work. However, you can get the same result by performing a LEFT JOIN and adding a WHERE clause with IS NOT NULL on a column from the table side, so that only rows that found a match in the table are kept.

Also, a table can only be joined on its key column, so the messages in your Test2 topic must carry the same value in their key as in the 'Id' field.
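A small Python model of the two points above: a stream/table LEFT JOIN looks each stream record up in the table by the table's message key, and an INNER JOIN can be emulated by filtering out rows whose table-side column is null. The data and function names are hypothetical and only mirror my_table (Test2) and my_stream (Test3); this is not how KSQL is implemented.

```python
# Hypothetical sample data modelled on my_table (topic Test2) and my_stream (topic Test3).
# The table is a dict keyed by the Kafka message key, which must equal the Id field.
TABLE = {"1": {"Id": "1", "Name": "Alice"}}

# If Test2 messages were produced with some other key, lookups by Id never match.
TABLE_WRONG_KEY = {"some-other-key": {"Id": "1", "Name": "Alice"}}

STREAM = [
    {"Id": "1", "Address": "Street A"},
    {"Id": "2", "Address": "Street B"},
]


def left_join(stream, table):
    """Each stream record looks up the table by its Id; a miss gives None for Name."""
    rows = []
    for record in stream:
        match = table.get(record["Id"])
        rows.append((record["Id"], record["Address"], match["Name"] if match else None))
    return rows


def inner_join_via_left(stream, table):
    """LEFT JOIN, then keep only rows whose table-side column IS NOT NULL."""
    return [row for row in left_join(stream, table) if row[2] is not None]


print(left_join(STREAM, TABLE))            # -> [('1', 'Street A', 'Alice'), ('2', 'Street B', None)]
print(inner_join_via_left(STREAM, TABLE))  # -> [('1', 'Street A', 'Alice')]
print(left_join(STREAM, TABLE_WRONG_KEY))  # -> [('1', 'Street A', None), ('2', 'Street B', None)]
```

The last line shows the symptom from the question in miniature: if the table's key does not carry the Id value, every joined Name is null.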

Let me know if this works.

Cheers,
--Hojjat

Kuldeep Poonia

Sep 5, 2017, 5:55:22 AM
to Confluent Platform
Hi Hojjat,

As you suggested, I used a left join instead of an inner join, but the result still shows null.


Can you suggest where I am going wrong?

hoj...@confluent.io

Sep 5, 2017, 10:17:40 AM
to Confluent Platform
Hi Kuldeep,

In a stream/table join, the stream must be on the left-hand side. Please change your query so the stream is on the left-hand side.
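One way to see why the stream belongs on the left: in a stream/table join, each stream record produces an output row, while the table only acts as a lookup of its latest state. The Python sketch below models that event-by-event behaviour; the event format and the function stream_table_join are hypothetical, and it is a simplified model rather than KSQL code. It also shows that, in this model, a stream record arriving before the matching table row is joined with null.

```python
def stream_table_join(events):
    """Hypothetical model of a stream/table LEFT JOIN driven by the stream side.

    events is a time-ordered list of ("table", id, name) or ("stream", id, address).
    """
    table = {}  # latest Name per Id, built from table-side updates
    output = []
    for side, key, value in events:
        if side == "table":
            table[key] = value  # a table update only changes state; no row is emitted
        else:
            # Each stream record emits exactly one row, joined with the table as of now.
            output.append((key, value, table.get(key)))
    return output


EVENTS = [
    ("stream", "1", "Street A"),  # arrives before the table knows Id "1"
    ("table", "1", "Alice"),
    ("stream", "1", "Street C"),  # arrives after the table update
]

print(stream_table_join(EVENTS))  # -> [('1', 'Street A', None), ('1', 'Street C', 'Alice')]
```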

Let me know if you find any other issues.

Cheers,
--Hojjat

Kuldeep Poonia

Sep 6, 2017, 12:14:36 AM
to Confluent Platform
Hi Hojjat,

It still shows the same result.

hoj...@confluent.io

Sep 6, 2017, 2:48:20 PM
to Confluent Platform
Hi Kuldeep,

It seems you have a syntax error in your join statement: you don't need AS when defining aliases for streams and tables in the query. Please try this:

SELECT A.ID, A.ADDRESS, B.NAME FROM my_stream A LEFT JOIN my_table B ON A.ID = B.ID;

Kuldeep Poonia

Sep 6, 2017, 11:32:45 PM
to Confluent Platform
Thanks Hojjat,

It's working now. I'm very sorry for taking up your time with a mistake that was on my side.

Kuldeep Poonia

Sep 9, 2017, 3:26:51 AM
to Confluent Platform
Hi Hojjat,

Can I alter an existing table to add a new column, or do I have to drop the KSQL table first and then create it again with the desired columns?