Calling {java.util.Map, scala.collection.mutable.HashMap, java.util.concurrent.ConcurrentHashMap} in the map method of an RDD

Aslan Bekirov

May 5, 2013, 12:34:50 PM
to spark...@googlegroups.com

Hi all,

As far as I know, Spark does not support nested RDD calls, that is, using one RDD inside another.

So I want to convert one of these RDDs to a map and use it in the flatMap method of another RDD. Is this a valid approach?
If yes, which of the Map implementations below should I use?

java.util.Map,
scala.collection.mutable.HashMap,
java.util.concurrent.ConcurrentHashMap

Any suggestions please?
BR,
Aslan

Ian O'Connell

May 5, 2013, 2:24:32 PM
to spark...@googlegroups.com
It might be better to be more specific about what you want to do. Using one large RDD as part of the entries inside another RDD isn't going to scale well, or possibly even work. You are likely to just see OOMs when converting the data set into a map.

If you want to take an RDD, presuming it's small, and make it into a map to be used by other RDDs, you probably want to collect it, call toMap (immutable), and probably broadcast it to all nodes.
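
A minimal sketch of the collect-and-broadcast approach described above. It assumes a recent Spark release (the `org.apache.spark` package; the 2013 releases used the `spark` package) and that the lookup RDD fits in driver and executor memory. The names `BroadcastLookup` and `enrich`, and the `(String, Int)` and `(String, String)` record types, are hypothetical and only for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object BroadcastLookup {
  // Attach to each record of `large` the matching value from `small`, if any.
  // Assumption: `small` is small enough to be collected to the driver.
  def enrich(large: RDD[(String, Int)],
             small: RDD[(String, String)]): RDD[(String, Int, Option[String])] = {
    // collectAsMap pulls the whole RDD to the driver; toMap makes it immutable.
    val lookup: Map[String, String] = small.collectAsMap().toMap
    // Broadcast ships one read-only copy to each executor instead of one per task.
    val shared = large.sparkContext.broadcast(lookup)
    large.map { case (k, v) => (k, v, shared.value.get(k)) }
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("broadcast-lookup").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val large = sc.parallelize(Seq("a" -> 1, "b" -> 2, "z" -> 3))
      val small = sc.parallelize(Seq("a" -> "alpha", "b" -> "beta"))
      enrich(large, small).collect().foreach(println)
    } finally sc.stop()
  }
}
```

Because the map is read-only inside the tasks, a plain immutable Map is enough here; a thread-safe map buys nothing.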




Aslan Bekirov

May 6, 2013, 6:12:12 AM
to spark...@googlegroups.com, i...@ianoconnell.com
Hi,

Thanks for your response.

I want to look up some values of one RDD inside the map function of another RDD. These two RDDs are quite large.

Since nested RDD calls are not supported in Spark, I am looking for another solution.

BR,
Aslan

Ian O'Connell

May 6, 2013, 12:37:10 PM
to Aslan Bekirov, spark...@googlegroups.com
If both of them are quite large, you probably want to consider rephrasing the operation.

Going from RDD A to B, you're looking up keys in B. Is it one key, or a reasonably small number of keys? If so, a map on A that produces an extra column you can join with B is probably what you want. Issuing a group-by on the output will then give you A with a list of the matching elements of B.

Aslan Bekirov

May 6, 2013, 1:06:23 PM
to spark...@googlegroups.com, Aslan Bekirov, i...@ianoconnell.com

Actually there are 5 RDDs. From RDD A, I have to look up values in RDDs B, C, D, and E. That is why joining all of them together might be a very complex solution.

Can't I convert B, C, D, and E to java.util.concurrent.ConcurrentHashMap and use them inside RDD A?
Or is there any other solution?

Ian O'Connell

May 6, 2013, 2:10:04 PM
to Aslan Bekirov, spark...@googlegroups.com
A concurrent hash map (not sure why you need a thread-safe hash map in this case anyway) will be materialized on one or all nodes, so it isn't what you want if the RDDs are large. You do want a join here. Depending on how the RDDs are mapped, a plain for loop might be able to set this up easily enough, extending the base RDD with the extra fields to map on.
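
One way to read the "loop over the lookup RDDs" idea is a fold that joins each lookup RDD onto the base RDD in turn. This is only a sketch under assumptions: it uses a recent Spark release (`org.apache.spark`), it assumes all RDDs have already been keyed by the same String key, and the names `ChainedJoins` and `attachAll` are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object ChainedJoins {
  // Extend every base record with one extra field per lookup RDD.
  // Each lookup is a (fieldName, RDD keyed by the same key as `base`).
  def attachAll(base: RDD[(String, Map[String, String])],
                lookups: Seq[(String, RDD[(String, String)])]
               ): RDD[(String, Map[String, String])] =
    lookups.foldLeft(base) { case (acc, (name, lookup)) =>
      // leftOuterJoin keeps base records that have no match in this lookup.
      // Note: a key that occurs several times in `lookup` multiplies the rows.
      acc.leftOuterJoin(lookup).mapValues {
        case (fields, Some(v)) => fields + (name -> v)
        case (fields, None)    => fields
      }
    }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("chained-joins").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val a = sc.parallelize(Seq("k1" -> Map("a" -> "1"), "k2" -> Map("a" -> "2")))
      val b = sc.parallelize(Seq("k1" -> "x"))
      val c = sc.parallelize(Seq("k1" -> "y", "k2" -> "z"))
      attachAll(a, Seq("b" -> b, "c" -> c)).collect().foreach(println)
    } finally sc.stop()
  }
}
```

Each step is an ordinary distributed join, so nothing has to be materialized on a single node.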

Aslan Bekirov

May 7, 2013, 7:45:27 AM
to spark...@googlegroups.com, Aslan Bekirov, i...@ianoconnell.com

As far as I can see, a join can be done if the keys of the RDDs have the same data type (e.g. String). But my RDDs have keys of different data types (one is List<String>, one is String).

How do I join these RDDs? And isn't joining 5 large RDDs a drawback? Is it the only way to do it? Is it reasonable to use a distributed cache or a distributed Map implementation (e.g. Hazelcast, Coherence) to store the other 4 RDDs?

Ian O'Connell

May 7, 2013, 10:17:55 PM
to spark...@googlegroups.com, Aslan Bekirov
Spark is built on the primitive of the RDD, which isn't a distributed map; it's more a collection of rows, similar to a table in an RDBMS. You could of course stream one set of data from the master to the nodes and search each node's data (building hash maps there) to do a lookup. But other than that I'm not sure how you'd see it working. You might want to look into more of the documents they have on the site. Each node would need to do arbitrary fetches at map lookup time to a random other node across the network, which isn't ideal and isn't a Spark function.

You can't join strings and lists if they are both the keys, but you can't look up a hash map of strings with a list of strings either. You can run a map function against the RDDs to pull out the field you wish to use as the lookup key, and then join on that. The 5 RDDs will be partitioned based on the keys, so plenty of work will be avoided if they don't intersect at the remote nodes. You can play with custom partitioners during the earlier loading/processing phases to avoid more cross-network traffic, but I'd just see about getting it working first.


In summary:

spark.RDD[A] maps to spark.RDD[(B, A)]
spark.RDD[C] maps to spark.RDD[(B, C)]

These RDDs can now be joined to produce (B, A, C) as an RDD, which should give the effect you are looking for. But as ever, without knowing the data types, skew, and so on, YMMV.
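
The summary above can be sketched roughly as follows, assuming a recent Spark release (`org.apache.spark`). The record types `Order` (standing in for A, with a List[String] key) and `TagInfo` (standing in for C, with a String key) and the function `joinOnTag` are hypothetical. The list-valued key is exploded with flatMap so both sides end up keyed by a plain String; note that Spark's join returns the pair nested as (B, (A, C)).

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object JoinByKey {
  // Hypothetical record types, for illustration only.
  case class Order(tags: List[String], amount: Double) // "A": key is a list
  case class TagInfo(tag: String, label: String)       // "C": key is a string

  def joinOnTag(orders: RDD[Order],
                infos: RDD[TagInfo]): RDD[(String, (Order, TagInfo))] = {
    // RDD[A] -> RDD[(B, A)]: emit one (tag, order) pair per tag in the list.
    val ordersByTag: RDD[(String, Order)] =
      orders.flatMap(o => o.tags.map(t => (t, o)))
    // RDD[C] -> RDD[(B, C)]: key each record by its tag.
    val infosByTag: RDD[(String, TagInfo)] = infos.keyBy(_.tag)
    // Inner join on the shared String key.
    ordersByTag.join(infosByTag)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("join-by-key").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val orders = sc.parallelize(Seq(Order(List("x", "y"), 1.0), Order(List("q"), 2.0)))
      val infos = sc.parallelize(Seq(TagInfo("x", "ex"), TagInfo("y", "why")))
      joinOnTag(orders, infos).collect().foreach(println)
    } finally sc.stop()
  }
}
```

An inner join drops records without a match on the other side; leftOuterJoin would keep them.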

Ultimately, if this doesn't fit well, maybe Spark isn't really what you should be using for your task.