Determining quantiles (across ordered data) using Cascading

753 views
Skip to first unread message

Tim James

unread,
Jul 9, 2012, 9:17:19 PM7/9/12
to cascadi...@googlegroups.com
Howdy,

As part of a larger data flow, we need to assign tuples to quantiles (http://en.wikipedia.org/wiki/Quantile) by a field common to each.  Googling around has not helped yet.  I'm not sure whether Cascading is a good fit for this, though it seems to be good for parts of it (and so far it's great for everything else we've needed in this part of our data world).  Help?  I'll explain further...

We need, for each of a set of "quantile boundaries" (e.g., 0.25, 0.5, 0.75 for quartiles), a corresponding upper-bound data value (what we need).  I'm used to finding this by iterating through the data, ordered by the field in question, counting an index as we go, and taking the field value at each point where the index (count of values past which we've iterated) passes a boundary as a proportion of the total count of values/tuples/records.  So for quartiles, e.g., we take the value at the 250th, 500th, and 750th position in the ordered set of values.  So we need to go through all the items, in order, in the context of the count of items (it's not enough to determine the count by the end as would be sufficient for computing an average).  OR we can use a sample of the data, but we still need to know the size of the sample.

The rub is in the partitioning of a large data set for distributed process (in Cascading/Hadoop/MR): while any partition, if formed as a (random and representative) sample of the data, can be used in isolation to get the result (all partitions should get nearly the same results, at least for the kind of data distributions we see), we'd need to know the partition size.  So, in cascading, while we can use CountBy (and an empty Fields) to count the entire data set, and we could use a simple custom Aggregator or Buffer to iterate through the ordered data (yes?), each application thereof is only to a partition of the data (right?) in each mapper/reducer, but the only way I see to get the partition size (as opposed to the size of the whole data set) is to iterate (either through a single Buffer.operate or many Aggregator.aggregate calls, since only thereby can I keep state (yes?)) through the data in the partition.  It seems I need to iterate through any given partition twice: once to count, then again to emit value-at-quantile-boundary tuples.  But I don't know of any way to guarantee that the partitions won't be rehash/reshuffled/resized, or to "replay" a partition or "reset the iterator".  

In general, Cascading has precipitated elegant solutions to our problems (the way AggregateBy allows for AverageBy), and this one isn't falling out elegantly.  We are confused.  What are people doing to handle what I imagine is a really common need here?  What's the right way to see this in Cascading?

BTW: I know I can get it to work in local mode for small data sets because there's only one partition, but I don't want to find I've done it wrong when the data set is real and large.

Thank you for considering this.

Tim

Philippe Laflamme

unread,
Jul 9, 2012, 11:43:18 PM7/9/12
to cascadi...@googlegroups.com
Hi,

Not sure how great a solution this is, but the idea would be to reduce the problem by iteratively binning your values until you reach your quantiles.

First, assign a group number to each value. This simply allows splitting your dataset into groups to allow distributed processing. For example, insert an integer value to your tuples that cycles from 1 to 5 (there's probably a more efficient way to do this).

Second, you need to compute min and max (unless you already know them), so you GroupBy on this new bogus value and spit out min,max tuples. Then, use an Each to compute the actual min and max values.

Third, you split the range (min,max) into bins. Then, you GroupBy your grouping field again, but this time counting the frequency of numbers in each bin. Tuples look like so: group_number,upper_bound,frequency. You GroupBy "upper_bound" and sum frequencies to get the actual frequency of each bin.

This gives you a list of grouped values that you can iterate over just like you'd do with the actual values (summing frequencies instead of counting). At this point you need to see all bins, ordered by their upper bound value. So reducing the number of bins will help, but will require that you iterate more times over the values.

Once you've identified the bins you're interested in, you repeat the process using the new min-max ranges of each "interesting" bin to create your new bins. You repeat until you get bins with a frequency of 1 in the ones you're interested in.

This is an iterative algorithm, so you have to create Flows until you're done. Not sure how good an idea that is... Probably the experts on the list will provide an easier, more efficient way.

Hope that helps,
Philippe


Tim

--
You received this message because you are subscribed to the Google Groups "cascading-user" group.
To view this discussion on the web visit https://groups.google.com/d/msg/cascading-user/-/m4dZHWEPdnkJ.
To post to this group, send email to cascadi...@googlegroups.com.
To unsubscribe from this group, send email to cascading-use...@googlegroups.com.
For more options, visit this group at http://groups.google.com/group/cascading-user?hl=en.

Ken Krugler

unread,
Jul 9, 2012, 11:43:26 PM7/9/12
to cascadi...@googlegroups.com
Hi Tim,

Back in May the issue of computing a standard deviation came up.

Ted Dunning pointed to some code in Mahout that does an on-line computation for a number of statistics, which includes quartiles.

I did a quick port of some of that code to a Cascading Aggregator, and posted a link.

See below for details - Ted also responded with how he'd handle making this more efficient via AggregateBy, so you should find that thread on the list.

-- Ken

Some of squares is a poor way to compute standard deviation.

See http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#On-line_algorithm for an explanation of Welford's method for doing this.  This method is available in the Apache Mahout class OnlineSummarizer:


The issue is that you may very easily find yourself subtracting large numbers (squared).  This gives very poor accuracy and can even lose all significant bits of the answer including the sign bit.

FWIW, here's a StdDeviation Aggregator that uses code from OnlineSummarizer.


Some caveats, of course...

- It's not an AggregateBy, just a regular Aggregator.

Not sure how you'd merge in the output of two map-side results that had switched over to incremental mode.

And if you didn't have 100 results map-side, I assume you'd flush that work and have to re-process in the reducer.

But Ted would know better.

 - I'm only calculating std deviation

You could easily change it to be a base class Statistics class that other classes are based on: StdDeviation, Mean, Median, Quartiles, etc.

--------------------------
Ken Krugler
custom big data solutions & training
Hadoop, Cascading, Mahout & Solr




Ted Dunning

unread,
Jul 10, 2012, 2:07:29 AM7/10/12
to cascadi...@googlegroups.com, cascadi...@googlegroups.com


Sent from my iPhone

On Jul 9, 2012, at 8:43 PM, Ken Krugler <kkrugle...@transpac.com> wrote:

>
>
> Back in May the issue of computing a standard deviation came up.
>
> Ted Dunning pointed to some code in Mahout that does an on-line computation for a number of statistics, which includes quartiles.
>

This is probably still the right answer.

The problem of partitioning each maps inputs into quartiles independently is generally insoluble if you really want to partition on global quartiles.

If you don't mind an early pass over some or all of the data then you can estimate the quartiles on a sample and the use those to partition the data.

The mahout code will only work on data in one place. If you want to do better you can do the equivalent of a single pass of one dimensional k-means to get a small surrogate of the distribution of your values. These surrogate distributions could be pretty accurately reconstruct the global quartiles. There is no code available for this that I know about.

You could take this one step further and do online partitioning of your data into many partitions per mapper. These partitions could be arranged into a global approximate partition in many pieces. This last approach would be subject to error with pathological data such as presorted data. There is also no code to do this now.

My own suggestion is to randomly downsample to 10k values per mapper and then compute the quartiles in the reducer. Then I would use a second pass to partition the data.

Ted Dunning

unread,
Jul 10, 2012, 2:08:58 AM7/10/12
to cascadi...@googlegroups.com, cascadi...@googlegroups.com
Iteration is evil with hadoop.

This can be done in two passes.

Sent from my iPhone

Tim James

unread,
Jul 10, 2012, 6:41:18 PM7/10/12
to cascadi...@googlegroups.com
Thank you Ken, Ted, Philippe.  Very helpful!  We'll be looking at all of this and I'll let you know what we come up with.  

I've looked now at the code in
and we may be able to modify it for our needs (quartiles are insufficient), or we may end up with some simpler sampling approach.

Tim

Bertrand Dechoux

unread,
Jul 11, 2012, 5:19:31 AM7/11/12
to cascading-user
Actually, simply using MapReduce, one can compute percentiles with a
single MapReduce.

Let's say we want to have the 50th percentile of download par day for
a list of product.
Each product is identified by its productId. And we have a file with
entries such as (productId,nDayDownloads).
0) trick : emit the count as a unreachable nDayDownloads, exemple : -1
1) combine in order to reduce the size : you basically want to get the
data in the form of a histogram (ie not a suite of nDayDownloads but a
mapping of how many times nDayDownloads there was for a product).
1) group by productI
2) secondary sort by nDayDownloads
3) With the first value(s) of each group, you can know the size of the
group
4) you then iterate (only once) and pick the percentile where you want

I did in plain Java and would be interested to do it in Cascading, but
I haven't had the time so far.
Caveat : it is indeed a huge sort. Reducing the histogram width (eg :
well.. finally, I am only interested in thousands of downloads per
day, you can truncate the number) or sampling are way to improve
performance but you will lose in precision. I would like to say that
the precision loss does not matter but it depends why your are looking
for percentile ie for contractual reasons, a estimated percentile
might be irrelevant.

Bertrand

On 11 juil, 00:41, Tim James <tja...@change.org> wrote:
> Thank you Ken, Ted, Philippe.  Very helpful!  We'll be looking at all of
> this and I'll let you know what we come up with.
>
> I've looked now at the code inhttps://github.com/bixolabs/cascading.utils/blob/master/src/main/java...
> and we may be able to modify it for our needs (quartiles are insufficient),
> or we may end up with some simpler sampling approach.
>
> Tim
>
>
>
>
>
>
>
> On Monday, July 9, 2012 11:07:29 PM UTC-7, Ted Dunning wrote:
>
> > Sent from my iPhone
>
> > On Jul 9, 2012, at 8:43 PM, Ken Krugler <kkrugler_li...@transpac.com>

Ted Dunning

unread,
Jul 11, 2012, 10:24:30 AM7/11/12
to cascadi...@googlegroups.com, cascading-user
Yes. You can find the quartiles in a single pass and there are definitely more efficient ways than sorting all the data. The original problem was about binning the data in a single pass. That is only possible approximately.

It is possible to make the second pass cheaper but I think it is impossible to eliminate it.

Sent from my iPhone
> --
> You received this message because you are subscribed to the Google Groups "cascading-user" group.

Philippe Laflamme

unread,
Jul 11, 2012, 11:00:08 AM7/11/12
to cascadi...@googlegroups.com
Yes. You can find the quartiles in a single pass and there are definitely more efficient ways than sorting all the data. The original problem was about binning the data in a single pass. That is only possible approximately.

Just so I understand clearly. Finding quartiles in a single pass without sorting data will provide a good approximation, but not exact values, right? Which is probably sufficient for most applications, but I just want to make sure I'm not missing something.

It is possible to make the second pass cheaper but I think it is impossible to eliminate it.

If you are referring to the binning method, indeed, the second pass is probably unavoidable. That said, if you use 1000 bins and you're interested in 3 of those bins (for quartiles) in each iteration, then subsequent passes require n / 1000 * 3 values (where n is the number of values in the previous pass). It reduces the data pretty quickly, but maybe that's not fast enough for most applications. I don't know, I don't deal with petabytes of data :( 

Cheers,
Philippe

Ted Dunning

unread,
Jul 11, 2012, 11:42:42 AM7/11/12
to cascadi...@googlegroups.com
On Wed, Jul 11, 2012 at 8:00 AM, Philippe Laflamme <philippe...@gmail.com> wrote:
Yes. You can find the quartiles in a single pass and there are definitely more efficient ways than sorting all the data. The original problem was about binning the data in a single pass. That is only possible approximately.

Just so I understand clearly. Finding quartiles in a single pass without sorting data will provide a good approximation, but not exact values, right? Which is probably sufficient for most applications, but I just want to make sure I'm not missing something.

There are two questions here.  One is what are the quantiles of the underlying distribution from which the current data are drawn.  The other question is what the quantiles of the observed data is.

Online methods presented with data in randomized order can approximate the first with accuracy as good as is possible.  Online methods will only approximate the second measure.

It should be kept in mind that the exact quantiles of observed data are only an approximation of the quantiles of the underlying distribution.

You are correct that there are cases where the exact quantiles of the observed data are important and online algorithms don't give you that.
 

It is possible to make the second pass cheaper but I think it is impossible to eliminate it.

If you are referring to the binning method, indeed, the second pass is probably unavoidable. That said, if you use 1000 bins and you're interested in 3 of those bins (for quartiles) in each iteration, then subsequent passes require n / 1000 * 3 values (where n is the number of values in the previous pass). It reduces the data pretty quickly, but maybe that's not fast enough for most applications. I don't know, I don't deal with petabytes of data :( 

Yes.  This is what I meant.  This is good enough for most applications.  Also, since you know how many elements you can do a linear scan to separate out the items as desired.

Paul Baclace

unread,
Jul 23, 2012, 7:44:25 PM7/23/12
to cascadi...@googlegroups.com
I had to calculate the median last year and using
org.apache.mahout.math.stats.OnlineSummarizer and it worked fine
wrapped as a custom Aggregator, just like Ken suggested. Although this
requires a single reducer, it was okay for my case of hundreds of
streams with between 2 and 10^5 values.

The approximation algorithm used for quantile is a simplified variant of
that described in:

http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.105.1580

This paper describes a way to merge the online state from many
partitions (sec. 2.4), so it should be possible to alter the
OnlineSummarizer to emit its intermediate state and then combine those.
(Perhaps a Mahout feature request is needed.)

For similar reasons, algorithms in R need parallel variants, and that is
why "Parallel R" implementations mostly help people use R in their
mappers or reducers, and do not (yet) offer a complete R that
transparently works in parallel.


Paul

Ted Dunning

unread,
Jul 30, 2012, 2:36:39 PM7/30/12
to cascadi...@googlegroups.com
Yeah... you have to watch out for ordered data with that algorithm, but it generally does well.  I haven't seen much pull for the parallelized version.

--
You received this message because you are subscribed to the Google Groups "cascading-user" group.
To post to this group, send email to cascading-user@googlegroups.com.
To unsubscribe from this group, send email to cascading-user+unsubscribe@googlegroups.com.
Reply all
Reply to author
Forward
0 new messages