mappers vs combines


Alex Cozzi

Sep 12, 2013, 12:15:17 AM
to scoob...@googlegroups.com
In several of our jobs I have noticed a common pattern: whenever I use the combine function I get really bad performance. The job runs very slowly or even fails to terminate (the mapper tasks get killed because they fail to report progress for more than 600 seconds), while a simple rewriting of the same logic using a map function makes it run very quickly. Here is an example of what I am talking about:

This runs very slowly:

val records = ... // a DList of (key, ints) pairs

val aggregate = records.groupByKey
  .combine(Reduction.Sum.int)

persist(toDelimitedTextFile(aggregate...


=========================
This one is fast:

val records = ... // a DList of (key, ints) pairs

val aggregate = records.groupByKey
  .map { case (key, seq) => (key, seq.reduce(Reduction.Sum.int.apply)) }

persist(toDelimitedTextFile(aggregate...

===========================

In my case the keys in the records will create a lot of relatively small groups. I am wondering whether this is the expected behavior, or whether there is some issue with combiners in hadoop 1.0. I'd probably like to have a scoobi option to disable the use of combiners, which I think should convert the first program to the second one without a code change.

Any thoughts?
Thanks
Alex

Eric Torreborre

Sep 12, 2013, 9:49:35 PM
to scoob...@googlegroups.com
Hi Alex,

I suppose that what's happening is that, in each mapper, there is not much to aggregate, so running a Combiner only adds overhead. Can you confirm this by looking at the job statistics, especially the Combiner input/output record counts?

The transformation of a combine operation to a map operation after a groupByKey is already being done by Scoobi, so what we need to add is a configuration option to disable the use of combiners in the mappers.

I have added such an option `ScoobiConfiguration.disableCombiners` in the latest 0.8.0-SNAPSHOT (pending publishing as I write this).

Please try it out and tell me if that works for you.

Thanks,

Eric.

Alex Cozzi

Sep 12, 2013, 11:16:22 PM
to scoob...@googlegroups.com
Thank you Eric,
you guys are, as usual, awfully responsive!
Really appreciated. I will try it out and report back.
Alex

Eric Springer

Sep 17, 2013, 6:10:17 PM
to scoob...@googlegroups.com
I've run into the same thing. My conclusion was that when you use a combiner with Hadoop it adds an extra serialization, sort, and deserialization pass to the mapper.

If the combiner isn't able to reduce the data by a significant amount, this is pure overhead. Some code (like Scoobi's join) is organized so that each mapper has a huge amount of reducible data (IIRC a 100x reduction isn't unusual), so there it's a huge benefit (and without the combiner, the job isn't actually even feasible to run).
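To make the trade-off concrete, here is a plain-Scala sketch (not Scoobi or Hadoop code; all names are illustrative) of what a combiner does: it pre-aggregates one mapper's output per key before the shuffle. The record counts show why it only pays off when many values share a key.

```scala
// Illustrative sketch of combiner semantics on a single mapper's output.
// A combiner pre-sums (key, count) pairs per key before the shuffle; if keys
// rarely repeat within a mapper, it shrinks nothing and is pure overhead.
object CombinerSketch {
  // Pre-aggregate one mapper's output per key, as a combiner would.
  def combine(mapperOutput: Seq[(String, Int)]): Seq[(String, Int)] =
    mapperOutput.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2).sum) }.toSeq

  // Number of records shuffled (without combiner, with combiner).
  def shuffleSizes(mapperOutput: Seq[(String, Int)]): (Int, Int) =
    (mapperOutput.size, combine(mapperOutput).size)
}
```

With 100 records sharing one key the combiner shrinks the shuffle 100x; with all-distinct keys (many small groups, as in Alex's job) it shrinks nothing.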

Maybe a simple (and ugly) combineWithoutCombiner method could be added that is just a wrapper around map, since I kind of feel a configuration option is too coarse for anything other than performance testing.

Eric Torreborre

Sep 17, 2013, 7:15:33 PM
to scoob...@googlegroups.com
We can add "reduceValues", which can only be invoked on a DList[(K, Iterable[V])]:

def reduceValues(r: Reduction[V]): DList[(K, V)]

// in case the iterable is empty
def reduceValuesOption(r: Reduction[V]): DList[(K, Option[V])]

This is implemented using a map and consequently will not trigger the use of a Combiner.
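As a plain-Scala sketch of that idea (Seq standing in for DList, and an ordinary (V, V) => V standing in for Reduction[V]; this is not the actual Scoobi implementation), both methods are just a map over the grouped values, so no Combiner stage is ever planned:

```scala
// Sketch: reduceValues as a plain map over grouped values — no Combiner stage.
object ReduceValuesSketch {
  def reduceValues[K, V](grouped: Seq[(K, Iterable[V])], op: (V, V) => V): Seq[(K, V)] =
    grouped.map { case (k, vs) => (k, vs.reduce(op)) }

  // Safe variant for possibly-empty groups.
  def reduceValuesOption[K, V](grouped: Seq[(K, Iterable[V])], op: (V, V) => V): Seq[(K, Option[V])] =
    grouped.map { case (k, vs) => (k, vs.reduceOption(op)) }
}
```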

If you want to give it a spin, it is available in 0.8.0-SNAPSHOT. If that works fine I'll remove the global configuration option, which doesn't make much sense really.

E.

Alex Cozzi

Oct 25, 2013, 6:28:12 PM
to scoob...@googlegroups.com
Why do I not see it in the 0.8.0-cd3-SNAPSHOT?
My code looks like:

queries.groupByKey.reduceValues(reduction)


and the error is:

value reduceValues is not a member of com.nicta.scoobi.core.DList[((String, String, Int, String, Boolean, Boolean, String, String), Iterable[com.ebay.scoobi.examples.QStats])]

Eric Torreborre

Oct 27, 2013, 11:11:02 PM
to scoob...@googlegroups.com
I don't know. 

When I look at the sources, or the binary jar file, I can indeed see that this method is available on com.nicta.scoobi.core.DList. Are you sure you're looking at the right SNAPSHOT? Do you by any chance have an old snapshot in your ivy repo?

E.

Alex Cozzi

Oct 28, 2013, 2:02:02 PM
to scoob...@googlegroups.com
Thank you for checking!
I figured out that sometimes using mvn -F and/or forcing an update within Eclipse is not good enough for Maven, but after doing

rm -fr .m2/repository/com/nicta

I was finally able to see the new methods. 

What is confusing is that sometimes forcing works just fine, and sometimes it does not.

I apologize for the false alarm.

Alex
