Nested RDDs


andrew

Dec 5, 2012, 8:33:11 AM12/5/12
to spark...@googlegroups.com
Is it possible to have Nested RDDs?

I've tried this code:

sc.parallelize((0 to 10).toList)
.map(x => sc.parallelize((0 to (x + 1)).toList))
.collect

But I receive a java.lang.NullPointerException

Is it possible to do this?

Thanks

Matei Zaharia

Dec 5, 2012, 2:49:57 PM12/5/12
to spark...@googlegroups.com
No, unfortunately it isn't; it would be somewhat complicated to support.

Matei
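[For readers hitting the same NullPointerException: the usual explanation is that the SparkContext only exists on the driver, so referencing `sc` inside a task fails. A minimal sketch of the standard restructuring (my own suggestion, not from this thread) is to nest plain Scala collections, or flatten, instead of nesting RDDs:]

```scala
// Nesting RDDs NPEs because `sc` is null inside tasks:
//   sc.parallelize((0 to 10).toList).map(x => sc.parallelize((0 to (x + 1)).toList))

// Instead, map each element to an ordinary collection...
val nested = sc.parallelize((0 to 10).toList)
  .map(x => (0 to (x + 1)).toList)   // RDD[List[Int]]

// ...or flatten if only the elements are needed:
val flat = sc.parallelize((0 to 10).toList)
  .flatMap(x => 0 to (x + 1))        // RDD[Int]
```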

Rajiv Abraham

Mar 10, 2013, 9:57:48 PM3/10/13
to spark...@googlegroups.com
Hi,
I'd appreciate a few pointers on how to solve the following problem:

val badVerticeIdRDD = sc.parallelize(Seq("5", "6"))

val vertex = aVertex(id = 1, edgesTo("2", "5", "6")) // pseudocode

// Below is the problem I want to solve: I only want the edge to id = 2 back.
val newEdges = vertex.edges.filter(e => e.destination not in badVerticeIdRDD) // "not in" is pseudocode

I get a java.lang.NullPointerException, which I think is the same issue as in the thread below.

Best Regards,
Rajiv

Mark Hamstra

Mar 10, 2013, 11:10:29 PM3/10/13
to spark...@googlegroups.com
If I'm reading your problem statement correctly, then this should do it:

val bothHave = badVerticeIdRDD
  .filter(vertex.edges.contains(_))
  .aggregate(scala.collection.mutable.Set[String]())(
    { case (accSet, rddElem) => accSet += rddElem },
    _ ++ _)
val newEdges = vertex.edges.filterNot(bothHave.contains(_))

In words, filter the badVerticeIdRDD down to those elements that are present in vertex.edges; aggregate those filtered elements into a Set and return the set to the driver; filter vertex.edges down to those elements not in the returned set.

This assumes that vertex.edges (and thus bothHave) has relatively few members.

Rajiv Abraham

Mar 13, 2013, 6:32:24 PM3/13/13
to spark...@googlegroups.com
Thanks, Mark, for your solution. I am guilty of oversimplifying the problem... :( I apologize.

The problem as previously posted was simplified to a single vertex. My problem arises (java.lang.NullPointerException) because I have an RDD of vertices, and I am trying to use badVerticeIdRDD as a nested RDD inside transformations on the verticeRDD. Something like:

val result = verticeRDD.map(v => v.edges.filter(e => e.destination not in badVerticeIdRDD)) // I have a helper method which encapsulates the 'not in' behaviour

In words: for each vertex in the verticeRDD, delete all edges which lead to a destination in badVerticeIdRDD.

Mark, do correct me if I am wrong, but your solution below may hit the same java.lang.NullPointerException if I call that code from within a transformation on verticeRDD?


Rajiv


Mark Hamstra

Mar 13, 2013, 6:44:06 PM3/13/13
to spark...@googlegroups.com
Yeah, that's not going to work.  You may find a solution by using a broadcast variable or a join/cogroup to make all the needed data visible from within one RDD.
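[A sketch of the broadcast-variable route Mark mentions; `Vertex` and `Edge` here are hypothetical stand-ins for Rajiv's actual classes:]

```scala
// Hypothetical versions of Rajiv's vertex/edge types.
case class Edge(destination: String)
case class Vertex(id: String, edges: Seq[Edge])

// Collect the (presumably small) bad-id set to the driver, then broadcast it
// so every task can read it locally -- no nested RDD needed.
val badIds = sc.broadcast(badVerticeIdRDD.collect().toSet)

val result = verticeRDD.map { v =>
  v.copy(edges = v.edges.filterNot(e => badIds.value.contains(e.destination)))
}
```

This works as long as badVerticeIdRDD is small enough to fit on the driver and in each executor's memory.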

Rajiv Abraham

Mar 13, 2013, 8:47:41 PM3/13/13
to spark...@googlegroups.com
Ya, made it work with leftOuterJoin. Thanks!
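[Rajiv doesn't show his code, but one way the leftOuterJoin approach can look (a sketch under assumed `Vertex`/`Edge` classes, not his actual solution) is: key every edge by its destination id, left-outer-join against the bad ids, and keep only edges whose join partner is None:]

```scala
// Hypothetical types, as in the earlier sketch.
case class Edge(destination: String)
case class Vertex(id: String, edges: Seq[Edge])

val badPairs = badVerticeIdRDD.map(id => (id, ()))              // (badId, ())

val keptEdges = verticeRDD
  .flatMap(v => v.edges.map(e => (e.destination, (v.id, e))))   // (destId, (vertexId, edge))
  .leftOuterJoin(badPairs)                                      // (destId, ((vertexId, edge), Option[Unit]))
  .filter { case (_, (_, bad)) => bad.isEmpty }                 // no match => edge survives
  .map { case (_, (vertexIdAndEdge, _)) => vertexIdAndEdge }    // (vertexId, edge)
```

Unlike the broadcast version, this scales to a large badVerticeIdRDD, at the cost of a shuffle; a groupByKey on vertexId can rebuild the per-vertex edge lists afterwards.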
