Writing your own reducer function in spark

2,824 views
Skip to first unread message

praveenesh kumar

unread,
Oct 2, 2012, 7:49:34 AM10/2/12
to spark...@googlegroups.com
I want to write a reducer function in spark, that will iterate through all the all the values associated to the same key and does some operation on the list of those values. I know in hadoop the reducer functions gets (K, L(V) ) as parameter. I am a newbie in Spark. I have learned ReducebyKey also does almost same thing that I am trying to do.

What I am trying to do here is to  to write my own function to pass in  reducebykey( ) .

My question ,, what should be the signature of my function to be passed inside reducebykey(). How can I use/pass my function as a reducer function.

I want to make my own function X(Key, LIST(Values) ) that will do some processing inside.. How can I pass it inside ReduceByKey( ) method.

Can anyone give me a very simple example or a short code snippet on how to achieve this ?

Thanks,
Praveenesh


Nathan

unread,
Oct 2, 2012, 11:26:35 AM10/2/12
to spark...@googlegroups.com
I believe the function you pass in gets the values pair-wise, not as a list.

If you need a list, you could first map every element to a single element list, then concatenate the lists via reduce-by-key.. i.e.

data.map(p => (p._1, List(p._2))).reduceByKey(_ ++ _).map(p -> (p._1, your_function))

But I suspect it'd be a lot better if you could figure out a way for your function to work given pairwise input

praveenesh kumar

unread,
Oct 2, 2012, 11:43:01 AM10/2/12
to spark...@googlegroups.com
Okay I was comparing it with Hadoop reducer function.

So here is what I am doing..

1. Reading data from a file.. Applied my map function and got (K,V) pairs as an output.
2. Now I want to apply my own function as a reducer. But in reducer I want to process all (K,V) pairs with the same key. Thats what I am assuming ReduceByKey does for you.

My question is how can I write my reducer function to process all the (K,V) pair belonging to same key together.
What would be the signature of my reducer function in that case ?

How would I pass it inside reducebykey( ) method.

e.g. I passed my mapper like this ----  dataset.map(x=>Mymapper.map(x))
Here I knew x would be the line, so I defined my own map function to take string as argument.
Now after applying this map function, I got some (K,V) pairs.

How can I write my reduce function and pass it in reducebykey( ) function as I did in map case ?

Regards,
Praveenesh

Josh Rosen

unread,
Oct 2, 2012, 12:25:13 PM10/2/12
to spark...@googlegroups.com
If `data` is an RDD of key-value pairs of type (K, V), then data.groupByKey() produces an RDD with values of type (K, Seq[V]). For each key, this produces a sequence (list) of its values.

At this point, you can use map() to process each (key, list of values) pair.

If your map function transforms (K, Seq(V)) to (K, f(Seq(V)) (e.g. map(p -> (p._1, function(p._2)), which doesn't examine the key), then you can express this using mapValues(). mapValues() applies a function to the values of (key, value) pairs while preserving the RDD's partitioning; this can avoid unnecessary reshuffling by subsequent RDD transformations.

In summary,

data.map(p => (p._1, List(p._2))).reduceByKey(_ ++ _).map(p -> (p._1, your_function(p.2)))

can also be expressed as

data.groupByKey().mapValues(your_function).

The groupByKey() implementation is more efficient than concatenating lists using reduceByKey because it creates fewer intermediate objects.

- Josh
Reply all
Reply to author
Forward
0 new messages