dealing with heavily skewed data sets


Ben Linsay

Apr 14, 2012, 2:13:04 AM
to cascadi...@googlegroups.com
Hi all,

What's the standard approach for dealing with massively skewed data sets in Cascading? I'm currently dealing with a situation where the same reduce task is throwing an OutOfMemoryError during tuple deserialization (stack trace below), which I assume is caused by an extremely large group of tuples with the same key. I have a trap on this branch of my stream, but it doesn't seem to trigger.

I can afford to throw this group away as an outlier, so I'm going to try using a FirstN aggregator to limit the size of each reduce group and discard tuples in excess of what's "reasonable". This should work, but it feels rather kludgy. I'd love a more elegant solution.
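A FirstN-style cap only saves memory if the group's values are consumed as a stream and nothing before the cap holds the whole group. The plain-Java sketch below illustrates that idea; `GroupCap` and `firstN` are hypothetical names, not Cascading's FirstN aggregator or its API. It keeps at most `limit` values per group and counts the rest as dropped.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of a "first N values per group" cap in plain Java.
// Not Cascading's FirstN API: it only shows that consuming the group as a
// stream keeps memory bounded by `limit`, however large the group is.
final class GroupCap {

    /** Values kept from one group, plus how many were discarded. */
    static final class Capped<T> {
        final List<T> kept;
        final long dropped;

        Capped(List<T> kept, long dropped) {
            this.kept = kept;
            this.dropped = dropped;
        }
    }

    static <T> Capped<T> firstN(Iterator<T> values, int limit) {
        if (limit < 0) {
            throw new IllegalArgumentException("limit must be >= 0");
        }
        List<T> kept = new ArrayList<>(Math.min(limit, 1024));
        long dropped = 0;
        while (values.hasNext()) {
            T value = values.next();
            if (kept.size() < limit) {
                kept.add(value);   // still under the cap: keep it
            } else {
                dropped++;         // over the cap: read it, count it, discard it
            }
        }
        return new Capped<>(kept, dropped);
    }

    public static void main(String[] args) {
        Capped<Integer> c = firstN(List.of(1, 2, 3, 4, 5, 6, 7).iterator(), 3);
        System.out.println(c.kept + " dropped=" + c.dropped); // [1, 2, 3] dropped=4
    }
}
```

If the goal is to discard an outlier group entirely rather than truncate it, the caller could simply ignore any result whose `dropped` count is non-zero.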

Relevant notes: this may not be a software problem, and I may just need more hardware. The data set is about 1 TB, and I'm running on an EMR cluster of m1.xlarge boxes configured with Amazon's memory-intensive bootstrap action. I'm considering upgrading to cc1.4xlarge boxes.

--- stack trace ---

cascading.flow.FlowException: internal error during reducer execution
        at cascading.flow.hadoop.FlowReducer.reduce(FlowReducer.java:122)
        at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:527)
...
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.lang.Long.valueOf(Long.java:557)
        at cascading.tuple.hadoop.HadoopTupleInputStream.readType(HadoopTupleInputStream.java:93)
        at cascading.tuple.hadoop.HadoopTupleInputStream.getNextElement(HadoopTupleInputStream.java:52)
        at cascading.tuple.hadoop.TupleElementComparator.compare(TupleElementComparator.java:74)
        at cascading.tuple.hadoop.TupleElementComparator.compare(TupleElementComparator.java:32)
        at cascading.tuple.hadoop.DelegatingTupleElementComparator.compare(DelegatingTupleElementComparator.java:73)
        at cascading.tuple.hadoop.DelegatingTupleElementComparator.compare(DelegatingTupleElementComparator.java:33)
        at cascading.tuple.hadoop.DeserializerComparator.compareTuples(DeserializerComparator.java:145)
        at cascading.tuple.hadoop.GroupingSortingComparator.compare(GroupingSortingComparator.java:59)
        at org.apache.hadoop.mapred.Merger$MergeQueue.lessThan(Merger.java:373)
        at org.apache.hadoop.util.PriorityQueue.downHeap(PriorityQueue.java:136)
        at org.apache.hadoop.util.PriorityQueue.adjustTop(PriorityQueue.java:103)
... (many more) ...

Ken Krugler

Apr 14, 2012, 11:05:36 AM
to cascadi...@googlegroups.com
From what I know of the 1.2 code, you can't run out of memory in a GroupBy/CoGroup just because there are lots of tuples in a group.

This assumes that you're not doing anything in a custom Buffer or Aggregator that consumes memory, of course.

So maybe it's a 2.0 issue?

-- Ken

--
You received this message because you are subscribed to the Google Groups "cascading-user" group.
To view this discussion on the web visit https://groups.google.com/d/msg/cascading-user/-/WCZygpwcvhIJ.
To post to this group, send email to cascadi...@googlegroups.com.
To unsubscribe from this group, send email to cascading-use...@googlegroups.com.
For more options, visit this group at http://groups.google.com/group/cascading-user?hl=en.

--------------------------
Ken Krugler
custom big data solutions & training
Hadoop, Cascading, Mahout & Solr




Ben Linsay

Apr 14, 2012, 1:24:41 PM
to cascadi...@googlegroups.com
Unfortunately, I have to concatenate a few fields from each tuple with a particular ID to generate "bags" of data, so I'm definitely using memory in my aggregators. No need to blame Chris. :P

Chris K Wensel

Apr 14, 2012, 1:40:46 PM
to cascadi...@googlegroups.com
Why not group on the ID instead of stuffing all the values into a Tuple, and then use a Buffer to work against them?



Ben Linsay

Apr 14, 2012, 4:09:36 PM
to cascadi...@googlegroups.com
Yeah, I think that's what I'll do. I'm going to serialize my super-nested tuple bags to JSON anyway, so it felt better to write smaller operations and compose them rather than trying to group and encode in one shot.

Thanks for pointing me in the right direction.
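Chris's suggestion combined with Ben's JSON plan can be sketched in plain Java: group on the ID, then have a Buffer-like step walk the group's values once and write each one out immediately, instead of concatenating them all into a single Tuple. `StreamingGroupWriter` and `writeGroupAsJson` are hypothetical names, not Cascading classes, and the JSON escaping is deliberately minimal (quotes and backslashes only).

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of Buffer-style processing: the group's values arrive
// as an iterator and are written out one at a time, so the heap never holds
// the whole group (unlike concatenating every value into one Tuple).
final class StreamingGroupWriter {

    static void writeGroupAsJson(String id, Iterator<String> values, PrintWriter out) {
        out.print("{\"id\":\"" + escape(id) + "\",\"values\":[");
        boolean first = true;
        while (values.hasNext()) {
            if (!first) {
                out.print(',');
            }
            out.print('"');
            out.print(escape(values.next())); // only the current value is in memory
            out.print('"');
            first = false;
        }
        out.print("]}");
        out.flush();
    }

    // Minimal escaping for the sketch: backslashes and double quotes only.
    // Control characters such as newlines are NOT escaped.
    private static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    public static void main(String[] args) {
        StringWriter buffer = new StringWriter();
        writeGroupAsJson("42", List.of("a", "b\"c").iterator(), new PrintWriter(buffer));
        System.out.println(buffer); // {"id":"42","values":["a","b\"c"]}
    }
}
```

The `StringWriter` in `main` is only for demonstration; in a real job the writer would be the task's output, so the encoded group would not accumulate on the heap either.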

Maxime Brugidou

Apr 15, 2012, 6:09:06 PM
to cascadi...@googlegroups.com
I also like the Hive way of dealing with skewed data on the reduce side (for aggregations). They randomly redistribute keys in the first M/R job to do a non-skewed pre-aggregation, and add an extra M/R job for the final aggregation (cf. https://issues.apache.org/jira/browse/HIVE-964).

Is that possible in Cascading 2.0? For heavily skewed data it's totally worth having an extra job.
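The two-job idea Maxime describes can be simulated in memory to show why it helps. In this sketch (hypothetical names, plain Java, not Hive's or Cascading's code), stage 1 groups on `(key, random salt)`, so a hot key's records are spread over up to `buckets` groups, and stage 2 merges the partial sums per original key. It only works for aggregates that can be combined from partial results, such as sums, counts, min and max.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical two-stage ("salted") aggregation, simulated in memory.
// In a real pipeline each stage would be a separate MapReduce job.
final class SaltedSum {

    /** Stage-1 grouping key: the original key plus a random salt. */
    record SaltedKey(String key, int salt) {}

    static Map<String, Long> sumWithSalting(List<Map.Entry<String, Long>> records,
                                            int buckets, Random rng) {
        // Stage 1: a hot key's records are split across up to `buckets` groups,
        // each producing a partial sum.
        Map<SaltedKey, Long> partial = new HashMap<>();
        for (Map.Entry<String, Long> r : records) {
            SaltedKey sk = new SaltedKey(r.getKey(), rng.nextInt(buckets));
            partial.merge(sk, r.getValue(), Long::sum);
        }
        // Stage 2: drop the salt and combine at most `buckets` partials per key.
        Map<String, Long> total = new HashMap<>();
        for (Map.Entry<SaltedKey, Long> p : partial.entrySet()) {
            total.merge(p.getKey().key(), p.getValue(), Long::sum);
        }
        return total;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> records = List.of(
                Map.entry("hot", 1L), Map.entry("hot", 2L), Map.entry("hot", 3L),
                Map.entry("cold", 10L));
        System.out.println(sumWithSalting(records, 4, new Random(7)).get("hot")); // 6
    }
}
```

The final totals do not depend on which salts were drawn; the salt only changes how the work for a hot key is divided among stage-1 groups.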


Chris K Wensel

Apr 15, 2012, 6:58:16 PM
to cascadi...@googlegroups.com
From what I can tell, that feature is for joins, which is something we should probably look into if I can sort out what it's actually doing.

As for aggregations in a GroupBy, see the AggregateBy sub-assembly (and its sub-types). It allows partial aggregation on the map side, which is similar to what you described but in a single MR job.


ckw
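Map-side partial aggregation of the kind AggregateBy provides can be illustrated with a small combiner sketch. This is not Cascading's implementation: `PartialCounter` is hypothetical, and the LRU eviction with a fixed `capacity` is an assumption chosen to keep map-side memory bounded. Counts for a hot key collapse into a few partial results before the shuffle, so its reducer receives a handful of partial counts rather than every raw record.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical map-side partial aggregation (a combiner with a bounded cache).
// Per-key counts live in an LRU map; when it grows past `capacity`, the least
// recently used entry is emitted downstream as a partial count.
final class PartialCounter {
    private final List<Map.Entry<String, Long>> emitted = new ArrayList<>();
    private final LinkedHashMap<String, Long> cache;

    PartialCounter(int capacity) {
        // accessOrder = true: iteration (and eviction) order is least recently used first.
        this.cache = new LinkedHashMap<>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Long> eldest) {
                if (size() > capacity) {
                    emitted.add(Map.entry(eldest.getKey(), eldest.getValue()));
                    return true;
                }
                return false;
            }
        };
    }

    /** Map side: called once per input record. */
    void add(String key) {
        cache.merge(key, 1L, Long::sum);
    }

    /** End of the map task: flush whatever is still cached. */
    List<Map.Entry<String, Long>> finish() {
        for (Map.Entry<String, Long> e : cache.entrySet()) {
            emitted.add(Map.entry(e.getKey(), e.getValue()));
        }
        cache.clear();
        return emitted;
    }

    /** Reduce side: sums the partial counts for each key. */
    static Map<String, Long> reduce(List<Map.Entry<String, Long>> partials) {
        Map<String, Long> totals = new HashMap<>();
        for (Map.Entry<String, Long> p : partials) {
            totals.merge(p.getKey(), p.getValue(), Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        PartialCounter counter = new PartialCounter(2);
        for (String key : List.of("a", "b", "a", "c", "a", "d")) {
            counter.add(key);
        }
        List<Map.Entry<String, Long>> partials = counter.finish();
        System.out.println(partials);                  // [b=1, c=1, a=3, d=1]
        System.out.println(reduce(partials).get("a")); // 3
    }
}
```

Unlike the two-job salting approach, this needs no extra job: the partial sums are produced inside the map task and finished by the normal reducer.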

Maxime Brugidou

Apr 15, 2012, 7:23:08 PM
to cascadi...@googlegroups.com
I totally mixed up the two operations (aggregation and join): the additional M/R jobs are for skewed joins, and map-side pre-aggregation is for skewed aggregations :) Sorry about that.

Robin Kraft

Apr 16, 2012, 5:10:05 PM
to cascadi...@googlegroups.com
Hey Ben,

I just wanted to let you know that you're not alone! I've seen similar heap errors with extremely skewed datasets in Cascalog, when joining a set of 50 tuples with a set of hundreds of thousands to a few million tuples.

In Cascalog, our solution was to create a Clojure map in a separate, very quick map-side job. That map could then be passed into a custom function, where keyword lookups produced the same result as a join. It's much, much faster and more stable. I've never used Cascading directly, but since it works in Cascalog I assume there's an analogue in pure Cascading.

Here's what you'd do in Cascalog (skipping the initial map job that would otherwise create "other-vals").

(def src
  "Our input data"
  [[1 20]
   [2 30]])

(defn join-it
  "Join actually happens here with keyword lookup"
  [id other-vals]
  (other-vals (keyword (str id))))

;; the query
(let [other-vals {:1 10 :2 25}]
  (??<- [?id ?val ?val2]
    (src ?id ?val)
    (join-it ?id other-vals :> ?val2)))

Result:
1 20 10
2 30 25

-Robin

Chris K Wensel

Apr 16, 2012, 5:31:25 PM
to cascadi...@googlegroups.com
You shouldn't be seeing an OOME when doing a CoGroup unless you have "cascading.spill.threshold" set to an unreasonably large number (or you are stuffing nearly unreasonable amounts of data into a single Tuple) and you have ordered your tuple streams incorrectly in the CoGroup (they should go from large to small).
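The spill threshold Chris mentions caps how many values are held on the heap before the rest are written to disk. The sketch below shows that general idea in plain Java; `SpillableList` is hypothetical and is not Cascading's internal spillable collection, and it stores only single-line strings so the on-disk format stays trivial. A very large threshold effectively disables spilling, which is how heap use can again grow with group size.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Stream;

// Hypothetical "spill to disk" list: the first `threshold` values stay on the
// heap and later ones are appended to a temp file, so heap use is bounded by
// the threshold instead of the group size. Values must be single-line strings.
final class SpillableList implements AutoCloseable {
    private final int threshold;
    private final List<String> inMemory = new ArrayList<>();
    private Path spillFile;          // created lazily on the first spill
    private BufferedWriter spillOut;
    private long size;

    SpillableList(int threshold) {
        this.threshold = threshold;
    }

    void add(String value) {
        size++;
        if (inMemory.size() < threshold) {
            inMemory.add(value);
            return;
        }
        try {
            if (spillOut == null) {
                spillFile = Files.createTempFile("spill", ".txt");
                spillOut = Files.newBufferedWriter(spillFile, StandardCharsets.UTF_8);
            }
            spillOut.write(value);
            spillOut.newLine();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    long size() {
        return size;
    }

    boolean hasSpilled() {
        return spillFile != null;
    }

    /** Visits every value in insertion order: heap part first, then the spill file. */
    void forEach(Consumer<String> action) {
        inMemory.forEach(action);
        if (spillFile == null) {
            return;
        }
        try {
            spillOut.flush();
            try (Stream<String> lines = Files.lines(spillFile, StandardCharsets.UTF_8)) {
                lines.forEach(action);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public void close() {
        try {
            if (spillOut != null) {
                spillOut.close();
            }
            if (spillFile != null) {
                Files.deleteIfExists(spillFile);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        try (SpillableList list = new SpillableList(2)) {
            for (String v : List.of("a", "b", "c", "d")) {
                list.add(v);
            }
            System.out.println("spilled=" + list.hasSpilled()); // spilled=true
            list.forEach(System.out::println);                   // a, b, c, d on separate lines
        }
    }
}
```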

Also, Join (likely to be renamed HashJoin) is an effective map-side join and, in the case of an inner join, an effective filter. It is the best practice when one side fits in memory (assuming you order your tuple streams from large to small, as in CoGroup) and you won't be performing an aggregation afterwards.
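A map-side hash join of the kind Chris describes (and the lookup Robin does in Cascalog) can be sketched in plain Java. The names are hypothetical, not Cascading's HashJoin API. The small side is loaded into a `HashMap` once and the large side is streamed past it; large-side rows without a match are dropped, which is why an inner hash join doubles as a filter. The sample data reuses Robin's values plus a made-up unmatched id 3.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical map-side (hash) inner join: build an in-memory index of the
// small side once, then stream the large side past it.
final class HashJoinSketch {

    record Row(long id, String value) {}

    static void innerJoin(Iterable<Row> large, List<Row> small, BiConsumer<Row, Row> emit) {
        // Build side: must fit in memory (the "small" stream).
        Map<Long, List<Row>> index = new HashMap<>();
        for (Row s : small) {
            index.computeIfAbsent(s.id(), k -> new ArrayList<>()).add(s);
        }
        // Probe side: read one row at a time, never held in memory as a whole.
        for (Row l : large) {
            List<Row> matches = index.get(l.id());
            if (matches == null) {
                continue; // no match: an inner join drops the row, i.e. filters it
            }
            for (Row s : matches) {
                emit.accept(l, s);
            }
        }
    }

    public static void main(String[] args) {
        List<Row> large = List.of(new Row(1, "20"), new Row(2, "30"), new Row(3, "40"));
        List<Row> small = List.of(new Row(1, "10"), new Row(2, "25"));
        innerJoin(large, small,
                (l, s) -> System.out.println(l.id() + " " + l.value() + " " + s.value()));
        // prints "1 20 10" then "2 30 25"; id 3 has no match and is filtered out
    }
}
```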

ckw


Chris K Wensel

Apr 16, 2012, 5:34:40 PM
to cascadi...@googlegroups.com
To be clearer: CoGroup and Join work best when you order your tuple streams from large to small.

In the case of CoGroup, large and small refer to the number of values per unique key.

In the case of Join, large and small mean doesn't fit and does fit in memory, respectively.
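Why stream order matters for CoGroup can be shown with a per-key join sketch. It assumes, consistent with the large-to-small advice above, that the first stream is iterated lazily while the later stream's values for the current key are buffered (and would spill past the threshold). `CoGroupOrderSketch` is a hypothetical plain-Java illustration, not Cascading's internals.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical per-key inner join, mimicking one reduce group of a CoGroup.
// The first (left) stream is consumed lazily; the second (right) stream's
// values for this key are buffered, so memory grows with the right side only.
final class CoGroupOrderSketch {

    /** Joins one key's values and returns how many right-side values were buffered. */
    static <L, R> int joinGroup(Iterator<L> left, Iterator<R> right, BiConsumer<L, R> emit) {
        List<R> buffered = new ArrayList<>();
        right.forEachRemaining(buffered::add); // held in memory for this key
        while (left.hasNext()) {
            L l = left.next();                 // streamed one value at a time
            for (R r : buffered) {
                emit.accept(l, r);
            }
        }
        return buffered.size();
    }

    public static void main(String[] args) {
        List<Integer> many = new ArrayList<>();
        for (int i = 0; i < 1_000; i++) {
            many.add(i);
        }
        List<String> few = List.of("x", "y");
        int largeToSmall = joinGroup(many.iterator(), few.iterator(), (l, r) -> { });
        int smallToLarge = joinGroup(few.iterator(), many.iterator(), (l, r) -> { });
        System.out.println("buffered, large to small: " + largeToSmall); // 2
        System.out.println("buffered, small to large: " + smallToLarge); // 1000
    }
}
```

Both orders produce the same joined pairs; only the number of values that must sit in memory for the key changes.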

ckw

Oscar Boykin

Apr 16, 2012, 5:42:50 PM
to cascadi...@googlegroups.com
We implemented a replicated join on top of CoGroup for Scalding:

https://github.com/twitter/scalding/blob/master/src/main/scala/com/twitter/scalding/JoinAlgorithms.scala#L161

It helps with skewed data.

Right now, the replication factor is the same for all keys, but a smarter approach is to replicate only the keys with many values. (This is on our roadmap: do a sampled CoGroup + count, compute the replication factor from the count, then Join the replication factors to the non-sampled streams and apply the block join algorithm.)
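The replicated (block) join Oscar describes can be simulated in memory. In this simplified, hypothetical sketch (not Scalding's JoinAlgorithms code, which may replicate both sides), each row of the skewed side is sent to one random block in `[0, replication)`, while each row of the small side is copied into every block. Joining on `(key, block)` still pairs every left row with every matching right row exactly once, but a hot key's left rows are split over `replication` groups, at the cost of copying the small side `replication` times.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical, simplified replicated ("block") join, simulated in memory.
// Skewed side: each row goes to ONE random block. Small side: each row is
// copied to EVERY block. Grouping on (key, block) splits hot keys while still
// pairing every matching left/right row exactly once.
final class BlockJoinSketch {

    record Row(String key, String value) {}

    record BlockKey(String key, int block) {}

    static List<String> blockJoin(List<Row> skewed, List<Row> small, int replication, Random rng) {
        Map<BlockKey, List<Row>> leftGroups = new HashMap<>();
        for (Row r : skewed) {
            BlockKey k = new BlockKey(r.key(), rng.nextInt(replication));
            leftGroups.computeIfAbsent(k, x -> new ArrayList<>()).add(r);
        }
        Map<BlockKey, List<Row>> rightGroups = new HashMap<>();
        for (Row r : small) {
            for (int b = 0; b < replication; b++) {
                rightGroups.computeIfAbsent(new BlockKey(r.key(), b), x -> new ArrayList<>()).add(r);
            }
        }
        // One "reducer" per (key, block) group: an ordinary nested-loop join.
        List<String> out = new ArrayList<>();
        for (Map.Entry<BlockKey, List<Row>> e : leftGroups.entrySet()) {
            List<Row> rights = rightGroups.getOrDefault(e.getKey(), List.of());
            for (Row l : e.getValue()) {
                for (Row r : rights) {
                    out.add(l.value() + "|" + r.value());
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> skewed = new ArrayList<>();
        for (int i = 0; i < 6; i++) {
            skewed.add(new Row("hot", "l" + i));
        }
        List<Row> small = List.of(new Row("hot", "r0"), new Row("hot", "r1"));
        List<String> joined = blockJoin(skewed, small, 3, new Random(42));
        System.out.println(joined.size()); // 12, same as an unreplicated join
    }
}
```

The per-key replication on Oscar's roadmap would, under this sketch's assumptions, replace the constant `replication` with a value looked up per key from sampled counts.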

--
Oscar Boykin :: @posco :: https://twitter.com/intent/user?screen_name=posco

> > > > > --
> > > > > Chris K Wensel

> > > > > ch...@concurrentinc.com (mailto:ch...@concurrentinc.com)
> > > > > http://concurrentinc.com (http://concurrentinc.com/)

