How to get flatMap to return the full List of pairs from a List of Lists?


Adrian Kowalski

Jul 27, 2015, 1:07:12 PM
to scala-user
Hi, I am trying to use flatMap (I am not quite certain that I really need flatMap) to store all the pairs in a list. Here is my code:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import scala.collection.mutable.ListBuffer

object test {

  def main(args: Array[String]) {

    val sc = new SparkContext("local", "SparkTest")

    var mlist = new ListBuffer[(Int, Double)]()
    mlist += ((1, 5.0))
    mlist += ((2, 2.0))
    mlist += ((3, 3.0))
    mlist += ((4, 6.0))

    var list = sc.parallelize(mlist)

    var temp = new ListBuffer[ListBuffer[(Int, Double)]]()
    var out = sc.parallelize(temp)

    for (i <- 1 to 2) {
      list = list.map({ case (x, y) => mfunc(x, y) }).reduceByKey(_ + _)
      out ++= list.flatMap(x => ListBuffer(ListBuffer(x)))
    }

    println("Output \n--")
    out.foreach(println)

    System.exit(0)
  }

  def mfunc(a: Int, b: Double): (Int, Double) = {
    var y = math.ceil(a / 2.0).toInt
    (y, b)
  }
}



The output that I receive is the following:
ListBuffer((1,7.0))
ListBuffer((2,9.0))
ListBuffer((1,16.0))

but instead I would like to receive this output:
ListBuffer((1,7.0), (2,9.0))
ListBuffer((1,16.0))

or this form:
((1,7.0), (2,9.0))
((1,16.0))


Any idea how to achieve this?

Vlad Patryshev

Jul 27, 2015, 2:47:02 PM
to Adrian Kowalski, scala-user
Adrian,

Since you are probably looking for a detailed explanation of how this should be done, let's step back a little.

This piece of code does not "store all pairs". There's a function being called. You do map, you do reduce... you use vars.

Let's start with vars. Get rid of them.
Give each value its expected type.
See what fails.

Maybe for a while you do not need any mapping, to figure out what's going on.
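If it helps, here is a tiny experiment in the plain Scala REPL (no Spark involved, and I have not run it against your code) that shows what flatMap actually does: it applies its function to each element separately and concatenates the results, so no single call ever sees the whole collection.

  scala> List(1, 2, 3).flatMap(x => List(List(x)))
  res0: List[List[Int]] = List(List(1), List(2), List(3))

That is exactly the shape of the output you are getting: one single-element list per pair.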

Thanks,
-Vlad


Adrian Kowalski

Jul 27, 2015, 3:27:17 PM
to scala-user, vpatr...@gmail.com
Hello Vlad,

First of all, thanks for the reply. Let me clarify things, because my initial post was really not quite clear.

My goal is, in every step of the for-loop, to store all the pairs that are generated from "list".

For that reason I use the "out" variable.

The code I provided produces this output:
ListBuffer((1,7.0))   //loop-iteration 1
ListBuffer((2,9.0))   //loop-iteration 1
ListBuffer((1,16.0))  //loop-iteration 2

while my goal is to have:
ListBuffer((1,7.0), (2,9.0))   //loop-iteration 1
ListBuffer((1,16.0))             //loop-iteration 2

What I cannot understand is how, inside the flatMap, we can store (or return) multiple pairs at once.

Thank you,
Adrian.

Kevin Wright

Jul 27, 2015, 4:10:00 PM
to Vlad Patryshev, Adrian Kowalski, scala-user
Breaking it down with inline comments:


import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD  // needed for the RDD type used below

object test {
  def main(args: Array[String]) {
    
    val sc = new SparkContext("local", "SparkTest")
    
    // use val, not var - this value doesn't need to change once initialised
    // similarly, ListBuffer is a mutable structure - and pointless given that we
    // parallelize it to an RDD anyway

    val input = Seq( //let scalac infer the type
      1 -> 5.0,
      2 -> 2.0,
      3 -> 3.0,
      4 -> 6.0    // a->b is shorthand for the tuple (a,b)
    )
    
    val inputRdd = sc.parallelize(input)  //val again
    
    // we map straight to the output, so no need to pre-initialise anything
    // or use intermediate collections

    val clamped1 = clamp(inputRdd)
    val clamped2 = clamp(clamped1)

    // use collect() to get the data (as an Array) back out of RDD form

    println("Output \n--")
    println(clamped1.collect().mkString)
    println(clamped2.collect().mkString)
    
    System.exit(0)
  }
  
  def printIt(xs: RDD[(Int,Double)]) = xs.collect().mkString

  // tupled is a very useful method, converting a function that takes 2 args into a function
  // that takes a tuple

  def clamp(xs: RDD[(Int,Double)]): RDD[(Int,Double)] = xs.map(mfunc.tupled).reduceByKey(_+_)

  // always strive for the simplest code, and AVOID VARS!
  // also, it's faster to stick with integer arithmetic

  def mfunc(a: Int,b: Double):(Int, Double) = ((a/2) + (a%2)) -> b
}


I've not tested it, but any errors should be minor.
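By the way, on the integer arithmetic in mfunc: (a/2) + (a%2) is just the ceiling of a/2 for non-negative a, because a/2 truncates and a%2 adds 1 back exactly when a is odd. A quick sanity check in the plain REPL (no Spark needed):

  (0 to 10).forall(a => (a/2) + (a%2) == math.ceil(a/2.0).toInt)  // true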


--
Kevin Wright
mail: kevin....@scalatechnology.com
gtalk / msn : kev.lee...@gmail.com
vibe / skype: kev.lee.wright
steam: kev_lee_wright

"My point today is that, if we wish to count lines of code, we should not regard them as "lines produced" but as "lines spent": the current conventional wisdom is so foolish as to book that count on the wrong side of the ledger" ~ Dijkstra

Adrian Kowalski

Jul 28, 2015, 5:46:00 AM
to scala-user, vpatr...@gmail.com, kev.lee...@gmail.com
Thank you very much for the code you provided and the comments; I found the way you expressed the ceil function very interesting.
I think your code also helps me explain my problem better.

1) The problem I have is that, instead of using clamped1 and clamped2, I would like to replace them with a list (or array) named clampedList (for example)
of size 2, where the first element of clampedList is clamped1 and the second element is clamped2, e.g.:

 val temp = ListBuffer[ListBuffer[(Int,Double)]]()
 val clampedList = sc.parallelize(temp)

 val clamped1 = clamp(inputRdd) //how to replace clamped1 with clampedList(0) ?
 val clamped2 = clamp(clamped1) //how to replace clamped2 with clampedList(1) ?

and, for example, when we run the command clampedList.foreach(println), to receive something like:
(1,7.0)(2,9.0)
(1,16.0)

2) With mfunc.tupled I receive the error: "missing arguments for method mfunc in object test; follow this method with '_' if you want to treat it as a partially applied function".
I could not fix this error, but I could use:
def clamp(xs: RDD[(Int,Double)]): RDD[(Int,Double)] = xs.map({case (x,y) => mfunc(x,y)}).reduceByKey(_+_)
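(Re-reading the error, it seems the compiler wants the method turned into a function value first, so perhaps

  def clamp(xs: RDD[(Int,Double)]): RDD[(Int,Double)] = xs.map((mfunc _).tupled).reduceByKey(_+_)

would compile, but I have not tried it yet.)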

Thanks.

Adrian Kowalski

Jul 28, 2015, 7:35:45 AM
to scala-user, vpatr...@gmail.com, kev.lee...@gmail.com
I have rewritten the code like this:


import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD

import scala.collection.mutable.ListBuffer

object test {
  def main(args: Array[String]) {
   
    val sc = new SparkContext("local", "SparkTest")
   
    // use val, not var - this value doesn't need to change once initialised
    // similarly, ListBuffer is a mutable structure - and pointless given that we
    // parallelize it to an RDD anyway
   
    val input = Seq( //let scalac infer the type
      1 -> 5.0,
      2 -> 2.0,
      3 -> 3.0,
      4 -> 6.0    // a->b is shorthand for the tuple (a,b)
    )
   
    val inputRdd = sc.parallelize(input)  //val again
   
    // we map straight to the output, so no need to pre-initialise anything
    // or use intermediate collections
   
    //val temp:RDD[List[(Int,Double)]] = RDD[List[(Int,Double)]]()
   
    val temp = List[(Int,Double)]()
   
    var clampedList = sc.parallelize(temp)
   
    clampedList ++= clamp(inputRdd)
   
    clampedList ++= clamp(clampedList)

   
    // use collect() to get the data (as an Array) back out of RDD form
   
    println("Output \n--")
    //println(clamped1.collect().mkString)
    //println(clamped2.collect().mkString)
    clampedList.foreach(println)
   
    System.exit(0)
  }
 
  def printIt(xs: RDD[(Int,Double)]) = xs.collect().mkString

 
  // tupled is a very useful method, converting a function that takes 2 args into a function
  // that takes a tuple
 
  def clamp(xs: RDD[(Int,Double)]): RDD[(Int,Double)] =  xs.map({case (x,y) => mfunc(x,y)}).reduceByKey(_+_)
  //def clamp(xs: RDD[(Int,Double)]): RDD[(Int,Double)] =  xs.map(mfunc.tupled).reduceByKey(_+_)

 
 
  // always strive for the simplest code, and AVOID VARS!
  // also, it's faster to stick with integer arithmetic
 
  def mfunc(a: Int,b: Double):(Int, Double) = ((a/2) + (a%2)) -> b
}


If I understand correctly, the clampedList in this case is: [(1,7.0),(2,9.0),(1,16.0)]. (Btw, how can I access only the first or only the second pair in the RDD?)

Instead, I would like to have a list, vector, or array where the first element of the array gives me (1,7.0),(2,9.0)
and the second (or last, in this case) element gives the value (1,16.0).
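(On the indexing question, as far as I can tell an RDD has no positional access; clampedList.first() and clampedList.take(2) give elements from the front, and otherwise I would have to collect() to a local Array and index that, something like:

  val local = clampedList.collect()  // all elements as a local Array on the driver
  println(local(0))
  println(local(1))

but I am not sure this is the idiomatic way.)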


Thank you.

Adrian Kowalski

Jul 28, 2015, 10:21:18 AM
to scala-user, vpatr...@gmail.com, kev.lee...@gmail.com
Sorry again for this sequence of posts. I have updated the code, and I think it is clearer now:


import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD

import scala.collection.mutable.ListBuffer

object test {
  def main(args: Array[String]) {
   
    val sc = new SparkContext("local", "SparkTest")
   
    val input = Seq( //let scalac infer the type
      1 -> 5.0,
      2 -> 2.0,
      3 -> 3.0,
      4 -> 6.0    // a->b is shorthand for the tuple (a,b)
    )
   
    val inputRdd = sc.parallelize(input)  //val again
   
    val init = List[List[(Int,Double)]]()
    var clampedList = sc.parallelize(init)
   
    val clamped1 = clamp(inputRdd)
   
    //This returns :
    //List((1,7.0))
    //List((2,9.0))
    //I would like to change it to return : List((1,7.0),(2,9.0)). How can I achieve this ?
    clampedList ++= clampList(clamped1)
   
    val clamped2 = clamp(clamped1)
   
    clampedList ++= clampList(clamped2)
   
    println("Output \n--")
    clampedList.foreach(println)
   
    System.exit(0)
  }
 
 
  def clampList(xs: RDD[(Int,Double)]): RDD[List[(Int,Double)]] =  xs.flatMap(x => List(List(x)))

 
  def clamp(xs: RDD[(Int,Double)]): RDD[(Int,Double)] =  xs.map({case (x,y) => mfunc(x,y)}).reduceByKey(_+_)
 
  def mfunc(a: Int,b: Double):(Int, Double) = ((a/2) + (a%2)) -> b
}


I created the clampList function. The output that I receive for the clampedList is:
--------
List((1,7.0))
List((2,9.0))
List((1,16.0))
--------

Instead, I would like to receive this output, i.e. for the clampedList to store the values like this:
--------
List((1,7.0), (2,9.0))
List((1,16.0))
--------

or, if the List wrapper is not required:
((1,7.0), (2,9.0))
((1,16.0))

How can we make the clampList function return all the tuples at once, and not each tuple separately?
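From what I understand, flatMap alone cannot do this, because it only ever sees one element at a time; collecting all the tuples into a single list is an aggregation, not a mapping. The closest I have come is to gather each round on the driver instead of keeping the groups in an RDD, something like (untested beyond local mode):

  val rounds = ListBuffer[List[(Int,Double)]]()
  rounds += clamp(inputRdd).collect().toList        // List((1,7.0), (2,9.0))
  rounds += clamp(clamp(inputRdd)).collect().toList // List((1,16.0))
  rounds.foreach(println)

Is there a way to get the same grouping while staying inside an RDD?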


Thanks.
