merge multiple RDDs


CodingCat

Nov 28, 2012, 12:44:29 AM11/28/12
to spark...@googlegroups.com
Hi, everyone

In my program, I have three separate threads producing RDDs,

and the consumer wants to read these 3 RDDs and merge them together to generate results.

Is there any way to take 3 RDDs as input and output a new RDD?

Best,

Matei Zaharia

Nov 28, 2012, 12:44:58 AM11/28/12
to spark...@googlegroups.com
Yes, you can use SparkContext.union(rdd1, rdd2, rdd3).

Matei
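
The semantics of SparkContext.union are positional concatenation of the inputs' partitions. A minimal plain-Scala sketch of what the three-RDD union computes (local collections standing in for RDDs, no Spark required; the `Local` names are illustrative, not from the thread):

```scala
// Three local sequences stand in for three RDDs of Ints.
val rdd1Local = Seq(1, 2, 3)
val rdd2Local = Seq(4, 5, 6)
val rdd3Local = Seq(7, 8, 9)

// sc.union(rdd1, rdd2, rdd3) concatenates the RDDs' partitions in order,
// which on local collections is simply ++ :
val merged = rdd1Local ++ rdd2Local ++ rdd3Local
// merged == Seq(1, 2, 3, 4, 5, 6, 7, 8, 9)
```

Because union only concatenates, no shuffle is involved, which is why Patrick notes below that it has essentially no overhead.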

Patrick Wendell

Nov 28, 2012, 12:46:54 AM11/28/12
to spark...@googlegroups.com
There is a union operator which does exactly this... it actually does this "in place", so there is no overhead (assuming the RDDs are the same type).

scala> val rdd1 = sc.parallelize(Array(1,2,3))
rdd1: spark.RDD[Int] = spark.ParallelCollection@46b94c58

scala> val rdd2 = sc.parallelize(Array(4,5,6))
rdd2: spark.RDD[Int] = spark.ParallelCollection@2db8d536

scala> rdd1.union(rdd2).collect

res1: Array[Int] = Array(1, 2, 3, 4, 5, 6)

Nan Zhu

Nov 28, 2012, 1:10:45 AM11/28/12
to spark...@googlegroups.com
Thank you Patrick and Matei, 

What if I want to merge them in another way? Say RDD1 and RDD2 both contain float numbers and have the same number of elements,

and I want to add RDD1 and RDD2 elementwise:

1stInRDD1 + 1stInRDD2 = 1stInNewRDD
2ndInRDD1 + 2ndInRDD2 = 2ndInNewRDD
3rdInRDD1 + 3rdInRDD2 = 3rdInNewRDD

What shall I do?

I could collect an RDD into an array and achieve this, but if the RDD is very large that's obviously inefficient. Is there a smarter way?

Best,

-- 
Nan Zhu
School of Computer Science,
McGill University

Matei Zaharia

Nov 28, 2012, 1:29:03 AM11/28/12
to spark...@googlegroups.com
Ah, so this isn't possible with the current release of Spark, because in general you can't know whether the two RDDs' corresponding partitions will be exactly identical in size (e.g. if you created one of the RDDs with a filter, you don't know how many elements it has, so it's hard to match its elements with the other RDD's without going through the whole thing linearly). However, since a lot of people have asked for this, I've implemented a version in the master branch on Git, which works if the RDDs are the same size. It will work if you create them both with parallelize(), or if you create one by mapping the other. Here's the commit for it: https://github.com/mesos/spark/commit/27e43abd192440de5b10a5cc022fd5705362b276 (but just check out the master branch).

Matei
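
The zip in that commit pairs elements by position, much like Scala's collection zip. A plain-Scala sketch of the elementwise sum it enables (local collections standing in for two equal-sized RDDs, no Spark required; the `Local` names are illustrative, not from the thread):

```scala
// Two local sequences stand in for two equal-sized RDDs of floats.
val rdd1Local = Seq(1.0f, 2.0f, 3.0f)
val rdd2Local = Seq(10.0f, 20.0f, 30.0f)

// On real RDDs this would be roughly rdd1.zip(rdd2).map { case (x, y) => x + y }:
// zip pairs the i-th elements of each input, map adds each pair.
val summed = rdd1Local.zip(rdd2Local).map { case (x, y) => x + y }
// summed == Seq(11.0f, 22.0f, 33.0f)
```

This keeps everything distributed, avoiding the collect-to-array detour Nan mentions above.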

Nan Zhu

Nov 28, 2012, 1:41:35 AM11/28/12
to spark...@googlegroups.com
Good, I think this ZippedRDD should work fine for my case, since I'm pretty sure about the number of instances in my RDDs.

Best,

-- 
Nan Zhu
School of Computer Science,
McGill University
