What's the standard approach for dealing with massively skewed data sets in Cascading? The same reduce task keeps throwing an OutOfMemoryError during tuple deserialization (stack trace below), which I can only imagine is caused by an extremely large group of tuples with the same key. I have a trap on this branch of my stream, but the error doesn't seem to trigger it.
I'm in a situation where I can throw this group away as an outlier, so I'm going to try using a FirstN aggregator to limit the size of each reduce group and throw away tuples in excess of what's "reasonable". This seems workable, but also rather kludgy. I'd love a more elegant solution.
Relevant notes: this may not be a software problem, and I may just need more hardware. The data set is about 1 TB, and I'm running on an EMR cluster of m1.xlarge boxes configured using Amazon's memory-intensive bootstrap action. I'm considering upgrading to cc1.4xlarge boxes.
--- stack trace ---
cascading.flow.FlowException: internal error during reducer execution
    at cascading.flow.hadoop.FlowReducer.reduce(FlowReducer.java:122)
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:527)
    ...
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.lang.Long.valueOf(Long.java:557)
    at cascading.tuple.hadoop.HadoopTupleInputStream.readType(HadoopTupleInputStream.java:93)
    at cascading.tuple.hadoop.HadoopTupleInputStream.getNextElement(HadoopTupleInputStream.java:52)
    at cascading.tuple.hadoop.TupleElementComparator.compare(TupleElementComparator.java:74)
    at cascading.tuple.hadoop.TupleElementComparator.compare(TupleElementComparator.java:32)
    at cascading.tuple.hadoop.DelegatingTupleElementComparator.compare(DelegatingTupleElementComparator.java:73)
    at cascading.tuple.hadoop.DelegatingTupleElementComparator.compare(DelegatingTupleElementComparator.java:33)
    at cascading.tuple.hadoop.DeserializerComparator.compareTuples(DeserializerComparator.java:145)
    at cascading.tuple.hadoop.GroupingSortingComparator.compare(GroupingSortingComparator.java:59)
    at org.apache.hadoop.mapred.Merger$MergeQueue.lessThan(Merger.java:373)
    at org.apache.hadoop.util.PriorityQueue.downHeap(PriorityQueue.java:136)
    at org.apache.hadoop.util.PriorityQueue.adjustTop(PriorityQueue.java:103)
    ... (many more) ...
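For readers following along, here is a minimal sketch of the cap-per-group idea from the post above, in plain Java with no Cascading classes. `GroupCap` and `firstN` are hypothetical names, and the iterator only stands in for the values of one reduce group. It is an illustration of the technique, not the FirstN aggregator itself.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.IntStream;

// Hypothetical helper, not part of Cascading: keeps at most n values from one group.
final class GroupCap {
    // Reads at most n values from the group's iterator and ignores the rest,
    // so the memory held here is bounded by n rather than by the group size.
    static <T> List<T> firstN(Iterator<T> groupValues, int n) {
        List<T> kept = new ArrayList<>();
        while (kept.size() < n && groupValues.hasNext()) {
            kept.add(groupValues.next());
        }
        return kept;
    }

    public static void main(String[] args) {
        // A skewed group of one million values, produced lazily.
        Iterator<Integer> hugeGroup = IntStream.range(0, 1_000_000).iterator();
        System.out.println(firstN(hugeGroup, 3)); // prints [0, 1, 2]
    }
}
```

The cap only bounds what the operation itself holds on to. Whether it helps with an error raised elsewhere in the reduce task is a separate question.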
> -- > You received this message because you are subscribed to the Google Groups "cascading-user" group. > To view this discussion on the web visit https://groups.google.com/d/msg/cascading-user/-/WCZygpwcvhIJ. > To post to this group, send email to cascading-user@googlegroups.com. > To unsubscribe from this group, send email to cascading-user+unsubscribe@googlegroups.com. > For more options, visit this group at http://groups.google.com/group/cascading-user?hl=en.
-------------------------- Ken Krugler http://www.scaleunlimited.com custom big data solutions & training Hadoop, Cascading, Mahout & Solr
unfortunately I have to concatenate a few fields of each tuple with a particular ID to generate "bags" of data, so I'm definitely using memory in my aggregators. no need to blame chris. :P
On Saturday, April 14, 2012 8:05:36 AM UTC-7, kkrugler wrote:
> From what I know of the 1.2 code, you can't run out of memory in a GroupBy/CoGroup just because there are lots of tuples in a group.
> This assumes that you're not doing anything in a custom Buffer or Aggregator that consumes memory, of course.
> So maybe it's a 2.0 issue?
> -- Ken
> On Apr 13, 2012, at 11:13pm, Ben Linsay wrote:
> hi all,
yeah, i think that's what i'll end up doing. i'm going to end up serializing my super-nested tuple bags to JSON, so it felt better to have smaller operations and compose them instead of trying to group and encode in one shot.
On Saturday, April 14, 2012 10:40:46 AM UTC-7, Chris K Wensel wrote:
> why not group on the id instead of stuffing all the values in a Tuple, then use a Buffer to work against them.
> On Apr 14, 2012, at 10:24 AM, Ben Linsay wrote:
> unfortunately I have to concatenate a few fields of each tuple with a particular ID to generate "bags" of data, so I'm definitely using memory in my aggregators. no need to blame chris. :P
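The difference between collecting a group's values and working against them as they stream past can be sketched in plain Java. This is only an illustration under stated assumptions: `StreamingBag`, `writeBag`, and the JSON layout are hypothetical, the values are assumed to be longs so no string escaping is needed, and the `Consumer<String>` sink stands in for whatever the job writes to. It is not Cascading's Buffer API.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Consumer;

// Hypothetical sketch, not Cascading API: emits one group's "bag" as a JSON
// array while iterating, instead of first collecting the values in memory.
final class StreamingBag {
    // Sends {"id":<id>,"values":[v1,v2,...]} to the sink piece by piece.
    // Returns the number of values written.
    static long writeBag(long id, Iterator<Long> groupValues, Consumer<String> sink) {
        sink.accept("{\"id\":" + id + ",\"values\":[");
        long count = 0;
        while (groupValues.hasNext()) {
            if (count > 0) {
                sink.accept(",");
            }
            sink.accept(Long.toString(groupValues.next()));
            count++;
        }
        sink.accept("]}");
        return count;
    }

    public static void main(String[] args) {
        // A StringBuilder is used only for the demo; it does hold the whole result.
        StringBuilder out = new StringBuilder();
        writeBag(42L, Arrays.asList(7L, 8L, 9L).iterator(), out::append);
        System.out.println(out); // prints {"id":42,"values":[7,8,9]}
    }
}
```

With a sink that writes straight through, the method itself holds one value at a time. The output still grows with the group, so a very large group may still need a cap.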
I also like the Hive way of dealing with skewed data on the reduce side (for aggregations). They shuffle keys in the first M/R job to do a non-skewed pre-aggregation and they add an extra M/R job for the final aggregation. (cf https://issues.apache.org/jira/browse/HIVE-964)
Is that possible in Cascading 2.0? For heavily skewed data it's totally worth having an extra job.
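As a rough illustration of the two-stage idea described above (spread a hot key over several sub-keys, pre-aggregate, then combine), here is a plain-Java simulation. `SaltedCount`, the `#` separator, and the round-robin salt are all assumptions made for the demo. This is not Hive or Cascading code, and it only works for aggregations that can be combined from partial results, such as counts and sums.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of two-stage ("salted") counting; not Hive or Cascading code.
final class SaltedCount {
    // Stage 1: spread each key over `salts` sub-keys and count per sub-key.
    // In a real job each sub-key could be routed to a different reducer.
    static Map<String, Long> stageOne(List<String> keys, int salts) {
        Map<String, Long> partial = new TreeMap<>();
        for (int i = 0; i < keys.size(); i++) {
            // Round-robin salt keeps the demo deterministic; a job might pick it randomly.
            String salted = keys.get(i) + "#" + (i % salts);
            partial.merge(salted, 1L, Long::sum);
        }
        return partial;
    }

    // Stage 2: drop the salt and add up the partial counts per original key.
    static Map<String, Long> stageTwo(Map<String, Long> partial) {
        Map<String, Long> total = new TreeMap<>();
        for (Map.Entry<String, Long> e : partial.entrySet()) {
            String key = e.getKey().substring(0, e.getKey().lastIndexOf('#'));
            total.merge(key, e.getValue(), Long::sum);
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("hot", "hot", "hot", "hot", "cold");
        Map<String, Long> partial = stageOne(keys, 2);
        System.out.println(partial);           // prints {cold#0=1, hot#0=2, hot#1=2}
        System.out.println(stageTwo(partial)); // prints {cold=1, hot=4}
    }
}
```

The second stage sees at most `salts` records per key, however skewed the input was, which is the trade being made for the extra job.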
On Sat, Apr 14, 2012 at 10:09 PM, Ben Linsay <blin...@gmail.com> wrote:
> yeah, i think that's what i'll end up doing. i'm going to end up serializing my super-nested tuple bags to JSON, so it felt better to have smaller operations and compose them instead of trying to group and encode in one shot.
> thanks for pointing me in the right direction.
From what I can tell, that feature is for joins, which is something we should probably look into if I can sort out what it's actually doing.
As for aggregations in a GroupBy, see the AggregateBy sub-assembly (and its sub-types). This allows for partial aggregation map-side, which is similar to what you described but in a single MR job.
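The general idea of partial aggregation on the map side can be sketched in plain Java with a small bounded cache. Everything here is an assumption made for illustration: `PartialSum`, its capacity, the least-recently-used eviction, and the `key=value` output format are hypothetical, and this is not AggregateBy's actual implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of map-side partial summing with a bounded cache.
// It illustrates the general idea only; it is not AggregateBy's implementation.
final class PartialSum {
    private final int capacity;
    // Access-ordered map, so the first entry is the least recently used key.
    private final LinkedHashMap<String, Long> cache = new LinkedHashMap<>(16, 0.75f, true);
    // Partial results that would be sent on to the reducers.
    final List<String> emitted = new ArrayList<>();

    PartialSum(int capacity) {
        this.capacity = capacity;
    }

    // Adds one value; when the cache overflows, the oldest partial sum is emitted.
    void add(String key, long value) {
        cache.merge(key, value, Long::sum);
        if (cache.size() > capacity) {
            Iterator<Map.Entry<String, Long>> it = cache.entrySet().iterator();
            Map.Entry<String, Long> eldest = it.next();
            emitted.add(eldest.getKey() + "=" + eldest.getValue());
            it.remove();
        }
    }

    // Emits whatever is still cached at the end of the map task.
    void flush() {
        for (Map.Entry<String, Long> e : cache.entrySet()) {
            emitted.add(e.getKey() + "=" + e.getValue());
        }
        cache.clear();
    }

    public static void main(String[] args) {
        PartialSum sums = new PartialSum(2);
        for (String key : Arrays.asList("hot", "hot", "a", "b", "hot", "hot")) {
            sums.add(key, 1L);
        }
        sums.flush();
        System.out.println(sums.emitted); // prints [hot=2, a=1, b=1, hot=2]
    }
}
```

In the demo, four "hot" records leave the mapper as two partial sums, and the reduce side only has to add those up. Memory stays bounded by the cache capacity regardless of how skewed the keys are.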
I totally mixed up the two operations (aggregation & join): the additional M/R jobs are for skewed joins, and map-side pre-aggregation is for skewed aggregations :) Sorry about that.
On Mon, Apr 16, 2012 at 12:58 AM, Chris K Wensel <ch...@wensel.net> wrote:
> From what I can tell, that feature is for joins, which is something we should probably look into if I can sort out what it's actually doing.
> As for aggregations in a GroupBy, see the AggregateBy sub-assembly (and its sub-types). This allows for partial aggregation map-side, which is similar to what you described but in a single MR job.
> On Apr 15, 2012, at 3:09 PM, Maxime Brugidou wrote:
> I also like the Hive way of dealing with skewed data on the reduce side (for aggregations). They shuffle keys in the first M/R job to do a non-skewed pre-aggregation and they add an extra M/R job for the final aggregation. (cf https://issues.apache.org/jira/browse/HIVE-964)
> Is that possible in Cascading 2.0? For heavily skewed data it's totally worth having an extra job.
> On Sat, Apr 14, 2012 at 10:09 PM, Ben Linsay <blin...@gmail.com> wrote:
>> yeah, i think that's what i'll end up doing. i'm going to end up >> serializing my super-nested tuple bags to JSON, so it felt better to have >> smaller operations and compose them instead of trying to group and encode >> in one shot.
>> thanks for pointing me in the right direction.
>> On Saturday, April 14, 2012 10:40:46 AM UTC-7, Chris K Wensel wrote:
>>> why not group on the id instead of stuffing all the values in a Tuple, >>> then use a Buffer to work against them.
>>> On Apr 14, 2012, at 10:24 AM, Ben Linsay wrote:
>>> unfortunately I have to concatenate a few fields of each tuple with a >>> particular ID to generate "bags" of data, so I'm definitely using memory in >>> my aggregators. no need to blame chris. :P
>>> On Saturday, April 14, 2012 8:05:36 AM UTC-7, kkrugler wrote:
>>>> From what I know of the 1.2 code, you can't run out of memory in a >>>> GroupBy/CoGroup just because there are lots of tuples in a group.
>>>> This assumes that you're not doing anything in a custom Buffer or >>>> Aggregator that consumes memory, of course.
>>>> So maybe it's a 2.0 issue?
>>>> -- Ken
>>>> On Apr 13, 2012, at 11:13pm, Ben Linsay wrote:
>>>> hi all,
>>>> what's the standard approach for dealing with massively skewed data sets in cascading? I'm currently dealing with a situation where the same reduce task is throwing an OutOfMemory error during tuple deserialization (stack trace is below), which I can only imagine is caused by an extremely large group of tuples with the same key. i have a trap on this branch of my stream, but this seems not to trigger it.
>>>> i'm in a situation where I can throw this group away as an outlier, so i'm going to try using a FirstN aggregator to limit the size of each reduce group and throw away tuples in excess of what's "reasonable". This seems reasonable, but also rather kludgy. I'd love a more elegant solution.
>>>> relevant notes: this may not be a software problem, and I just need more hardware. the data-set is about 1T, and I'm running on an EMR cluster of m1.xlarge boxes configured using amazon's memory-intensive bootstrap action. i'm considering upgrading to cc1.4xlarge boxes.
I just wanted to let you know that you're not alone! I've seen similar heap errors using extremely skewed datasets and Cascalog, joining a set of 50 tuples with a set of 100s of thousands or a few million tuples.
In Cascalog our solution was to create a Clojure map in a separate, very quick map-side job. After that it could be passed into the custom function, where keyword lookups produced the same result as a join. It's much, much faster and stabler. I've never used Cascading directly, but since it works in Cascalog I assume there's an analogue in pure Cascading.
Here's what you'd do in Cascalog (skipping the initial map job that would otherwise create "other-vals").
(def src
  "Our input data"
  [[1 20] [2 30]])

(defn join-it
  "Join actually happens here with keyword lookup"
  [id other-vals]
  (other-vals (keyword (str id))))
you shouldn't be seeing OOME when doing a CoGroup unless you have "cascading.spill.threshold" set to an unreasonably large number (or you are stuffing nearly unreasonable amounts of data into a single Tuple), and you have ordered your tuple streams incorrectly in the CoGroup (they should go large to small)
also, Join (likely renamed to HashJoin) is an effective map side join, and in the case of an inner-join, an effective filter, which would be the best practice if one side fits in memory (assuming you order your tuple streams from large to small, as in CoGroup) and you won't be performing an Aggregation afterwards.
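As a plain-Java picture of what a map-side hash join does (and roughly what the Cascalog lookup-map trick amounts to): load the side that fits in memory into a hash map once, then stream the large side past it, dropping rows with no match. This is a sketch of the technique, not the Cascading Join/HashJoin API; `HashJoinSketch`, `buildLookup`, `innerJoin` and the `{key, value}` row layout are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java illustration of a map-side hash join; names are hypothetical. */
class HashJoinSketch {

    /** Loads the small side into memory, keyed on the join field. Rows are {key, value}. */
    static Map<String, List<String>> buildLookup(List<String[]> smallSide) {
        Map<String, List<String>> lookup = new HashMap<>();
        for (String[] row : smallSide) {
            lookup.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row[1]);
        }
        return lookup;
    }

    /** Streams the large side once; rows without a match are dropped (inner join as filter). */
    static List<String> innerJoin(Iterable<String[]> largeSide, Map<String, List<String>> lookup) {
        List<String> joined = new ArrayList<>();
        for (String[] row : largeSide) {
            List<String> matches = lookup.get(row[0]);
            if (matches == null) {
                continue; // no partner on the small side
            }
            for (String match : matches) {
                joined.add(row[0] + "\t" + row[1] + "\t" + match);
            }
        }
        return joined;
    }
}
```

Because no grouping happens, a skewed key on the large side costs nothing extra here; the only memory that matters is the lookup built from the small side.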
> On Apr 16, 2012, at 2:10 PM, Robin Kraft wrote:
>> Hey Ben,
>> I just wanted to let you know that you're not alone! I've seen similar heap errors using extremely skewed datasets and Cascalog, joining a set of 50 tuples with a set of 100s of thousands or a few million tuples.
>> In Cascalog our solution was to create a Clojure map in a separate, very quick map-side job. After that it could be passed into the custom function, where keyword lookups produced the same result as a join. It's much, much faster and stabler. I've never used Cascading directly, but since it works in Cascalog I assume there's an analogue in pure Cascading.
>> Here's what you'd do in Cascalog (skipping the initial map job that would otherwise create "other-vals").
Right now, the replication is constant across all keys, but a smarter approach is to only replicate the keys with a lot of values (we have this on our roadmap: do a sampled cogroup + count, compute replication from count, then Join the replication to the non-sampled streams and apply the block join algorithm).
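A rough plain-Java sketch of what per-key replication for a block join could look like. It is a guess at the shape of the algorithm under stated assumptions, not the implementation discussed here: the replication for a key is taken to be ceil(count / maxGroupSize), each row on the skewed side is sent to one random block of its key, and each row on the other side is copied to every block of its key, so the join runs on the composite (key, block). All names (`BlockJoinSketch`, `replication`, `scatter`, `replicate`, `join`) and the `#` separator are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

/** Plain-Java sketch of a block join with per-key replication; names are hypothetical. */
class BlockJoinSketch {

    /** Replication per key: ceil(count / maxGroupSize), and at least 1. */
    static Map<String, Integer> replication(Map<String, Long> counts, long maxGroupSize) {
        Map<String, Integer> result = new HashMap<>();
        for (Map.Entry<String, Long> e : counts.entrySet()) {
            long blocks = (e.getValue() + maxGroupSize - 1) / maxGroupSize;
            result.put(e.getKey(), (int) Math.max(1L, blocks));
        }
        return result;
    }

    /** Skewed side: each {key, value} row lands in exactly one randomly chosen block. */
    static Map<String, List<String>> scatter(List<String[]> rows, Map<String, Integer> rep, Random random) {
        Map<String, List<String>> groups = new HashMap<>();
        for (String[] row : rows) {
            int block = random.nextInt(rep.getOrDefault(row[0], 1));
            groups.computeIfAbsent(row[0] + "#" + block, k -> new ArrayList<>()).add(row[1]);
        }
        return groups;
    }

    /** Other side: each {key, value} row is copied into every block of its key. */
    static Map<String, List<String>> replicate(List<String[]> rows, Map<String, Integer> rep) {
        Map<String, List<String>> groups = new HashMap<>();
        for (String[] row : rows) {
            int blocks = rep.getOrDefault(row[0], 1);
            for (int block = 0; block < blocks; block++) {
                groups.computeIfAbsent(row[0] + "#" + block, k -> new ArrayList<>()).add(row[1]);
            }
        }
        return groups;
    }

    /** Inner join on the composite (key, block); each group stands for one reduce call. */
    static List<String> join(Map<String, List<String>> left, Map<String, List<String>> right) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : left.entrySet()) {
            List<String> matches = right.get(e.getKey());
            if (matches == null) {
                continue;
            }
            String key = e.getKey().substring(0, e.getKey().lastIndexOf('#'));
            for (String l : e.getValue()) {
                for (String r : matches) {
                    out.add(key + "\t" + l + "\t" + r);
                }
            }
        }
        return out;
    }
}
```

Keys with few values get a replication of 1 and behave like an ordinary join, so only the heavy keys pay the cost of copying the other side.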
On Monday, April 16, 2012 at 2:34 PM, Chris K Wensel wrote:
> to be clearer, CoGroup and Join work best when you order your tuple streams from large to small...
> in the case of CoGroup, large and small refer to the number of values per unique key.
> and in the case of Join, large and small mean "doesn't fit in memory" and "does fit in memory", respectively
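One way to picture why the ordering matters for a reduce-side join, sketched in plain Java under the assumption that the join streams the first input for a key and holds the later input in a buffer. That assumption is inferred from the ordering advice, not stated in this thread, and a real implementation would spill the buffer to disk past a threshold rather than keep everything in a list. `CoGroupOrderSketch` and `joinOneKey` are made-up names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java illustration of per-key buffering in a reduce-side join; names are hypothetical. */
class CoGroupOrderSketch {

    /**
     * Joins the values of one key. The first stream is consumed one value at a time;
     * the second is fully buffered, so memory use grows with the second stream only.
     * Returns the number of values that had to be buffered.
     */
    static int joinOneKey(Iterator<String> first, Iterator<String> second, Consumer<String> sink) {
        List<String> buffered = new ArrayList<>();
        while (second.hasNext()) {
            buffered.add(second.next());
        }
        while (first.hasNext()) {
            String left = first.next();
            for (String right : buffered) {
                sink.accept(left + "\t" + right);
            }
        }
        return buffered.size();
    }
}
```

Both orderings emit the same number of joined rows for a key; what changes is how many values sit in the buffer, which is the part that can blow the heap when the stream with the most values per key is placed last.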