Converting an array/seq to Spark.RDD


Jae Lee

Nov 12, 2013, 5:03:30 PM
to spark...@googlegroups.com
Hello, I recently started using Spark for a project at my internship.


The dataset consists of Wave ID, Wavelength, and Intensity values collected from the waves emitted by 2499 known objects. There are 2499 different Wave IDs, each with 1000 wavelengths and intensities. The task is to compute the χ² value between the observed data and each reference object, then identify the type of the observed object by finding the reference object with the minimum χ² value.

So basically I get the known data into the form Array(ID, Seq[(wavelength, intensity)]) after a sequence of map and groupByKey operations. But when I try to use any Spark operations on the Seq[(wavelength, intensity)] together with the observed data (which is a Spark.RDD), it doesn't work because the types don't match: the error says the Spark map/reduce operations only work on Spark.RDD. So I was wondering if there is any way to convert something to a Spark.RDD so I can use Spark operations on it.

Sorry if it's too vague. Please let me know if there are parts that you want me to clarify.

Josh Rosen

Nov 12, 2013, 5:05:44 PM
to spark...@googlegroups.com


--
You received this message because you are subscribed to the Google Groups "Spark Users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-users...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.

Jae Ho "Michael" Lee

Nov 12, 2013, 5:27:00 PM
to spark...@googlegroups.com
So let's say the data in the form Array(ID, Seq[(wavelength, intensity)]) is called "data". More precisely, the shell reports it as
res46: spark.RDD[(Int, Seq[(Int, Double)])] = MapPartitionsRDD[15] at groupByKey at <console>:12

So in order to get something like spark.RDD[(Int, spark.RDD[(Int, Double)])], would I do
val newData = data.map(l => (l._1, sc.parallelize(l._2)))?

It does return
res50: spark.RDD[(Int, spark.RDD[(Int, Double)])] = MappedRDD[45] at map at <console>:15

but when I try to print it, it gives

ERROR LocalScheduler: Exception in task 0 java.lang.NullPointerException (followed by many more lines of stack trace)

Jae Ho Lee
University of California Berkeley | Class of 2013
B.A. in Applied Mathematics



Josh Rosen

Nov 12, 2013, 5:37:37 PM
to spark...@googlegroups.com
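This doesn't work because Spark doesn't support nested RDDs. The SparkContext (sc) exists only on the driver, so calling sc.parallelize inside a map closure fails when the task runs.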

Are you trying to do something like "for each ID, join the observed data with the reference data, then compute some function over it"?

Jae Ho "Michael" Lee

Nov 12, 2013, 5:39:03 PM
to spark...@googlegroups.com
Ah I see. And yes. That's basically what I am trying to do.


Josh Rosen

Nov 12, 2013, 5:49:18 PM
to spark...@googlegroups.com
I'd place both datasets into the form RDD[(Int, Seq[(Int, Double)])], then use join, e.g.

val expected: RDD[(Int, Seq[(Int, Double)])] = sc.parallelize()...
val actual: RDD[(Int, Seq[(Int, Double)])] = sc.textFile().map(...) /* or however you load your data */
val joined = expected.join(actual)
val differences = joined.mapValues{ case(expected, actual) => expected - actual /* or whatever operation is appropriate */}

so `differences` is an RDD[(ID/Int, some measure of distance)].
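Since `expected - actual` is not defined for two Seqs, the function passed to mapValues has to line the two spectra up by wavelength itself. A minimal sketch of one such distance, a plain sum of squared differences, using only ordinary Scala collections; the names `SpectrumDistance`, `Spectrum`, and `sumSquaredDiff` are made up for illustration and are not part of Spark:

```scala
object SpectrumDistance {
  // (wavelength, intensity) pairs, as in the thread
  type Spectrum = Seq[(Int, Double)]

  // Sum of squared intensity differences over the wavelengths
  // that appear in both spectra (assumption: unmatched wavelengths are ignored).
  def sumSquaredDiff(expected: Spectrum, actual: Spectrum): Double = {
    val actualByWave = actual.toMap
    expected.collect {
      case (wave, e) if actualByWave.contains(wave) =>
        val d = e - actualByWave(wave)
        d * d
    }.sum
  }
}
```

With that in place, the last line of the snippet above could read `joined.mapValues { case (e, a) => SpectrumDistance.sumSquaredDiff(e, a) }`.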

(Aside: if 'actual' is large and 'expected' is small (which it probably is if you're constructing it in the driver using parallelize()), then your job could benefit from a broadcast-join/map-side join where you use sc.broadcast() to copy the expected data array to all workers, then perform the join manually inside of a map or mapPartitions call; you can search for "broadcast join" on this mailing list to find some examples of this).
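A rough sketch of the broadcast join described in the aside, assuming a current Spark release where the classes live under org.apache.spark (in the 0.7.x releases used in this thread the package was just spark). `BroadcastJoinSketch`, `broadcastJoin`, and `Spectrum` are illustrative names, not Spark APIs; only `sc.broadcast`, `Broadcast.value`, and `RDD.flatMap` come from Spark itself:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object BroadcastJoinSketch {
  // (wavelength, intensity) pairs, as in the thread
  type Spectrum = Seq[(Int, Double)]

  // Pairs each record of `actual` with the matching entry of a small,
  // driver-side `expected` map, without shuffling `actual`.
  def broadcastJoin(
      sc: SparkContext,
      expected: Map[Int, Spectrum],
      actual: RDD[(Int, Spectrum)]): RDD[(Int, (Spectrum, Spectrum))] = {
    // Ship one read-only copy of the small dataset to every worker.
    val expectedB = sc.broadcast(expected)
    actual.flatMap { case (id, act) =>
      // IDs missing from the broadcast map are dropped, like an inner join.
      expectedB.value.get(id).map(exp => (id, (exp, act))).toList
    }
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("broadcast-join-sketch"))
    try {
      val expected = Map(1 -> Seq((400, 1.0)), 2 -> Seq((400, 2.0)))
      val actual = sc.parallelize(Seq((1, Seq((400, 1.5))), (3, Seq((400, 9.0)))))
      broadcastJoin(sc, expected, actual).collect().foreach(println)
    } finally sc.stop()
  }
}
```

The result has the same shape as `expected.join(actual)`, so the same mapValues step can follow it.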

Jae Ho "Michael" Lee

Nov 12, 2013, 5:56:22 PM
to spark...@googlegroups.com
So the reason I couldn't just use Seq is that when I calculate 'differences' I need to use some of Spark's __ByKey() operations in that step. After I join or combine the expected/reference data (2499 items with 1000 values each) and the actual data (1000 values), I need to use those Spark operations, but they don't work on a Seq, right?

But in the meantime, I will try that method and also look into "broadcast join". Thank you.
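On the Seq question: the __ByKey() methods are indeed RDD-only, but a local Seq of pairs can be reduced per key with the standard Scala collections, which is enough inside a mapValues call. A small sketch, where `SeqByKey.reduceByKey` is a made-up helper and not a Spark or standard-library method:

```scala
object SeqByKey {
  // Local counterpart of RDD.reduceByKey for an in-memory Seq of pairs:
  // group by the key, then fold each group's values together with f.
  def reduceByKey[K, V](pairs: Seq[(K, V)])(f: (V, V) => V): Map[K, V] =
    pairs.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) }
}
```

For example, `SeqByKey.reduceByKey(a ++ b)(_ * _).values.sum` gives the sum over keys of the per-key products of two local sequences `a` and `b`, much like a union followed by reduceByKey and reduce on RDDs.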


Jae Ho "Michael" Lee

Nov 25, 2013, 12:39:11 PM
to spark...@googlegroups.com
Hi, so is it even possible to use those Spark operations when doing a broadcast join?
I am still not sure whether a broadcast join will solve the problem, because the computation I need involves the formulas in the attached picture, where O_i and M_i come from different datasets.



Screen Shot 2013-11-25 at 9.38.00 AM.png

Jae Ho "Michael" Lee

Nov 27, 2013, 2:11:42 PM
to spark...@googlegroups.com
// object
val o1 = sc.textFile("/global/project/projectdirs/paralleldb/BDAS/jesup/spark-0.7.3/O1.txt").map(line => line.split(","))
val waveIntensityPairsO1 = o1.map(line => (line(0).toInt,line(1).toDouble))
val waveErrorPairsO1 = o1.map(line => (line(0).toInt,1/line(2).toDouble))

// metadata
val m1 = sc.textFile("/global/project/projectdirs/paralleldb/BDAS/jesup/spark-0.7.3/M1.txt").map(line => line.split(","))
val waveIntensityPairsM1 = m1.map(line => (line(0).toInt,line(1).toDouble))

// finding alpha
val numerator1 = waveIntensityPairsM1.union(waveIntensityPairsO1).union(waveErrorPairsO1).reduceByKey(_ * _).map(l => l._2).reduce(_ + _)
val denominator1 = m1.map(line => (line(0).toInt,line(1).toDouble*line(1).toDouble)).union(waveErrorPairsO1).reduceByKey(_ * _).map(l => l._2).reduce(_ + _)
val alpha1 = numerator1/denominator1

// finding chi^2
val num1Temp = m1.map(line => (line(0).toInt, alpha1*line(1).toDouble))
val num1Temp2 = num1Temp.union(waveIntensityPairsO1).reduceByKey(_ - _)
val chiSquared1 = num1Temp2.union(num1Temp2).union(waveErrorPairsO1).reduceByKey(_ * _).map(l => l._2).reduce(_ + _)
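
Read as formulas, the single-metadata code above appears to compute the following. This is inferred from the code itself, not from the attached screenshot. Here O_i and M_i are the observed and metadata intensities at wavelength i, and w_i is an assumed name for the weight the code builds as 1/line(2) from the third column of O1.txt:

```latex
\alpha = \frac{\sum_i w_i \, O_i \, M_i}{\sum_i w_i \, M_i^2},
\qquad
\chi^2 = \sum_i w_i \left( \alpha M_i - O_i \right)^2
```

The sign of the difference left by reduceByKey(_ - _) does not matter here because the difference is squared.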




// Now with multiple metadata

val data = sc.textFile("/global/project/projectdirs/paralleldb/BDAS/jesup/spark-0.7.3/flux_ins.1000.csv").map(l => l.split(",")).map(l => (l(0).toInt, (l(1).toInt, l(2).toDouble))).groupByKey

// data consists of pairs of the form (ID, Seq[(wavelength, intensity)])
val sub_data = data.map(l => l._2)

val sub_data = data.map(l=> l._2).map(l=>l.toArray.map(l=> (l._1,l._2))) 

// Attempt at nested RDDs; this fails because sc cannot be used inside map
data.map(l=> (l._1, l._2.map(l=> sc.parallelize(l))))
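
One way around the nested-RDD problem is to do the per-ID arithmetic with ordinary Scala collections, so that only the outer (ID, Seq) structure needs to be an RDD. The sketch below assumes the alpha and chi^2 definitions that the single-metadata code above seems to implement, with the weight taken as 1/error. `ChiSquaredMatch`, `chiSquared`, and `bestMatch` are hypothetical names, and the code assumes each model shares at least one wavelength with the observed data and has a non-zero denominator:

```scala
object ChiSquaredMatch {
  // (wavelength, intensity) pairs for one reference object
  type Spectrum = Seq[(Int, Double)]

  // observed maps wavelength -> (intensity O_i, weight w_i).
  def chiSquared(model: Spectrum, observed: Map[Int, (Double, Double)]): Double = {
    // Keep only wavelengths present in both, as (M_i, O_i, w_i).
    val rows = model.collect {
      case (wave, m) if observed.contains(wave) =>
        val (o, w) = observed(wave)
        (m, o, w)
    }
    val num = rows.map { case (m, o, w) => m * o * w }.sum
    val den = rows.map { case (m, _, w) => m * m * w }.sum
    val alpha = num / den // scale factor fitted to this model
    rows.map { case (m, o, w) =>
      val d = alpha * m - o
      d * d * w
    }.sum
  }

  // Returns the (id, chi^2) pair of the best-matching reference.
  def bestMatch(refs: Seq[(Int, Spectrum)], observed: Map[Int, (Double, Double)]): (Int, Double) =
    refs.map { case (id, model) => (id, chiSquared(model, observed)) }.minBy(_._2)
}
```

On the RDD side the same function could be used as `data.mapValues(model => ChiSquaredMatch.chiSquared(model, observedB.value))`, where `observedB` is a hypothetical broadcast of the observed map, followed by a reduce that keeps the pair with the smaller chi^2.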

