Druid reindex not rolling up data as expected

180 views
Skip to first unread message

Dejan Zegarac

unread,
Feb 26, 2024, 5:50:24 AM2/26/24
to Druid User
Hello

I have example data below

Screenshot 2024-02-26 114056.png

If I do the compaction per following spec (it's in PHP, but it should be pretty easy to understand)

$task = $client->prepareReindex('rtest')->then(
function (IndexTaskBuilder $builder) {
    return $builder->interval('2024-02-25T00:00:00.000Z/2024-04-26T00:00:00.000Z')
    ->segmentGranularity(Granularity::DAY)
    ->queryGranularity(Granularity::HOUR)
    ->rollup()
    ->transform(function (TransformBuilder $builder) {
        $builder->transform("concat('EXPIRED', '')", 'destination');
    })
    ->execute();
});

So basically I execute a reindex on granularity DAY and query granularity HOUR. I enable rollup and transform destination as EXPIRED. After completed, I get the following

Screenshot 2024-02-26 114235.png

As seen above, it's pretty clear that the first 3 entries should be compacted, for example, as count 3, sum_seizures 50, but for some reason, this is not the case... May I ask why?

Dejan Zegarac

unread,
Feb 26, 2024, 5:58:06 AM2/26/24
to Druid User
It is also worth mentioning that if all entries have sum_seizures 10, so if I have table like this

Screenshot 2024-02-26 115533.png

And if I run compaction on this, I would get result as expected:

Screenshot 2024-02-26 115656.png

From what I see, druid for some reason expects both dimensions and metrics to be equal in order to perform rollup, which is definitely how it should not work...

John Kowtko

unread,
Feb 26, 2024, 7:19:15 AM2/26/24
to Druid User
Hi Dejan,

Rollup should execute the equivalent of a Group By on __time and all Dimension fields.   If it is not doing this then my first suspicion is that you are somehow including the measures as dimension in your rollup operation.

Can you try running this as a native Druid reindex or compaction job, to confirm if you are getting same results as when you submit from PHP?   I do not see a Dimension spec in the PHP command, so is it possible that PHP is just considering all fields in the datasource as dimensions?

Thanks.  John

Dejan Zegarac

unread,
Feb 26, 2024, 8:21:42 AM2/26/24
to Druid User
Hi John,

I assumed Druid to already know, if I don't specify any dimensions or metrics, to automatically understand what are the dimensions, what are metrics and to simply rollup the existing data based on the new queryGranularity... I see no reason for it to require such parameters to be specified in this query as it already has info about the spec of specific dataset...

I updated this example task to the following and now it works :) Not sure if this is the best approach, please let me know if you believe it can be done in a better way. This is now a little inconvenient as I need to specify in the config of my app aggregation methods for all measures and update those accordingly every time I update the ingestion spec.

$task = $client->prepareReindex('rtest')->then(
function (IndexTaskBuilder $builder) {
    return $builder->interval('2024-02-25T00:00:00.000Z/2024-04-26T00:00:00.000Z')
        ->segmentGranularity(Granularity::DAY)
        ->queryGranularity(Granularity::HOUR)
        ->dimension('group')
        ->dimension('routeid')
        ->dimension('destination')
        ->longSum('sum_seizures')
        ->longSum('count')

        ->rollup()
        ->transform(function (TransformBuilder $builder) {
            $builder->transform("concat('EXPIRED', '')", 'destination');
        })
        ->execute();
});

Thank you very much for your reply and help. I truly appreciate it. Without you our transition to druid would be much harder if not impossible :)

Best Regards, Dejan

John Kowtko

unread,
Feb 26, 2024, 8:42:58 AM2/26/24
to Druid User
Hi Dejan,

Yeah, since rollups have "never" had an issue in the 18 months I've been working with the product, the first suspicion in a case like this is that it's not grouping on the right fields.

There is no global schema stored in Druid, each segment is on its own, and it is the union of all of the segment metadata that the Broker sees as the schema.   

I don't know that the segment has an explicit schema ... however when you run a standard compaction job, the first pass of the job is to open up all segments and inspect the metadata.  Then the job outputs a "revised" compaction spec ... that spec now includes the dimensionsSpec. So it must have a way of determining what the dimensions are.

Possibly PHP tries to interpret the schema and does not do it properly?  Or maybe it assumes that if you don't explicitly state the dimensions then it is going to assume that all fields are a dimension, in order to avoid accidental data loss?

If there is doc on the PHP interface I would be interested in hearing if it mentions anything like this.

Thanks.  John

Dejan Zegarac

unread,
Feb 26, 2024, 9:24:08 AM2/26/24
to Druid User
Hi John,

The library we use is a modified version of Level23's Druid Client, which unfortunately doesn't have much documentation in it's interface.. The only modification we performed is making it async.

If i do something simple like

$client->prepareReindex('rtest')->then(
    function (IndexTaskBuilder $builder) {
        $builder = $builder->interval('2024-02-25T00:00:00.000Z/2024-04-26T00:00:00.000Z')
            ->segmentGranularity(Granularity::DAY)
            ->queryGranularity(Granularity::HOUR)
            ->rollup();

        $arr = $builder->toArray();

        echo json_encode($arr, JSON_PRETTY_PRINT) . "\n";
    }
);

This is the query it formats and sends to druid.

{
    "type": "index",
    "spec": {
        "dataSchema": {
            "dataSource": "rtest",
            "timestampSpec": {
                "column": "__time",
                "format": "auto"
            },
            "dimensionsSpec": {
                "dimensions": [],
                "spatialDimensions": []
            },
            "metricsSpec": [],
            "granularitySpec": {
                "type": "uniform",
                "segmentGranularity": "day",
                "queryGranularity": "hour",
                "rollup": true,
                "intervals": [
                    "2024-02-25T00:00:00.000Z\/2024-04-26T00:00:00.000Z"
                ]
            },
            "transformSpec": null
        },
        "ioConfig": {
            "type": "index",
            "inputSource": {
                "type": "druid",
                "dataSource": "rtest",
                "interval": "2024-02-25T00:00:00.000Z\/2024-04-26T00:00:00.000Z"
            },
            "inputFormat": null,
            "appendToExisting": false
        }
    }
}

To me it looks as expected. However the query only modifies the granularity, but never does it actually execute rollup. Result:

no-rollup.png

It is obvious that first few rows should be rolled up but they aren't.

Of course, what I did in my last reply solved it. So doing something like:

$task = $client->prepareReindex('rtest')->then(
    function (IndexTaskBuilder $builder) {
        return $builder->interval('2024-02-25T00:00:00.000Z/2024-04-26T00:00:00.000Z')
            ->segmentGranularity(Granularity::DAY)
            ->queryGranularity(Granularity::HOUR)
            ->dimension('group')
            ->dimension('routeid')
            ->dimension('destination')
            ->longSum('sum_seizures')
            ->longSum('count')
            ->rollup()
            ->transform(function (TransformBuilder $builder) {
                $builder->transform("concat('EXPIRED', '')", 'destination');
            })
            ->execute();
    }
);

Yields the expected result. If I only specify dimensions though, and not specify any metrics (so same as above, but just without longSums), it formats the following query:

{
    "type": "index",
    "spec": {
        "dataSchema": {
            "dataSource": "rtest",
            "timestampSpec": {
                "column": "__time",
                "format": "auto"
            },
            "dimensionsSpec": {
                "dimensions": [
                    {
                        "name": "group",
                        "type": "string"
                    },
                    {
                        "name": "routeid",
                        "type": "string"
                    },
                    {
                        "name": "destination",
                        "type": "string"
                    }
                ],
                "spatialDimensions": []
            },
            "metricsSpec": [],
            "granularitySpec": {
                "type": "uniform",
                "segmentGranularity": "day",
                "queryGranularity": "hour",
                "rollup": true,
                "intervals": [
                    "2024-02-25T00:00:00.000Z\/2024-04-26T00:00:00.000Z"
                ]
            },
            "transformSpec": {
                "transforms": [
                    {
                        "type": "expression",
                        "name": "destination",
                        "expression": "concat('EXPIRED', '')"
                    }
                ]
            }
        },
        "ioConfig": {
            "type": "index",
            "inputSource": {
                "type": "druid",
                "dataSource": "rtest",
                "interval": "2024-02-25T00:00:00.000Z\/2024-04-26T00:00:00.000Z"
            },
            "inputFormat": null,
            "appendToExisting": false
        }
    }
}

Unfortunately, executing this query makes all metrics 'null'.

Please, if you can, let me know if this is all expected behavior, or if not, should I need to change something :)

Thanks, Dejan

Chris Chanyi

unread,
Feb 26, 2024, 9:32:31 AM2/26/24
to druid...@googlegroups.com
I don’t know that library either, but you can see it is running an index job, not a compact job. A compact job will actually inspect the druid segments before compacting. If you leave the dimension, metric, and granularity, it will build them from the existing segments first before submitting the actual compaction tasks.

If you look at the docs for that library they do offer a compact task:

// Build our compact task.
$taskId = $client->compact('wikipedia')
    ->interval('2015-09-12T00:00:00.000Z/2015-09-13T00:00:00.000Z ')
    ->segmentGranularity(Granularity::DAY) 
    ->execute([ 'skipIntervalValidation' => true ]); // Ignore interval validation. 

--
You received this message because you are subscribed to the Google Groups "Druid User" group.
To unsubscribe from this group and stop receiving emails from it, send an email to druid-user+...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/druid-user/c3d60855-c287-473e-9d6e-a9db330b845fn%40googlegroups.com.
<no-rollup.png>

Dejan Zegarac

unread,
Feb 26, 2024, 9:53:08 AM2/26/24
to Druid User
Hi,

Yes, but I need to set a few columns as 'EXPIRED' before proceeding with compaction and rollup. The reason being is that I have some very very specific dimensions such as some ranges and ML hashes.

Those info should be available only for 24 hours and further those fields must be set as EXPIRED, so I can get rid of very unique dimensions and rollup as much data as possible.

I found out that reindex seems to be the best function for that. Do you believe I can achieve that with compact job somehow? Because I wasn't able to find a way for that via compact.

Regards,
Dejan

John Kowtko

unread,
Feb 26, 2024, 9:58:26 AM2/26/24
to Druid User
If compaction is the only job type that inspects the segments, and reindex does not ...then if you use reindex you can also use MSQ and do this via a SQL REPLACE/OVERWRITE statement, which may be a lot easier.  then it's mainly a Select statement with  Group By, you can do whatever you want for grouping and aggregate items, use SQL expressions and functions to transform the data ... then insert/replace back into the same time interval.     

I don't know of PHP can access that endpoint though ... 


-- John

Chris Chanyi

unread,
Feb 26, 2024, 10:01:00 AM2/26/24
to druid...@googlegroups.com
Depends on the version of Druid. There is a transform spec in compaction now that behaves similar to what I think your original spec shows. I’m not positive what version it is supported with 25 or 28?? 

Note, not sure how big your cluster will be. But, when running a compaction job, the portion that “discovers” the dimension and metric spec (if left empty) is a single process that runs and downloads all the segments to the middle manager and then opens them to get the meta-data. It performs a merge and generates the specs. I find this step is slow as it is a single process. You can speed it up, but breaking up the granularity. But, you may find you have more control just building the specs and including them easier and faster in the end.

Reply all
Reply to author
Forward
0 new messages