Stickiness calculation

AR

Apr 26, 2023, 2:25:50 AM
to Druid User
Hi,
We have a monthly table tracking user hits with about 500K records per month. We want to determine if users who accessed a particular entity last month accessed the same entity the next month (stickiness). Since Druid doesn't support an "exists" query, we are currently using a self join for this.  
The query for the last 6 months is of the form:
SELECT
    curr.datetime,
    curr.entity,
    (count(distinct prev.user) * 1.0 / count(distinct curr.user)) as STICKINESS
FROM
    (
        SELECT DISTINCT
            __time as datetime,
            user,
            entity
        FROM
            hits_monthly
        WHERE
            __time between time_parse('20221101', 'yyyyMMdd') and time_parse('20230401', 'yyyyMMdd')
    ) curr
LEFT JOIN
    (
        SELECT DISTINCT
            __time as datetime,
            user,
            entity
        FROM
            hits_monthly
        WHERE
            __time between time_parse('20221001', 'yyyyMMdd') and time_parse('20230301', 'yyyyMMdd')
    ) prev
ON
    curr.user = prev.user
    and curr.entity = prev.entity
    -- the prev row must fall exactly one month before the curr row;
    -- both subqueries expose __time as datetime
    and prev.datetime = TIMESTAMPADD(month, -1, curr.datetime)
GROUP BY 1, 2

Above query takes ~3.5 min. We tried increasing the buffer size from 500M to 1G but that did not make any difference to the execution time. So it looks like the join (and the inherent data shuffle) is what may be taking a lot of time.

What would be a better approach to extract this info? Would a multi-stage query work better? I did try to write one but wasn't successful as the console kept throwing parse error for the keywords PARTITIONED & CLUSTERED.

Config:
Druid version - 24.0.2
4 Historical nodes - 2 with 16 CPU and 2 with 8 CPU.
MergeBuffers - 15
Buffer Size - 1G

druid.sql.planner.useApproximateCountDistinct - false    --> We need exact count
druid.sql.planner.useGroupingSetForExactDistinct - true  --> As we are getting count of different columns in same query
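
For reference, the quantity the self-join query is after can be sketched in plain Python. This is only an illustration of the metric, not Druid code: the `(month, user, entity)` tuple layout, the function names, and the sample rows are assumptions made for this example.

```python
from collections import defaultdict


def add_months(yyyymm: str, delta: int) -> str:
    """Shift a 'yyyyMM' month string by delta months."""
    index = int(yyyymm[:4]) * 12 + int(yyyymm[4:]) - 1 + delta
    return f"{index // 12:04d}{index % 12 + 1:02d}"


def stickiness(hits):
    """hits: iterable of (month 'yyyyMM', user, entity); duplicates are allowed."""
    users = defaultdict(set)  # (month, entity) -> distinct users
    for month, user, entity in hits:
        users[(month, entity)].add(user)

    result = {}
    for (month, entity), curr in users.items():
        # Users of the same entity in the month before; empty if there were none.
        prev = users.get((add_months(month, -1), entity), set())
        # Share of this month's users who also hit the entity last month.
        result[(month, entity)] = len(curr & prev) / len(curr)
    return result


# Made-up rows, purely for illustration.
sample = [
    ("202301", "u1", "abc"), ("202301", "u2", "abc"),
    ("202302", "u1", "abc"), ("202302", "u3", "abc"),
    ("202302", "u1", "de"),
]
print(stickiness(sample))
```

The first month in any range has no previous month to compare against, so its ratio comes out as 0 and would normally be dropped.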

John Kowtko

Apr 26, 2023, 9:26:27 AM
to Druid User
I can think of a few things you could try:

1. use only one subquery as a CTE "with() clause", that covers the past 7 months ... then your LEFT join is a self-join, i.e. one less subquery and resulting temp table to deal with

2. In your subqueries (including option #1 above) precompute the TIMESTAMPADD(month, -1, ...) expression from your join predicate as a new field 'nterm', so that your LEFT JOIN predicate becomes a direct field-to-field value comparison ... not sure if this will help the SQL execution here but it is something easy to try.

3. Add a subquery (or CTE) that pulls the distinct list of all users that show up at all within the past 7 months, and use that in an IN() clause in whatever queries you use above ... 

4. If you only care about the distinct user list for each month then just compute that in your subqueries, i.e. "select distinct month, user_id from table"

Let us know if any of these help.

Thanks.  John

AR

Apr 26, 2023, 10:33:25 AM
to Druid User
Hi John,
Thank you for your suggestions. I was about to post an update.

Regarding the points 1. & 2., I tried the same earlier today. I defined both sides as a CTE and did a join on them.
Modified query:

with data_curr as (
SELECT DISTINCT
      CONCAT(TIME_FORMAT(__time, 'yyyyMM'), '~', entity) as entity
    , user

FROM
    hits_monthly
WHERE
    __time between time_parse('20221101', 'yyyyMMdd') and time_parse('20230401', 'yyyyMMdd')
),
data_prev as (
SELECT DISTINCT
      CONCAT(TIME_FORMAT(TIMESTAMPADD(month, 1, __time), 'yyyyMM'), '~', entity) as entity
    , user

FROM
    hits_monthly
WHERE
    __time between time_parse('20221001', 'yyyyMMdd') and time_parse('20230301', 'yyyyMMdd')
)
SELECT
    data_curr.entity
    , (count (distinct data_prev.user)*1.0 / count (distinct data_curr.user)) as STICKINESS
from
  data_curr left join data_prev
on data_curr.user = data_prev.user
  and data_curr.entity = data_prev.entity
GROUP BY  1

With the above query, the execution time came down to ~1min 15 sec - a significant improvement but we need more. :)
The other issue with the above query is that I am unable to run it for a longer duration (eg: 1Y) as it errors out with a "ResourceLimitExceeded" exception.
druid.server.http.maxSubqueryRows=1500000  --> Sub query is generating >1.5MM rows.

I next tried to run a bunch of UNION ALL queries with each query running for 2 months interval. While this seems to work, it doesn't scale well. The execution time increases linearly with each UNION query taking around 9-10 sec (~60 sec for 6 months). It looks like Druid is not executing the pieces of the UNION ALL query in parallel. Is it possible to tell Druid to run them in parallel?

Coming back to your suggestions:
1. What you are suggesting is to have just one CTE and do a self join on it. I will try this as well and report back.
3. There are about 190K distinct users for the last year. Can Druid handle such large numbers in an IN query?
4. This doesn't help as we need to find stickiness at entity level.

Thanks.

John Kowtko

Apr 26, 2023, 10:56:42 AM
to Druid User
The CTEs and the subquery feeding the IN() clause should all be broadcast "temp tables" ... so in the case of #1 you should be eliminating one of the two subqueries ... and in #3 you are creating a 190k row subquery that should also be easily broadcast.

Okay, I didn't notice the 'entity' grouping before ... so what is the distinct list size of the (entity, user) combination?  If a given user generally associates with only one entity, then there should be little overlap and you could filter on that distinct combination (it can't be an IN clause anymore because I don't think Druid supports multi-column IN() clauses, but it can be a JOIN).

Rolling up a lot of these ideas, you could also try something like:

with tmp as (
 select distinct
        TIME_FORMAT(__time, 'yyyyMM') as curr_month,
        TIME_FORMAT(TIMESTAMPADD(month, 1, __time), 'yyyyMM') as next_month,
        -- entity is kept raw: prefixing it with the month would make
        -- curr.entity = prev.entity impossible across two different months
        entity,
        user
   from hits_monthly
  where __time between time_parse('20221001', 'yyyyMMdd') and time_parse('20230401', 'yyyyMMdd')
)
SELECT curr.curr_month,
       curr.entity,
       count(prev.user)*1.0 / count(*) as STICKINESS
  from tmp curr
  left join tmp prev
    on curr.user = prev.user
   and curr.entity = prev.entity
   and curr.curr_month = prev.next_month
GROUP BY 1, 2

AR

Apr 26, 2023, 12:57:15 PM
to Druid User
Hi John,
I tried something very similar to what you have suggested above.

with data as (

SELECT DISTINCT
      CONCAT(TIME_FORMAT(__time, 'yyyyMM'), '~', entity) as entity
      ,CONCAT(TIME_FORMAT(TIMESTAMPADD(month, 1, __time), 'yyyyMM'), '~', entity) as entity_prev

    , user
FROM
    hits_monthly
WHERE
    __time between time_parse('20221001', 'yyyyMMdd') and time_parse('20230401', 'yyyyMMdd')
)
SELECT
    d1.entity
    , (count (distinct d2.user)*1.0 / count (distinct d1.user)) as STICKINESS
from
  data d1 left join data d2
on d1.user = d2.user  
  and d1.entity = d2.entity_prev
GROUP BY  1

This query has the same performance as the one with the 2 CTEs that I tried earlier (~75 sec for 6 months). It's smaller and cleaner but it also gives an additional set of results for the first interval which has to be ignored (since both left & right are querying over the same period). 
This also has the same limitation with exceeding the "maxSubqueryRows" limit when the period is increased to 1 year.

So it looks like the UNION ALL query is the one that can run for the full year. Is there a way to tell Druid to run the individual queries in parallel and then UNION the results on the broker? Or is this something that can be done with the new multi-stage query?

distinct list size of the (entity, user) --> 90k per month

Another idea that I had was to aggregate the distinct users per entity into an Array and then use the MV_FILTER_NONE & MV_FILTER_ONLY functions on the curr & prev user arrays to get the 2 counts needed for the calculation. However the first step to aggregate distinct users per entity for 1 month itself did not complete in 2 min (cancelled after 2 min) so I dropped this idea.

So you are saying that broadcasting a 190k CTE or using it in an IN clause is possible. Let me see if I can work something out using this.

Thank you.

John Kowtko

Apr 27, 2023, 3:25:23 PM
to Druid User
Hi AR,

I don't know that using an IN() clause would be any better here, since you have already reduced this to the distinct list of user/entity combinations per month ... and IN() clauses often are converted to joins, which is what you are already doing.

How long does the CTE by itself take to run, and how many records does it have in it?

As for the final filtering, I assume you know you could just add a final filter "where d1.entity not like '202210~%'" to get rid of those records (the entity values in your query start with the yyyyMM month).

Using the MVs (or string_agg, or upcoming Array support) you may be on to something ... 
... however instead of aggregating the users ... that's very high cardinality ... instead try grouping on entity and user, and aggregate the months present into the MV? ... the MVs will be much shorter (up to only 7 entries in each), maybe that will process faster?  Then your outer select can do a final aggregate (no join) on the one table checking where the MV contains two consecutive months.  And if you reduce the months to a numeric sequence, then this where condition only has to check "where MV contains N and MV contains N-1" ... what do you think?
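
The "group on entity and user, collect the months, then check for N and N-1" idea can be sketched in plain Python as follows. It is a rough illustration under assumptions, not Druid SQL: the tuple layout, the helper names, and the sample rows are made up for the example, and a Python set stands in for the multi-value column.

```python
from collections import defaultdict


def month_index(yyyymm: str) -> int:
    """Map 'yyyyMM' to a running month number so consecutive months differ by 1."""
    return int(yyyymm[:4]) * 12 + int(yyyymm[4:]) - 1


def stickiness_by_month_sets(hits):
    """hits: iterable of (month 'yyyyMM', user, entity)."""
    # Step 1: one short set of month numbers per (entity, user) pair.
    months_seen = defaultdict(set)
    for month, user, entity in hits:
        months_seen[(entity, user)].add(month_index(month))

    # Step 2: a single aggregation pass, with no join.
    active = defaultdict(int)    # (entity, N) -> users present in month N
    retained = defaultdict(int)  # (entity, N) -> users present in N and N-1
    for (entity, _user), months in months_seen.items():
        for n in months:
            active[(entity, n)] += 1
            if n - 1 in months:
                retained[(entity, n)] += 1

    return {key: retained[key] / total for key, total in active.items()}


# Made-up rows, purely for illustration.
rows = [
    ("202301", "u1", "abc"), ("202301", "u2", "abc"),
    ("202302", "u1", "abc"), ("202302", "u3", "abc"),
]
print(stickiness_by_month_sets(rows))
```

Each per-pair set holds at most as many entries as there are months in the range, which is why this shape should stay small even when the user count is large.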

Thanks.  John

AR

Apr 27, 2023, 10:55:40 PM
to Druid User
Hi John,
You are right. Using the IN() clause gave no benefit. In fact, it took the execution time back to the ~3.5 min that we started with.

There are around 330K distinct entity-user combinations for 1 year. Around 90k distinct combinations per month.

For now, we decided to go with the UNION ALL approach as this offers a middle path b/w the large execution time of a single query and the resource usage/complexity of submitting multiple queries and tying up the results on client side.
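
The alternative mentioned here, submitting one two-month query per interval and combining the results on the client, could look roughly like the sketch below. Everything in it is an assumption for illustration: `run_query` is a hypothetical callable that would submit a single query to Druid and return its rows, and the echo stub only stands in for it so the example runs without a cluster.

```python
from concurrent.futures import ThreadPoolExecutor


def to_index(yyyymm: str) -> int:
    return int(yyyymm[:4]) * 12 + int(yyyymm[4:]) - 1


def to_month(index: int) -> str:
    return f"{index // 12:04d}{index % 12 + 1:02d}"


def month_pairs(first: str, last: str):
    """(prev, curr) 'yyyyMM' pairs for every curr month in [first, last]."""
    return [(to_month(i - 1), to_month(i))
            for i in range(to_index(first), to_index(last) + 1)]


def run_all(pairs, run_query, max_workers=4):
    """Run one query per (prev, curr) pair concurrently and concatenate the rows.

    run_query(prev, curr) is a hypothetical callable returning a list of rows.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map keeps the results in the same order as the input pairs.
        chunks = pool.map(lambda pair: run_query(*pair), pairs)
        return [row for chunk in chunks for row in chunk]


def echo_query(prev, curr):
    # Stub used only for this example: returns the interval it was asked for.
    return [(prev, curr)]


pairs = month_pairs("202211", "202304")
print(run_all(pairs, echo_query))
```

The worker count is a trade-off: more concurrent queries finish sooner but compete for the same merge buffers and processing threads on the cluster.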

Flipping the aggregation to use the entity-user combination as the key is a good idea. I didn't think of it as the complexity of then getting the count of users for each period seems high, mainly because I couldn't find a function to do the reverse of ARRAY_AGG: a Druid equivalent of Spark's "explode" function, which would allow the ARRAY/MV fields to be included back as rows of the table. If a table has one row with an ARRAY/MV column, then "explode" of the ARRAY/MV column would duplicate the row for each value contained in the ARRAY/MV. Is there any such function?

But I will give it a shot early next week and post an update.

Thank you,
AR.

AR

Apr 28, 2023, 4:24:37 AM
to Druid User
Hi John,
Figured out how to explode a multi-value into rows using a "group by". 
Array --> String --> Multi value  - multiple hops to do this. 

Will give your suggestion a try.

Thanks.

John Kowtko

Apr 28, 2023, 6:39:31 AM
to druid...@googlegroups.com
Yes, you got it :).  Full *true* array support is on its way (partially implemented already) and UNNEST() is actually already here (but of limited use pending array support) ... but in the meantime it looks like you have figured out the only workaround I know of, which is to convert to MV and then do a group by to auto-magically unnest the elements.

But do you need to do this?  If you aggregate the months into an MV can't you just search the MV directly for curr-prev pairs?


--
John Kowtko
Technical Success Manager

AR

May 2, 2023, 1:29:00 PM
to Druid User
Hi John,
Following your suggestion, I tried doing something like the query below, but this sample query fails with an exception. I know that Druid does not support correlated subqueries, but what about counts like the one below?

Query:
with ut as
(select 'u1' u, string_to_mv(array_to_string(array['abc~202301', 'abc~202302', 'de~202302', 'pqr~202302'], ','), ',') t),
et as
(select 'abc' e, '202301' ti)
select
  e,
  ti,
  (select count(distinct u) from ut where MV_CONTAINS(t, concat(et.e, '~', et.ti))) cnt
from et

ut --> A table containing  users and distinct list of entity-time period combinations for each user.
et --> Distinct list of entity - time period combinations
Finally, 2 counts of users will be needed for curr & prev time period for each entity which can be used to calculate the stickiness.

Query Exception:
While invoking method 'public org.apache.calcite.sql2rel.RelFieldTrimmer$TrimResult org.apache.calcite.sql2rel.RelFieldTrimmer.trimFields(org.apache.calcite.rel.core.Sort,org.apache.calcite.util.ImmutableBitSet,java.util.Set)'

java.lang.RuntimeException

Am I doing this correctly?

Thanks,
AR.

AR

May 3, 2023, 6:42:21 AM
to Druid User
I was able to get the counts using a cartesian join. Is this what you were suggesting as well?

with ut as
(select 'u1' u, string_to_mv(array_to_string(array['abc~202301', 'abc~202302', 'de~202302', 'pqr~202302'], ','), ',') t),
et as
(select 'dbc' e, '202301' ti)
select e, ti, MV_CONTAINS(t, concat(et.e, '~', et.ti)) as curr_exists from et join ut on  1=1

John Kowtko

May 4, 2023, 12:18:33 AM
to Druid User
Hi AR,  I was thinking of something like this:

with tmp_hits as (
 select entity,
        user,
        -- months in which this (entity, user) pair was seen
        array_agg( distinct TIME_FORMAT(__time, 'yyyyMM')) curr_month_arr,
        -- the same months shifted forward by one month
        array_agg( distinct TIME_FORMAT(TIMESTAMPADD(month, 1, __time), 'yyyyMM')) next_month_arr
   from hits_monthly
  where __time between time_parse('20221001', 'yyyyMMdd') and time_parse('20230401', 'yyyyMMdd')
  group by entity, user
)
select entity, curr_month, sum(case when array_contains(next_month_arr, curr_month) then 1 else 0 end)*1.0/count(*) as stickiness
  from tmp_hits, unnest(curr_month_arr) as foo(curr_month)
 group by entity, curr_month

I don't know if the syntax is typo-proof ... I just tried the equivalent on the wikipedia demo dataset  and it worked for me.

Note -- in order to get the array stuff and unnest() to work, you will probably have to be on the latest build and will have to set your query context to:

{
  "enableUnnest": true
}


I haven't tried the MV equivalents of this but they may work as well (although I don't think unnest() operates on MVs).

Good luck ... let me know if you can get this to work.

-- John

AR

May 4, 2023, 2:59:41 AM
to Druid User
Hi John,
I tried the query with the cartesian join and was able to get the computation time for 6 months down to 30 sec. 
Query:
with userETerm as
(
SELECT
      user
    ,ARRAY_AGG(distinct concat(entity, '~', TIME_FORMAT(__time, 'yyyyMM')), 10240) as et
FROM
    hits_monthly
WHERE
    __time between time_parse('20220401', 'yyyyMMdd') and time_parse('20230401', 'yyyyMMdd')
    and entity is not null
group by 1
),
eTerm as
(
SELECT
  DISTINCT entity, TIME_FORMAT(__time, 'yyyyMM') curr, TIME_FORMAT(TIMESTAMPADD(month, -1, __time), 'yyyyMM') prv
FROM
    hits_monthly
WHERE
    __time between time_parse('20220401', 'yyyyMMdd') and time_parse('20230401', 'yyyyMMdd')
    and entity is not null
)
SELECT
  entity,
  curr,
  (sum(case when MV_CONTAINS(userETerm.et, concat(eTerm.entity, '~', eTerm.prv))
    and MV_CONTAINS(userETerm.et, concat(eTerm.entity, '~', eTerm.curr)) then 1 else 0 end)*1.0 /
    sum(case when MV_CONTAINS(userETerm.et, concat(eTerm.entity, '~', eTerm.curr)) then 1 else 0 end)) stickiness
from eTerm join userETerm on  1=1
group by 1, 2
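
Read as a loop, the cross-join query above does roughly the following. This is a plain-Python sketch for illustration only, assuming rows of `(month, user, entity)`; the names mirror the CTEs, a Python set stands in for the aggregated array, and the sample rows are made up.

```python
from collections import defaultdict


def prev_month(yyyymm: str) -> str:
    index = int(yyyymm[:4]) * 12 + int(yyyymm[4:]) - 2
    return f"{index // 12:04d}{index % 12 + 1:02d}"


def stickiness_cross_join(hits):
    user_eterm = defaultdict(set)  # user -> {"entity~yyyyMM", ...}
    e_term = set()                 # distinct (entity, curr, prv)
    for month, user, entity in hits:
        user_eterm[user].add(f"{entity}~{month}")
        e_term.add((entity, month, prev_month(month)))

    result = {}
    for entity, curr, prv in e_term:
        in_curr = in_both = 0
        # The "join ... on 1=1": every (entity, month) row meets every user row.
        for et in user_eterm.values():
            if f"{entity}~{curr}" in et:
                in_curr += 1
                if f"{entity}~{prv}" in et:
                    in_both += 1
        # in_curr is at least 1 because e_term is derived from the same hits.
        result[(entity, curr)] = in_both / in_curr
    return result


# Made-up rows, purely for illustration.
demo = [
    ("202301", "u1", "abc"), ("202301", "u2", "abc"),
    ("202302", "u1", "abc"), ("202302", "u3", "abc"),
]
print(stickiness_cross_join(demo))
```

The work grows with (number of entity-month rows) x (number of users), which is the cost the cartesian join pays in exchange for avoiding the large distinct subquery.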

We are running v24.0.2. Looks like "unnest" is not supported in this version; the sample query below threw an exception.
Query: select unnest(array['abc', 'pqr'])
Context:
{
  "enableUnnest": true
}

Result: 
Error: SQL parse failed
Encountered "unnest" at line 1, column 8

Let me try to adapt your suggestion without "unnest". 

Many thanks for all your suggestions and tips.

Thanks,
AR.

John Kowtko

May 4, 2023, 8:30:15 AM
to Druid User
Try

select * from unnest(array['abc', 'pqr'])

The syntax is a bit awkward ... the unnest() expression acts like a table so cannot be directly in the select list, I put it in the FROM clause and treat it like a subquery.  

You can also give it table and column aliases, e.g:

select foo.bar from unnest(array['abc', 'pqr']) as foo(bar)

The cross join syntax supported appears to be only the old style SQL join syntax (i.e. comma separated tables), e.g.:

select foo.bar, w.* from wikipedia w, unnest(array['abc', 'pqr']) as foo(bar)

Also worth mentioning: in your query above you can try the "CROSS JOIN" syntax and leave out the 1=1 predicate ... that should work (even if it isn't documented):

... from eTerm CROSS join userETerm

So did you try my version of the query?  I have it down to two aggregates and one unnest() operation ... I know unnest() is basically like a cartesian join, but I am curious to see how the performance compares to an explicit join ... and it doesn't need the "select distinct months" CTE that you had to explicitly use.

-- John

AR

May 4, 2023, 11:39:20 PM
to Druid User
Hi John,
Looks like "unnest" is not supported in v24.0.2.

Query: select * from unnest(array['abc', 'pqr'])
Query Context:
{
  "enableUnnest": true
}
Result:
Error: SQL query is unsupported
Query not supported. Please check Broker logs for additional details. SQL was: select * from unnest(array['abc', 'pqr'])
org.apache.calcite.plan.RelOptPlanner$CannotPlanException

Thanks,
AR.

John Kowtko

May 9, 2023, 8:58:21 AM
to Druid User
Hi AR,

So I tried to get this working with MVs instead of Arrays. An MV is implicitly unnested if it is included in the GROUP BY clause ... however it doesn't appear to unnest the MV until the actual group by is performed, so the mv_contains() check always appears to evaluate to false ... I assume it is trying to check if the entire curr_month_arr MV is within next_month_arr, which it never is.  I tried a variation of this by unnesting curr_month_arr in an intermediate CTE, but no luck: it combines it back into the MV for the final select.

Here is where I'm at with this, if you want to play around with it.  It does require MSQ SQL, the native SQL engine throws an error.  

with tmp_hits as (
 select entity,
        user,
        string_to_mv(string_agg( distinct TIME_FORMAT(__time, 'yyyyMM'),','),',') curr_month_arr,
        string_to_mv(string_agg( distinct TIME_FORMAT(TIMESTAMPADD(month, 1, __time), 'yyyyMM'),','),',') next_month_arr
   from hits_monthly
  where __time between time_parse('20221001', 'yyyyMMdd') and time_parse('20230401', 'yyyyMMdd')
  group by entity, user
)
select entity, curr_month_arr, sum(case when mv_contains(next_month_arr, curr_month_arr) then 1 else 0 end)*1.0/count(*)
  from tmp_hits
 group by entity, curr_month_arr

My recommendation though is to move to the latest version of Druid and use the explicit unnest with Arrays ... that is the direction the product is headed in and I am thinking this may be faster than the original approach that includes the join.

--  John
