Aggregating the sum of distinct values


Tom Bonner

Oct 28, 2021, 9:47:27 AM
to Druid User
Hi all,

Druid has been a really amazing tool, and its performance has been a
game-changer for a project we've been working on. My colleagues and I
have, however, been working on something that, as always, seemed easy
and then got messy, so I hoped to ask the community for some help.

Our problem is aggregating over a set of rows to get the sum of a
group's values when each n-dimensional key within the group maps to a
single value, and to return null for the group when any key maps to
several.

So with test data like this:

```
{ "id": "1", "city": "london", "name": "FOO", "val": 3, "group": "a" }
{ "id": "2", "city": "london", "name": "FOO", "val": 3, "group": "a" }
{ "id": "3", "city": "london", "name": "FOO", "val": 3, "group": "a" }
{ "id": "4", "city": "london", "name": "BAR", "val": 4, "group": "a" }
{ "id": "5", "city": "london", "name": "BAR", "val": 4, "group": "a" }
{ "id": "6", "city": "london", "name": "BAR", "val": 4, "group": "a" }
{ "id": "7", "city": "london", "name": "BAZ", "val": 5, "group": "a" }
{ "id": "8", "city": "london", "name": "BAZ", "val": 5, "group": "a" }
{ "id": "9", "city": "london", "name": "BAZ", "val": 5, "group": "a" }

{ "id": "10", "city": "london", "name": "FOO", "val": 1, "group": "b" }
{ "id": "11", "city": "london", "name": "FOO", "val": 2, "group": "b" }
{ "id": "12", "city": "london", "name": "FOO", "val": 3, "group": "b" }
{ "id": "13", "city": "london", "name": "BAR", "val": 4, "group": "b" }
{ "id": "14", "city": "london", "name": "BAR", "val": 4, "group": "b" }
{ "id": "15", "city": "london", "name": "BAR", "val": 4, "group": "b" }
{ "id": "16", "city": "london", "name": "BAZ", "val": 5, "group": "b" }
{ "id": "17", "city": "london", "name": "BAZ", "val": 5, "group": "b" }
{ "id": "18", "city": "london", "name": "BAZ", "val": 5, "group": "b" }
```

Grouping by "group", with "name" (and "city") as the key and "val" as
the value, the expected result would be:

```
{ "group": "a", "sum": "12" }
{ "group": "b", "sum": "null" }
```

In group "a", every key has a unique pairing between "name" and "val",
so the distinct values can be summed: 3 + 4 + 5 = 12. In group "b",
"FOO" pairs with three different values of "val" (1, 2, and 3), so the
whole group returns null.

I've solved this problem three ways, all with downsides.

The first was to write a custom aggregator, which recorded the unique
pairing of a hash of the reference fields ("name", "city", etc.) with
"val" and then finalised by summing. This worked with test data, but
failed when scaled up to MB-sized datasets: the ByteBuffers became
corrupted when buffering. My predecessor wrote this, so I'm unsure
whether there is an issue in their implementation of a ByteBufferMap,
or whether the approach could never have worked. There aren't many
examples of storing large lists in aggregators.
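
For what it's worth, the intent of that aggregator can be sketched in
plain Java. This is only the on-heap fold/finalise logic with names of
my own invention, not a real Druid Aggregator or BufferAggregator
implementation; serialising the map into a ByteBuffer is exactly the
part where ours broke down.

```
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// On-heap sketch of the aggregator's intent: one value per key, a key
// is "poisoned" (set to null) as soon as a second, different value
// arrives, and finalisation sums the surviving values or yields null.
public class UniqueSumSketch {
    // Key: hash of the reference fields (name, city, ...). Value: the
    // single "val" seen for that key, or null once poisoned.
    private final Map<Long, Double> valByKey = new HashMap<>();

    public void fold(long key, double val) {
        if (!valByKey.containsKey(key)) {
            valByKey.put(key, val);
        } else if (!Objects.equals(valByKey.get(key), val)) {
            // Conflicting value for an existing key: poison it.
            valByKey.put(key, null);
        }
    }

    // Sum one value per key; any poisoned key nulls the whole result.
    public Double finalizeSum() {
        double sum = 0.0;
        for (Double v : valByKey.values()) {
            if (v == null) {
                return null;
            }
            sum += v;
        }
        return sum;
    }
}
```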

The second was the following expression aggregator:

```
"aggregations": [
    {
      "type": "expression",
      "name": "stringSumUniqueExpression",
      "fields": [
        "val", "name", "city"
      ],
      "initialValue": "[]",
      "fold": "array_set_add(__acc, array_to_string(array(concat(name,
city), val), ','))",
      "combine": "array_set_add_all(__acc, stringSumUniqueExpression)",
      "finalize": "if (array_length(map(a ->
array_offset(string_to_array(a, ','), 0), o)) ==
array_length(array_set_add_all([], map(a ->
array_offset(string_to_array(a, ','), 0), o))), fold((cur, acc) -> acc
+ cast(array_offset(string_to_array(cur, ','), 1), 'DOUBLE'), o, 0.0),
null",
      "maxSizeBytes": 5120,
    },
  ]
```

The finalize here compares the number of keys with the number of
distinct keys, and only folds the values into a sum when they match,
returning null otherwise. It works, but it again (reasonably enough)
fails to scale in memory and performance.

And finally, my current method, which uses cardinality in a subquery
to measure uniqueness:

```
SELECT inline_data_segments."group",
    -- If a match was found, take it; unmatched groups fall through as null
    ANY_VALUE(B.val) AS val
FROM inline_data_segments
    LEFT JOIN (
        -- Sum the values of the groups whose keys all have a uniqueness of 1
        SELECT "group",
            SUM(sumVal) AS val
        FROM (
                -- Create the key, count the distinct values,
                -- and fetch any single value
                SELECT "group",
                    COUNT(DISTINCT val) AS valUniqueness,
                    ANY_VALUE(val) AS sumVal
                FROM inline_data_segments
                -- Group by the same things as the outer query, plus the key
                GROUP BY "group", CONCAT(name, city)
            )
        GROUP BY "group"
        HAVING MAX(valUniqueness) = 1
    ) AS B ON inline_data_segments."group" = B."group"
GROUP BY inline_data_segments."group"
```

The issue here is the large impact on the shape of the original query.
My plan going forward is to make this request in parallel with the
main request for data from Druid, and then merge the results on our
server afterwards, rather than trying to JOIN it onto the original
request; a rough sketch of that merge follows.
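
For illustration, here's a minimal sketch of that server-side merge,
assuming a hypothetical Row shape and field names standing in for
whatever rows our Druid client actually returns:

```
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal sketch of the planned server-side merge. Row and the
// "val"/"sum" field names are hypothetical stand-ins, not our
// production code.
public class ResultMerger {
    public record Row(String group, Map<String, Object> fields) {}

    public static List<Row> merge(List<Row> mainRows, List<Row> uniqueSumRows) {
        // Index the uniqueness query's results by group for O(1) lookup.
        Map<String, Object> sumByGroup = new HashMap<>();
        for (Row r : uniqueSumRows) {
            sumByGroup.put(r.group(), r.fields().get("val"));
        }
        // Decorate each main-query row; groups with no match get null,
        // mirroring the LEFT JOIN in the single-query version.
        for (Row r : mainRows) {
            r.fields().put("sum", sumByGroup.get(r.group()));
        }
        return mainRows;
    }
}
```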

If anyone has thoughts on a simpler method than the ones I've listed,
please feel free to get in touch. Otherwise, thanks for your time.

Many thanks

Tom @ Derivitec