Re: [storm-user] how to aggregate multiple fields per group in a single persistentAggregate?


Nathan Marz

Feb 20, 2013, 12:57:23 AM
to storm...@googlegroups.com
That design choice was made because you don't want to force users to store lists in the database for every value. I'm open to suggestions on how to tweak the design to better handle the case you described.

On Tue, Feb 19, 2013 at 6:45 PM, Romain Lenglet <lengle...@googlemail.com> wrote:
Hi,

I want to aggregate (sum) multiple fields for each tuple in a group, in a single persistent map.
I had no problem implementing an aggregator that chains CombinerAggregators, i.e. one that applies each aggregator to a single field of each input tuple.
However, I have no way to use it with persistentAggregate().
That method only accepts aggregators that work on a single field.
There is even an explicit check in MapCombinerAggStateUpdater that prevents using it with multiple input fields.
What motivated that arbitrary limitation?
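The chained aggregator described above can be sketched as follows. This is a minimal, self-contained sketch, not Storm's API: FieldCombiner, LongSum and ChainedCombiner are hypothetical names, loosely modeled on the init/combine/zero shape of Trident's CombinerAggregator, and a tuple is represented as a plain List<Object> of field values instead of a TridentTuple. The i-th combiner is applied to the i-th input field, and all partial aggregates travel together as one List.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ChainedCombinerDemo {
    public static void main(String[] args) {
        ChainedCombiner agg = new ChainedCombiner(new LongSum(), new LongSum());
        List<Long> acc = agg.zero();
        // Two input tuples, each carrying (inputField1, inputField2).
        acc = agg.combine(acc, agg.init(Arrays.<Object>asList(3, 10L)));
        acc = agg.combine(acc, agg.init(Arrays.<Object>asList(4, 20L)));
        System.out.println(acc); // prints [7, 30]
    }
}

// Hypothetical single-field combiner (NOT the real Storm interface).
interface FieldCombiner {
    long init(Object fieldValue); // lift one raw field value into an aggregate
    long combine(long a, long b); // merge two partial aggregates
    long zero();                  // identity element for combine
}

// Sums one numeric field, similar in spirit to Trident's built-in Sum.
class LongSum implements FieldCombiner {
    public long init(Object v) { return ((Number) v).longValue(); }
    public long combine(long a, long b) { return a + b; }
    public long zero() { return 0L; }
}

// Applies combiner i to field i and keeps one partial aggregate per field in a List.
class ChainedCombiner {
    private final List<FieldCombiner> combiners;

    ChainedCombiner(FieldCombiner... combiners) {
        this.combiners = Arrays.asList(combiners);
    }

    // One raw value per input field -> one aggregate per output field.
    List<Long> init(List<Object> fieldValues) {
        List<Long> out = new ArrayList<>();
        for (int i = 0; i < combiners.size(); i++) {
            out.add(combiners.get(i).init(fieldValues.get(i)));
        }
        return out;
    }

    // Element-wise merge: field i of a with field i of b, using combiner i.
    List<Long> combine(List<Long> a, List<Long> b) {
        List<Long> out = new ArrayList<>();
        for (int i = 0; i < combiners.size(); i++) {
            out.add(combiners.get(i).combine(a.get(i), b.get(i)));
        }
        return out;
    }

    // One identity element per field.
    List<Long> zero() {
        List<Long> out = new ArrayList<>();
        for (FieldCombiner c : combiners) {
            out.add(c.zero());
        }
        return out;
    }
}
```

Because the whole List is a single value, a map state only needs one entry per group key, which is exactly what the single-field check in MapCombinerAggStateUpdater rules out.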

The only way I see to have multiple aggregates for each grouped tuple is to have multiple persistentAggregate(), one per aggregate.
That's very inefficient, since it multiplies the number of keys in my map, and my keys are relatively large (I group on ~30 fields).

Is there an elegant way for me to use a single persistentAggregate() that chains aggregations of multiple fields?
Will I have to implement my own variant of MapCombinerAggStateUpdater?

Thanks in advance!
Regards,
--
Romain Lenglet

--
Twitter: @nathanmarz
http://nathanmarz.com

Romain Lenglet

Feb 22, 2013, 5:59:24 PM
to storm...@googlegroups.com, nat...@nathanmarz.com
Hi Nathan,

I got around that limitation. I implemented a variant of MapCombinerAggStateUpdater that accepts multiple fields as input and output, executes a separate combiner aggregator for each input field, and stores all updated fields as a single list in the map.
Then I use it much as persistentAggregate() does; for example, to sum two fields:

stream
    .groupBy(groupFields)
    .chainedAgg()
        .aggregate(new Fields(inputField1), new Sum(), new Fields(outputField1))
        .aggregate(new Fields(inputField2), new Sum(), new Fields(outputField2))
    .chainEnd()
    // MyMapFactory and MyMapUpdater are the custom classes described above;
    // the map stores one List<Long> of aggregates per group key.
    .partitionPersist(
        new StateSpec(new MyMapFactory<List<Long>>(...)),
        TridentUtils.fieldsUnion(groupFields, new Fields(outputField1, outputField2)),
        new MyMapUpdater(..., new Sum(), new Sum()),
        TridentUtils.fieldsConcat(groupFields, new Fields(outputField1, outputField2)));

That works well. All the fields are aggregated for each group key in a single get-update-put cycle.
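The single get-update-put cycle can be sketched as below. This is an illustrative sketch, not the actual MyMapUpdater: ListBackingMap, InMemoryListMap and MultiFieldMapUpdater are hypothetical names, the store interface is only loosely modeled on the batched multiGet/multiPut shape of Trident's IBackingMap, and per-field combiners are plain LongBinaryOperators. Each group key maps to one List<Long>, field i is merged with combiner i, and a whole batch costs one read and one write.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongBinaryOperator;

public class MultiFieldUpdaterDemo {
    public static void main(String[] args) {
        InMemoryListMap map = new InMemoryListMap();
        MultiFieldMapUpdater updater = new MultiFieldMapUpdater(Long::sum, Long::sum);
        List<List<Object>> keys = List.of(List.of("a", 1), List.of("b", 2));
        updater.updateBatch(map, keys, List.of(List.of(7L, 30L), List.of(1L, 2L)));
        updater.updateBatch(map, keys, List.of(List.of(3L, 5L), List.of(1L, 1L)));
        System.out.println(map.store.get(List.of("a", 1)));            // prints [10, 35]
        System.out.println(map.gets + " gets, " + map.puts + " puts"); // prints 2 gets, 2 puts
    }
}

// Hypothetical batched store: one round trip per multiGet / multiPut call.
interface ListBackingMap {
    List<List<Long>> multiGet(List<List<Object>> keys); // null entry means key absent
    void multiPut(List<List<Object>> keys, List<List<Long>> vals);
}

// In-memory stand-in for a database table; counts round trips for illustration.
class InMemoryListMap implements ListBackingMap {
    final Map<List<Object>, List<Long>> store = new HashMap<>();
    int gets = 0;
    int puts = 0;

    public List<List<Long>> multiGet(List<List<Object>> keys) {
        gets++;
        List<List<Long>> out = new ArrayList<>();
        for (List<Object> k : keys) out.add(store.get(k));
        return out;
    }

    public void multiPut(List<List<Object>> keys, List<List<Long>> vals) {
        puts++;
        for (int i = 0; i < keys.size(); i++) store.put(keys.get(i), vals.get(i));
    }
}

// Merges a batch of per-group partial aggregates into the store with ONE get and ONE put.
class MultiFieldMapUpdater {
    private final LongBinaryOperator[] combiners;

    MultiFieldMapUpdater(LongBinaryOperator... combiners) { this.combiners = combiners; }

    void updateBatch(ListBackingMap map, List<List<Object>> keys, List<List<Long>> batchAggs) {
        List<List<Long>> current = map.multiGet(keys);
        List<List<Long>> updated = new ArrayList<>();
        for (int k = 0; k < keys.size(); k++) {
            List<Long> stored = current.get(k);
            List<Long> incoming = batchAggs.get(k);
            if (stored == null) { // first time this group key is seen
                updated.add(new ArrayList<>(incoming));
                continue;
            }
            List<Long> merged = new ArrayList<>();
            for (int i = 0; i < combiners.length; i++) {
                merged.add(combiners[i].applyAsLong(stored.get(i), incoming.get(i)));
            }
            updated.add(merged);
        }
        map.multiPut(keys, updated);
    }
}
```

Compared with one persistentAggregate() per field, each (large) group key is stored once, and adding another aggregate only lengthens the value list. The sketch uses List.of, so it assumes Java 9 or later.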

So I'd suggest doing something similar: implementing a variant of MapCombinerAggStateUpdater that stores lists, and a variant of persistentAggregate() that uses it, as in my pseudo-code above.

I think that would be valuable, e.g. for users who store multiple aggregate fields in a single SQL table, one value per column.

Regards,
--
Romain Lenglet