Divide all the pairs by a single value

lk

Aug 27, 2015, 1:42:42 PM
to scala-user
Hi,

In the following code, mlistrdd: RDD[(Int, Double)] is given (i.e. we do not know anything about its contents).

The goal is to extract the value of one particular pair (the one with key 4 below) and to divide all the values in the RDD by that extracted value.

I have written code that does this using take() (first code). The question is: how can we do the same thing without using collect() or take(), so that the whole computation stays inside the RDD (second code)? Any idea would be helpful. Thanks.

First code:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD

object test {

  def main(args: Array[String]) {

    // settings
    val sc = new SparkContext("local", "SparkTest")

    // given list
    var mlist = List[(Int, Double)]()
    mlist ++= List((4, 2.0))
    mlist ++= List((1, 4.0))
    mlist ++= List((3, 18.0))
    mlist ++= List((2, 30.0))

    // list to rdd (imagine that the list is given, i.e. we do not know about it)
    val mlistrdd: RDD[(Int, Double)] = sc.parallelize(mlist)

    // extract value
    val v: Array[Double] = mlistrdd.filter(x => x._1 == 4).map(x => x._2).take(1)

    // divide all values by the extracted value
    val output1: RDD[(Int, Double)] = mlistrdd.map(x => (x._1, x._2 / v(0)))

    // print values
    output1.foreach(println)

    System.exit(0)
  }
}


Second code:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD

object test {

  def main(args: Array[String]) {

    // settings
    val sc = new SparkContext("local", "SparkTest")

    // given list
    var mlist = List[(Int, Double)]()
    mlist ++= List((4, 2.0))
    mlist ++= List((1, 4.0))
    mlist ++= List((3, 18.0))
    mlist ++= List((2, 30.0))

    // list to rdd (imagine that the list is given, i.e. we do not know about it)
    val mlistrdd: RDD[(Int, Double)] = sc.parallelize(mlist)

    // extract value
    val v: RDD[Double] = mlistrdd.filter(x => x._1 == 4).map(x => x._2)

    // How can we use v: RDD[Double] to divide all the elements of mlistrdd
    // inside the RDD (i.e. without using take() or collect())?
    val output1: RDD[(Int, Double)] = ??? // this is the open question

    // print values
    output1.foreach(println)

    System.exit(0)
  }
}




Lanny Ripple

Aug 28, 2015, 11:29:43 AM
to scala-user
More of a Spark than Scala question but... 

(Guessing rather than knowing) I would try a comprehension.  You'll probably want to play with persisting either `mlistrdd` and/or `v` to check performance.

mlistrdd.flatMap {
  case (k, value) =>
    v.flatMap { vstar =>
      k -> (value / vstar)
    }
}
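Persisting either RDD, as mentioned above, would look roughly like this (just a sketch; MEMORY_ONLY is only the default storage level, not a recommendation):

import org.apache.spark.storage.StorageLevel

// Keep the source RDD in memory so repeated passes over it are not recomputed.
mlistrdd.persist(StorageLevel.MEMORY_ONLY)

// cache() is shorthand for persist(StorageLevel.MEMORY_ONLY).
v.cache()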

lk

Aug 30, 2015, 6:44:50 PM
to scala-user
Hi, I tried this:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD

import scala.collection.JavaConversions._

object test1 {

  def main(args: Array[String]) {

    // settings
    val sc = new SparkContext("local", "SparkTest")

    // given list
    var mlist = List[(Int, Double)]()
    mlist ++= List((4, 2.0))
    mlist ++= List((1, 4.0))
    mlist ++= List((3, 18.0))
    mlist ++= List((2, 30.0))

    // This list is given (i.e. we do not know about it)
    val mlistrdd: RDD[(Int, Double)] = sc.parallelize(mlist)

    // extract value
    val v: RDD[Double] = mlistrdd.filter(x => x._1 == 4).map(x => x._2)

    // divide the values of mlistrdd by v without using take() or collect()
    val out1 = mlistrdd.map { case (k, value) => v.map(vstar => (k, value / vstar)) }

    // print values
    out1.collect().foreach(println)

    System.exit(0)
  }
}

But I receive this error:

RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

The type of out1 is RDD[RDD[(Int, Double)]]. Based on the error, nested RDDs are not allowed.

Also, in the same code, if I try:

val out1 = mlistrdd.flatMap { case (k, value) => v.map(vstar => (k, value / vstar)) }

I receive this error:

type mismatch; found : org.apache.spark.rdd.RDD[(Int, Double)] required: TraversableOnce[?]

Any idea?
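One pattern that might work here while keeping everything inside RDDs (a sketch only, assuming v really holds exactly one element) is cartesian, which pairs every element of mlistrdd with each element of v, so no take() or collect() is needed:

// Sketch: cross every (key, value) pair with the single divisor held in v.
// cartesian gives RDD[((Int, Double), Double)], which is then reshaped.
val out2: RDD[(Int, Double)] =
  mlistrdd.cartesian(v).map { case ((k, value), vstar) => (k, value / vstar) }

out2.collect().foreach(println)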