A question about calculating AVG…


Kindaichi

Oct 22, 2013, 8:46:17 PM
to spark...@googlegroups.com
Hi, everyone,
 
Assume that I want to calculate the average of one attribute across records with MapReduce, like this:
 
val rdd1 = sc.newAPIHadoopFile(……)
// pack the value and its count into one tab-separated string per key
val rdd2 = rdd1.map(each => (each.key, each.value1 + "\t" + each.value2))
// sum both fields pairwise within each key
val rdd3 = rdd2.reduceByKey { (a, b) =>
  val (Array(s1, c1), Array(s2, c2)) = (a.split("\t"), b.split("\t"))
  (s1.toInt + s2.toInt).toString + "\t" + (c1.toInt + c2.toInt).toString
}
// divide the summed value by the summed count to get the average
val rdd4 = rdd3.mapValues { v => val Array(s, c) = v.split("\t"); s.toDouble / c.toInt }

rdd4.saveAsTextFile(……)
 
I want to sum two attributes, where the second is the count of something, in order to calculate the average.
 
I also think the method above is not compact; could I do this in one map and one reduce?
 
Thanks!

Josh Rosen

Oct 22, 2013, 9:40:49 PM
to spark...@googlegroups.com
From your pseudocode, it's a little unclear what you're trying to calculate. Also, I'm a bit confused by all of the toString and toInt calls in your code.

Assuming that you have (key, value: Numeric) pairs and want to implement an "average value by key" operation, you could do it with this one-liner:

scala> var data = sc.parallelize(Seq(("A", 2), ("A", 4), ("B", 2), ("Z", 0), ("B", 10)))
data: org.apache.spark.rdd.RDD[(java.lang.String, Int)] = ParallelCollectionRDD[31] at parallelize at <console>:12

scala> data.mapValues((_, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)).mapValues{ case (sum, count) => (1.0 * sum)/count}.collectAsMap()
res11: scala.collection.Map[java.lang.String,Double] = Map(Z -> 0.0, B -> 6.0, A -> 3.0)
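
(The 1.0 * sum multiplication promotes the Int sum to a Double, so the division is done in floating point rather than as integer division.)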

If you had (key, (value, count)) pairs, then you'd just skip the first mapValues step:

scala> var data = sc.parallelize(Seq(("A", (0, 5)), ("A", (4, 2)), ("B", (25, 5)), ("Z", (10, 5)), ("B", (100, 10))))
data: org.apache.spark.rdd.RDD[(java.lang.String, (Int, Int))] = ParallelCollectionRDD[59] at parallelize at <console>:12

scala> data.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)).mapValues{ case (sum, count) => (1.0 * sum)/count}.collectAsMap()
res17: scala.collection.Map[java.lang.String,Double] = Map(Z -> 2.0, B -> 8.333333333333334, A -> 0.5714285714285714)

You could also express this with combineByKey, but I'll leave that as an exercise.
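
For reference, one possible combineByKey version might look like the following (a sketch, not verified REPL output; it builds the same (sum, count) pairs and assumes the (key, Int) data RDD from the first example — avgByKey is just an illustrative name):

val avgByKey = data.combineByKey(
  (v: Int) => (v, 1),                                           // createCombiner: start (sum, count) from the first value
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),        // mergeValue: fold another value into the running pair
  (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)  // mergeCombiners: merge partial (sum, count) pairs
).mapValues { case (sum, count) => (1.0 * sum) / count }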


