rdd.reduce for sum


Laeeq Ahmed

Jan 6, 2013, 5:11:43 PM
to spark...@googlegroups.com
Hi all,

I am using the Java API for Spark. The output of my map is as follows. I want to add up the 1's in a reduce, or otherwise total them, to find the number of positives from the map.

adrenaline: 0
alprenolol: 0
clomethiazole: 0
coumarin: 0
dobutamine: 1
domperidone: 1
dopamine: 0
epanolol: 1
estradiol: 0
felodipine: 0
114: 0
Danthron: 0

I get an incompatible types error in the following code. Can anybody correct me?

JavaPairRDD<String,Integer> Winners = Molecules.reduce(new Function2<Tuple2<String,Integer>,Tuple2<String,Integer>,Tuple2<String,Integer>>() {
      public Tuple2<String,Integer> call (Tuple2<String ,Integer> t1,Tuple2<String , Integer> t2){
        return new Tuple2((t1._2>t2._2)?t1._1:t2._1, t1._2 + t2._2);
      }
    });

After using Tuple2, I see why Scala is preferred over Java for Spark.

Regards,

Laeeq 

Josh Rosen

Jan 6, 2013, 5:42:11 PM
to spark...@googlegroups.com
reduce() reduces all of the items in an RDD into a single value; it does not return an RDD, which is why you're seeing a type error.
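
The mismatch can be reproduced without Spark. The sketch below is only an illustration: `ReduceTypeSketch` and its `reduce` helper are hypothetical stand-ins over a plain `List`, not the Spark API, and `Map.Entry` takes the place of `Tuple2`. It shows that folding elements of type `T` yields a single `T`, so the result cannot be assigned to a collection-like type:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

class ReduceTypeSketch {
    // Hypothetical stand-in for reduce(): folds a non-empty list of T into one T.
    static <T> T reduce(List<T> items, BinaryOperator<T> f) {
        T acc = items.get(0);
        for (T item : items.subList(1, items.size())) {
            acc = f.apply(acc, item);
        }
        return acc;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> molecules = Arrays.asList(
            new SimpleEntry<String, Integer>("dobutamine", 1),
            new SimpleEntry<String, Integer>("dopamine", 0),
            new SimpleEntry<String, Integer>("epanolol", 1));

        // The result is one pair, not a list of pairs.
        Map.Entry<String, Integer> one = reduce(molecules,
            (a, b) -> new SimpleEntry<String, Integer>(
                a.getKey(), a.getValue() + b.getValue()));

        System.out.println(one.getValue());
    }
}
```

Declaring `one` as a `List<Map.Entry<String, Integer>>` instead would fail to compile, which is the same kind of error as assigning the result of reduce() to a JavaPairRDD.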

If you want to find the total number of 1's in the whole dataset, I would extract the values then perform a sum.  This will be pipelined, so it should be efficient:

    JavaPairRDD<String, Integer> Molecules = null;  // placeholder; use your existing RDD here
    JavaRDD<Integer> values = Molecules.map(new Function<Tuple2<String, Integer>, Integer>() {
      @Override
      public Integer call(Tuple2<String, Integer> obj) throws Exception {
        return obj._2();
      }
    });
    Integer total = values.reduce(new Function2<Integer, Integer, Integer>() {
      @Override
      public Integer call(Integer a, Integer b) {
        return a + b;
      }
    });


A recent pull request added keys() and values() methods (and I'll add aliases for these in the Java API), so in the next release you should be able to write this as:

    JavaPairRDD<String, Integer> Molecules = null;  // placeholder; use your existing RDD here
    Integer total = Molecules.values().reduce(new Function2<Integer, Integer, Integer>() {
      @Override
      public Integer call(Integer a, Integer b) {
        return a + b;
      }
    });
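
To check the expected total without a cluster, here is a minimal sketch of the same extract-the-values-then-sum pattern using plain Java collections and streams rather than the Spark API. The class `PositiveCount` and its methods are hypothetical names chosen for illustration; the sample data is copied from the map output in the question.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class PositiveCount {
    // Sample map output copied from the question in this thread.
    static Map<String, Integer> sample() {
        Map<String, Integer> m = new LinkedHashMap<>();
        m.put("adrenaline", 0);
        m.put("alprenolol", 0);
        m.put("clomethiazole", 0);
        m.put("coumarin", 0);
        m.put("dobutamine", 1);
        m.put("domperidone", 1);
        m.put("dopamine", 0);
        m.put("epanolol", 1);
        m.put("estradiol", 0);
        m.put("felodipine", 0);
        m.put("114", 0);
        m.put("Danthron", 0);
        return m;
    }

    // Drop the keys, then fold the values with addition.
    static int countPositives(Map<String, Integer> molecules) {
        return molecules.values().stream().reduce(0, Integer::sum);
    }

    public static void main(String[] args) {
        System.out.println(countPositives(sample())); // prints 3
    }
}
```

Because every value is 0 or 1, the sum equals the number of positives.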

- Josh

Laeeq Ahmed

Jan 7, 2013, 3:58:16 AM
to spark...@googlegroups.com
Hi Josh,

Thanks for the detailed help.

Regards,

Laeeq