Use case: aggregated data for the last N hours at any point in time

Anup Tiwari

Mar 24, 2021, 1:09:03 AM
to ksqldb-users
Hi Team,

I have a use case where, at any given point in time, I want all of a user ID's clicks in the last 48 hours. Here 48 can be any N, and hours can be any time unit, such as minutes or days.

For example, say that at 2021-03-24 10:00:01 the number of clicks for user ID 201 in the last 48 hours (since 2021-03-22 10:00:01) is 50. If I run the pull query again for the same user at 2021-03-24 13:00:01, the count should cover 2021-03-22 13:00:01 up to the current time, which may differ from 50.
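To pin down the semantics being asked for, here is a minimal Java sketch (plain Java, not ksqlDB code) of a count over a window anchored at query time. The class and method names and the sample timestamps are hypothetical, and all times are assumed to be UTC.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;

public class SlidingCount {
    // Counts the events whose timestamp lies in [now - window, now].
    // The window is anchored at query time, so the same data gives a
    // different answer when the query is repeated later.
    public static long countInLast(List<Instant> events, Instant now, Duration window) {
        Instant start = now.minus(window);
        return events.stream()
                .filter(t -> !t.isBefore(start) && !t.isAfter(now))
                .count();
    }

    public static void main(String[] args) {
        // Hypothetical click timestamps for one user (UTC).
        List<Instant> clicks = List.of(
                Instant.parse("2021-03-22T09:00:00Z"),
                Instant.parse("2021-03-22T11:00:00Z"),
                Instant.parse("2021-03-23T12:00:00Z"),
                Instant.parse("2021-03-24T09:30:00Z"));
        Duration window = Duration.ofHours(48);
        System.out.println(countInLast(clicks, Instant.parse("2021-03-24T10:00:01Z"), window)); // prints 3
        System.out.println(countInLast(clicks, Instant.parse("2021-03-24T13:00:01Z"), window)); // prints 2
    }
}
```

Between the two queries, the 11:00 click on March 22 drops out of the window even though no new event arrived; a plain running aggregate that only reacts to incoming events cannot express that.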

Can we do this in ksqlDB? It seems like a common use case to me.

With the current windowing capabilities, it seems difficult.

Regards,
Anup Tiwari

Sergio Pena Anaya

Mar 24, 2021, 9:13:18 AM
to Anup Tiwari, ksqldb-users
It is a common use case when using aggregations. I would think that a CTAS query that aggregates data from the last 48 hours, followed by a pull query against the new table, would work. What statements and queries have you tried so far?

Anup Tiwari

Mar 24, 2021, 1:11:42 PM
to Sergio Pena Anaya, ksqldb-users
Hi Sergio,

I tried a pull query on the table below, but the result is not what I expect: for a given user, the counter only ever increases.


CREATE TABLE USER_CLICKS_PAST_24_HRS WITH (KAFKA_TOPIC='USER_CLICKS_PAST_24_HRS', PARTITIONS=100, REPLICAS=3) AS
SELECT UID,
  COUNT(EVENT) AS CLICK_COUNT
FROM CLICK_STREAM
WHERE (EVENT = 'hoverclick') AND (TIMESTAMP >= (UNIX_TIMESTAMP() - (24 * 60 * 60 * 1000)))
GROUP BY UID
EMIT CHANGES;

Regards,
Anup Tiwari

Sergio Pena Anaya

Apr 2, 2021, 11:16:32 AM
to Anup Tiwari, ksqldb-users
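I see. That CTAS just keeps a running count of the events from the stream and updates the table with the aggregated result. The WHERE condition is checked only once, when each event arrives, so events that later become older than 24 hours are never subtracted from the count. To fix your problem, you need windowed aggregations.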
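One windowed option ksqlDB does offer is a hopping window, e.g. `WINDOW HOPPING (SIZE 48 HOURS, ADVANCE BY 1 HOUR)`. The Java sketch below is not ksqlDB code; it only models how aligned hopping windows are assigned, assuming epoch-millisecond timestamps, and the class and method names are hypothetical.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

public class HoppingWindows {
    // Returns the start times (epoch ms) of every hopping window [start, start + size)
    // that contains timestamp t, newest first. Starts are aligned to multiples of
    // `advance`, and no window starts before epoch 0.
    public static List<Long> windowStartsFor(long t, long size, long advance) {
        List<Long> starts = new ArrayList<>();
        long latest = Math.floorDiv(t, advance) * advance; // last aligned start <= t
        for (long s = latest; s > t - size && s >= 0; s -= advance) {
            starts.add(s);
        }
        return starts;
    }

    // The oldest window that still contains `now` is the closest hopping-window
    // approximation of "the last `size` ms": it omits at most `advance` ms of history.
    public static long bestStartFor(long now, long size, long advance) {
        List<Long> starts = windowStartsFor(now, size, advance);
        return starts.get(starts.size() - 1);
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        long size = 48 * hour;  // like SIZE 48 HOURS
        long advance = hour;    // like ADVANCE BY 1 HOUR
        long now = Instant.parse("2021-03-24T10:00:01Z").toEpochMilli();
        System.out.println(windowStartsFor(now, size, advance).size());          // prints 48
        System.out.println(Instant.ofEpochMilli(bestStartFor(now, size, advance))); // prints 2021-03-22T11:00:00Z
    }
}
```

With a 48-hour size and a 1-hour advance, every event updates 48 overlapping windows, and the best window for a query at 10:00:01 starts at 11:00:00 two days earlier, so it skips almost an hour of the oldest data. A smaller advance tightens the approximation at the cost of more windows per event.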

Anup Tiwari

Apr 2, 2021, 11:55:22 AM
to Sergio Pena Anaya, ksqldb-users
Hey Sergio,

For this use case we need a sliding window, and I think that is not currently possible in ksqlDB.

Anup Tiwari

May 15, 2021, 12:10:55 AM
to Sergio Pena Anaya, ksqldb-users
Hi All,

I can see that a ticket has been opened in ksqlDB for a sliding window implementation.

Since sliding windows are supported in Kafka Streams, when can we expect this feature in ksqlDB?
Also, if there are any other ways (a UDAF, etc.) to implement this on our own, please let me know. This is a very common requirement, and because ksqlDB cannot do it today, we are looking at other query engines such as Druid.

Regards,
Anup Tiwari

Anup Tiwari

Sep 13, 2023, 12:11:08 PM
to Sergio Pena Anaya, ksqldb-users
Just an update for the community: since the sliding window is still not available, I have developed a close workaround for this use case using a UDAF and a UDF.


Solution:

1. UDAF "LAST_N_HOURS": This UDAF maintains an hour-wise count/sum of events in a map whose key is the hour (yyyymmddHH) and whose value is the count/sum for that hour. Whenever an event arrives for a user, we take the event's timestamp, update that hour's entry in the map, and keep only the keys for the last N hours.

2. UDF "MAPSUM": This is used in the SELECT clause of pull queries and sums only the entries whose keys (yyyymmddHH) fall within the last N hours relative to the current time.
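A minimal Java sketch of the two steps above, written as plain static methods rather than real ksqlDB UDAF/UDF classes (the ksqlDB annotations and UDAF interface are left out). The method names, UTC hour keys, and pruning relative to each incoming event's own timestamp are assumptions, not the author's actual implementation.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.TreeMap;

public class HourBuckets {
    // Hour-bucket key format from the thread (yyyymmddHH); UTC is an assumption.
    private static final DateTimeFormatter HOUR_KEY =
            DateTimeFormatter.ofPattern("yyyyMMddHH").withZone(ZoneOffset.UTC);

    public static String hourKey(Instant t) {
        return HOUR_KEY.format(t);
    }

    // Hypothetical stand-in for the LAST_N_HOURS aggregate step: add one event to
    // its hour bucket, then drop buckets older than the hour containing (ts - N hours).
    public static Map<String, Long> aggregate(Map<String, Long> agg, Instant ts, int nHours) {
        Map<String, Long> next = new TreeMap<>(agg);
        next.merge(hourKey(ts), 1L, Long::sum);
        String cutoff = hourKey(ts.minus(Duration.ofHours(nHours)));
        // Fixed-width yyyyMMddHH keys sort chronologically as strings.
        next.keySet().removeIf(k -> k.compareTo(cutoff) < 0);
        return next;
    }

    // Hypothetical stand-in for MAPSUM: at query time, sum the buckets from the hour
    // containing (now - N hours) up to the current hour. The boundary bucket is
    // counted whole, so the result can include up to an extra hour of data.
    public static long mapSum(Map<String, Long> agg, Instant now, int nHours) {
        String from = hourKey(now.minus(Duration.ofHours(nHours)));
        String to = hourKey(now);
        long sum = 0;
        for (Map.Entry<String, Long> e : agg.entrySet()) {
            String k = e.getKey();
            if (k.compareTo(from) >= 0 && k.compareTo(to) <= 0) {
                sum += e.getValue();
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        Map<String, Long> agg = new TreeMap<>();
        agg = aggregate(agg, Instant.parse("2021-03-22T09:15:00Z"), 48);
        agg = aggregate(agg, Instant.parse("2021-03-22T10:30:00Z"), 48);
        agg = aggregate(agg, Instant.parse("2021-03-24T09:45:00Z"), 48);
        System.out.println(agg); // prints {2021032209=1, 2021032210=1, 2021032409=1}
        System.out.println(mapSum(agg, Instant.parse("2021-03-24T10:00:01Z"), 48)); // prints 2
        System.out.println(mapSum(agg, Instant.parse("2021-03-24T13:00:01Z"), 48)); // prints 1
        System.out.println(mapSum(agg, Instant.parse("2021-03-24T10:45:00Z"), 48)); // prints 2
    }
}
```

In the last query, the whole 2021-03-22 10:00 bucket is counted, so the 10:30 click on the 22nd is included even though it is more than 48 hours before 10:45 on the 24th (an exact sliding count would be 1); this is the extra-hour effect of the workaround.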

Note that this is a workaround: in a few cases it may include an extra hour of data, but that is fine for our use case.

Regards,
Anup Tiwari
