co-partitioning and joins

6,241 views
Skip to first unread message

Koert Kuipers

unread,
Jun 13, 2013, 11:53:53 AM6/13/13
to spark...@googlegroups.com
hello all,
which join/cogroup operations in spark take full advantage of co-partitioning?

i imagine that joins on co-partitioned data sets can effectively become map-side joins?

thanks! koert

Reynold Xin

unread,
Jun 13, 2013, 4:25:37 PM6/13/13
to spark...@googlegroups.com
That is correct.

--
Reynold Xin, AMPLab, UC Berkeley



--
You received this message because you are subscribed to the Google Groups "Spark Users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-users...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.
 
 

Koert Kuipers

unread,
Jun 13, 2013, 6:04:39 PM6/13/13
to spark...@googlegroups.com
Reynold,
Do i understand it correctly that all i have to do is set the the Partitioner?

so for example if a have 2 RDDs a and b, and b is smaller and i want to avoid shuffling a across network, i do:
a.join(b, partitioner = a.partitioner.get)

and if i want to make it a map-side join, i do:
a1 = a.partitionBy(myPartitioner)
b1 = b.partitionBy(myPartitioner)
and now a.join(b) is a map-side join?

srikanth reddy

unread,
Aug 1, 2013, 8:24:20 PM8/1/13
to spark...@googlegroups.com


On Thursday, June 13, 2013 3:04:39 PM UTC-7, Koert Kuipers wrote:

and if i want to make it a map-side join, i do:
a1 = a.partitionBy(myPartitioner)
b1 = b.partitionBy(myPartitioner)
and now a.join(b) is a map-side join?
++
I have this doubt too. how do we make sure that RDD-element with a particular key of a1 and b1 reside inside the same node?
Assuming a1 and b1 both use hashpartitioned and partitioned by hashvalue of key into 4 RDDs.

Also if a1 and b1 are normal RDDs not partitioned, then when those RDDs are computed what policy decides how many RDDs they will be split and does it use any hashpartition mechanism in this case or the RDDs are randomly split??

Reynold Xin

unread,
Aug 1, 2013, 8:45:35 PM8/1/13
to spark...@googlegroups.com
On Thu, Aug 1, 2013 at 5:24 PM, srikanth reddy <srikan...@gmail.com> wrote:


On Thursday, June 13, 2013 3:04:39 PM UTC-7, Koert Kuipers wrote:

and if i want to make it a map-side join, i do:
a1 = a.partitionBy(myPartitioner)
b1 = b.partitionBy(myPartitioner)
and now a.join(b) is a map-side join?
++
I have this doubt too. how do we make sure that RDD-element with a particular key of a1 and b1 reside inside the same node?
Assuming a1 and b1 both use hashpartitioned and partitioned by hashvalue of key into 4 RDDs.


There are two things here. One is data co-partition, and then data co-location. 

As long as you specify the same hash partitioner, a1 and b1 will be co-partitioned (i.e. partitioned the same way). The partitions might not be co-located, so your job will still incur network traffic, but you do reduce a whole round of shuffle.

Now to get co-location - that all depends on when rdd a1 and b1 are put into memory. Note that just writing 

val a1 = a.partitionBy(myPartitioner).cache
val b1 = b.partitionBy(myPartitioner).cache
c = a1.join(b1)

is not going to put the two memory since no job is needed here. Declaring a1.join(b1) only creates the metadata for c (lineage) because of RDD's lazy evaluation nature. 

If you do a count, e.g.

val a1 = a.partitionBy(myPartitioner).cache
val b1 = b.partitionBy(myPartitioner).cache
c = a1.join(b1)
val num = c.count

count is an action that requires a response, and will launch a job to execute the join. The first join will actually force a shuffle for a to create a1, and a shuffle for b to create b1. In this case, partitions of a1 and b1 will actually be co-located as well because it is launched by the same job.

If a1 and b1 are put into memory by different jobs, there is no guarantee their partitions will be co-located, even though they are co-partitioned.

e.g.

val a1 = a.partitionBy(myPartitioner).cache
val b1 = b.partitionBy(myPartitioner).cache
c = a1.join(b1)
a1.count
b1.count

Both count actions will force a shuffle because of the partitionBy, but there is no guarantee that the reduce side tasks get run on the same nodes, and thus no guarantee on data colocation.


Also if a1 and b1 are normal RDDs not partitioned, then when those RDDs are computed what policy decides how many RDDs they will be split and does it use any hashpartition mechanism in this case or the RDDs are randomly split??
 

RDDs are always partitioned, even without the specification of a partitioner. How an RDD is partitioned depends on the RDD class definition itself.

If it is an HadoopRDD, then the partitions are specified by the Hadoop input format.



 

srikanth reddy

unread,
Aug 1, 2013, 11:50:32 PM8/1/13
to spark...@googlegroups.com
Thanks. Great answer Reynold!
--
You received this message because you are subscribed to a topic in the Google Groups "Spark Users" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/spark-users/gUyCSoFo5RI/unsubscribe.
To unsubscribe from this group and all its topics, send an email to spark-users...@googlegroups.com.

srikanth reddy

unread,
Aug 2, 2013, 4:34:56 PM8/2/13
to spark...@googlegroups.com
  
    val distPool = sc.parallelize(poolL)

poolL is a normal scala collection and its not a spark RDD and I am not using any partitionBy() call on the RDD distPool.

case-1) The datastore from where poolL is populated is lets say a local file from local file system(not hdfs)

What would be the partition mechanism for distPool RDD?  how many partitions would it have? Does it depend on the size of distPool and the available RAM on the worker nodes?

 When will the partitions(or # of partitions and the nodes hosting the partitions) be materialized? 

Will it be materialized only when some computation triggers the calculation of distPool RDD? 

case-2) Lets say poolL is populated in memory and not from a local file. How does it change the partition mechanism for distPool RDD?


=Thanks



--
You received this message because you are subscribed to a topic in the Google Groups "Spark Users" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/spark-users/gUyCSoFo5RI/unsubscribe.
To unsubscribe from this group and all its topics, send an email to spark-users...@googlegroups.com.

Reynold Xin

unread,
Aug 2, 2013, 5:12:48 PM8/2/13
to spark...@googlegroups.com
On Fri, Aug 2, 2013 at 1:34 PM, srikanth reddy <srikan...@gmail.com> wrote:
  
    val distPool = sc.parallelize(poolL)

poolL is a normal scala collection and its not a spark RDD and I am not using any partitionBy() call on the RDD distPool.

If you are using "parallelize", there is no difference in how poolL is populated. 

parallelize is not meant to be used to transfer large amount of data from the master to the workers. It is meant for testing, or use to parallelize a small initial data.

parallelize has a second parameter that sets the number of slices. 

val rdd = sc.parallelize(someCollection, n)

will create a RDD of n partitions, each having a slice of the local collection.

 

case-1) The datastore from where poolL is populated is lets say a local file from local file system(not hdfs)

What would be the partition mechanism for distPool RDD?  how many partitions would it have? Does it depend on the size of distPool and the available RAM on the worker nodes?

See above.
 
 

 When will the partitions(or # of partitions and the nodes hosting the partitions) be materialized? 


They are never materialized unless you mark it as "persist" or "cache", and then have a job that actually require it. Only in that case will the entire slices of data be materialized on the worker.

If you are just doing

rdd.count

it does NOT materialize the RDD. It uses an iterator interface under the hood to process one tuple at a time. Of course, in parallelize, Spark does need to send slices of poolL to the workers, so each task will need to see a slice of the data either way.

 
Will it be materialized only when some computation triggers the calculation of distPool RDD? 

case-2) Lets say poolL is populated in memory and not from a local file. How does it change the partition mechanism for distPool RDD?




 

--
You received this message because you are subscribed to the Google Groups "Spark Users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-users...@googlegroups.com.

srikanth reddy

unread,
Aug 2, 2013, 6:14:03 PM8/2/13
to spark...@googlegroups.com
Got it. Thanks!

Koert Kuipers

unread,
Aug 2, 2013, 10:30:18 PM8/2/13
to spark...@googlegroups.com
reynold,
i am not sure i understand this statement:


"If a1 and b1 are put into memory by different jobs, there is no guarantee their partitions will be co-located, even though they are co-partitioned."

i always assumed that using the same hash partitioner across jobs would lead to co-location. what would be an example where this is not the case?   



--

Reynold Xin

unread,
Aug 3, 2013, 1:41:51 AM8/3/13
to spark...@googlegroups.com
val partitioner = new HashPartitioner(5)
val a1 = a.partitionBy(partitioner).cache()
val b1 = b.partiitonBy(partitioner).cache()

a1.count()
b1.count()

So a1 will have 5 partitions, and b1 will also have 5 partitions. But there is no deterministic order in which tasks to generate the blocks for a1 and b1 are run. Let's say you have 2 worker nodes, each having only 1 core.

Tasks get submitted deterministically (i.e. task for partition 0 is submitted first, and then partition 1, partition 2, ... partition 5). If the cluster is always free, and it takes identical time to process partitions of a1, then for a1, you end up with:

node 1: partition 1
node 2: partition 2
node 1: partition 3
node 2: partition 4
node 1: partition 5

But let's say for b1, it actually takes longer to process partition 1, then you might end up with

node 1: partition 1
node 2: partition 2
node 2: partition 3
node 1: partition 4
node 2: partition 5

Now you get a different order of execution, and partitions are stored on different nodes. 

--
Reynold Xin, AMPLab, UC Berkeley

Koert Kuipers

unread,
Aug 3, 2013, 3:00:51 PM8/3/13
to spark...@googlegroups.com
why not make location based on partition? something like hash modulo nr_nodes... it seems to me you gain a lot from that. one can reason about location. for example if i have RDD a and i derive RDD b from it using transformations that preserve partitioning, then i can rely on a join between a and b being very fast and without network activity in the future (if location is tied to partition in a deterministic way).

without location being tied to partition, what is the benefit of using transformations that preserve partitioning?
is the only benefit if you use them within the same job?
and what exactly is "within the same job"?

thanks! koert

Reynold Xin

unread,
Aug 3, 2013, 3:17:06 PM8/3/13
to spark...@googlegroups.com
What if you add a node to the cluster, or a node dies? Or what if a node is busy and can never be run anything - are you going to block the entire program forever so you could put a partition on that node?

You can still write your program in a way that does mostly local joins. And even if the joins need to fetch blocks from remote nodes, the joins still avoid an extra round of shuffle, which is much more expensive than just fetching data.

Look at the PageRank example (Section 3.2.2) in Spark's NSDI paper: http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf

Mark Hamstra

unread,
Aug 3, 2013, 3:38:47 PM8/3/13
to spark...@googlegroups.com
Reynold already told what a job is and what "within the same job" means in his prior response, but let's try it with a little different emphasis.  A job is a computation within a Spark cluster that gets kicked off by an action (count, reduce, etc.) -- it's what requires RDDs to actually be evaluated and results produced.  Evaluation of each RDD within a job has to proceed by working through the lineage of that RDD.  That evaluation doesn't begin at the beginning for RDDs that have been checkpointed or persisted and evaluated as part of a prior job.  If more than one RDD is part of the job (e.g. the action requires a join) and they both need to be partitioned within that job, then they will be co-partitioned and also co-located if the same partitioner is used for the RDDs.  However, even if the same partitioner is used for an RDD that needs to be partitioned within the current job as was used to partition another RDD that is needed in the current job but whose partitioning was persisted in a prior job, then the RDDs will still be co-partitioned but won't necessarily be co-located.

Any clearer?

Koert Kuipers

unread,
Aug 3, 2013, 4:08:37 PM8/3/13
to spark...@googlegroups.com
see response inline. best, koert


On Sat, Aug 3, 2013 at 3:17 PM, Reynold Xin <rx...@cs.berkeley.edu> wrote:
What if you add a node to the cluster, or a node dies? Or what if a node is busy and can never be run anything - are you going to block the entire program forever so you could put a partition on that node?
systems like cassandra deal with this while tying location to partition, and they can add nodes and nodes can die.
 

You can still write your program in a way that does mostly local joins. And even if the joins need to fetch blocks from remote nodes, the joins still avoid an extra round of shuffle, which is much more expensive than just fetching data.
yeah i got that and this is indeed really cool. if i understand it correctly spark is even smart enough to only move the smaller RDD to the bigger one which is a big win.
 

Look at the PageRank example (Section 3.2.2) in Spark's NSDI paper: http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf
OK i will check again.
 

Koert Kuipers

unread,
Aug 3, 2013, 4:12:20 PM8/3/13
to spark...@googlegroups.com
yes very clear, thanks! the whole "same partitioner but cached in a different job" thing got me confused for a bit, but now it makes sense.

Koert Kuipers

unread,
Aug 3, 2013, 4:16:39 PM8/3/13
to spark...@googlegroups.com
when i watch a spark job go, i often see multiple iterations of map, shuffle and reduce. for a second i thought each such iteration was a "job" which would mean the co-location benefit is extremely limited, which got me a bit confused. thanks again mark and reynold for the lengthy explanations

Mark Hamstra

unread,
Aug 3, 2013, 4:46:06 PM8/3/13
to spark...@googlegroups.com
Yes, most of what you are seeing is log messages from tasks and stages.  Actions begin jobs, jobs are broken down into stages (at shuffles, essentially), and stages are sets of tasks.  Individual tasks run on the worker nodes and are essentially the application of a function to the elements of a partition.

srikanth reddy

unread,
Aug 5, 2013, 8:52:52 PM8/5/13
to spark...@googlegroups.com
Hi,

Here I am reading the file from hdfs. But my hdfs cluster and spark cluster do not share any nodes. They are segregated.

++++++++++
scala>  val cust1  = sc.textFile(aupf)
13/08/17 13:03:11 INFO storage.MemoryStore: ensureFreeSpace(127912) called with curMem=1664390, maxMem=339585269
13/08/17 13:03:11 INFO storage.MemoryStore: Block broadcast_13 stored as values to memory (estimated size 124.9 KB, free 322.1 MB)
cust1: spark.RDD[String] = MappedRDD[111] at textFile at <console>:20

scala> cust1.partitioner
res96: Option[spark.Partitioner] = None

scala> cust1.partitionBy(new HashPartitioner(4))
<console>:23: error: value partitionBy is not a member of spark.RDD[String]
       cust1.partitionBy(new HashPartitioner(4))

++++++
1)  cust1.partitioner shows as 'None' . Is it happening because my hdfs cluster does not share any nodes with spark cluster?
Does this behaviour change if I have both hdfs and spark running in same nodes?

2)  Also partitionBy does not work cust1.  Looks like it always expects the RDD in (key, value) format for it to work.
each cust1 element represents a tab delimited text line and it does not have any (key,value) notion and I wanted to partition it so that the split on delimiters and filters on split fields on cust1 would be parallelized down the flow.  Is there anyway I can partition and parallalize cust1 based on linenumbers etc into 4 partitions? 

3)  I do create a pair out of cust1 down the flow and use groupby. The resultanting group by RDD does show partitioner=hashpartitioner because of the groupby operation. But I am curious how many partitions (RDD cust) will it be creating based on the hash values of keys when it gets materialized. Please see below. 

+++++++++++
val pair1 = cust2.filter(line => line.length == 8 ).map(line => (line(uid) ,line(provid).substring(0, line(provid).length-8) + line(catid) ))

    val cust = pair1.groupByKey().mapValues(l => l.distinct)

scala> pair1.partitioner
res98: Option[spark.Partitioner] = None

scala> cust.partitioner
res99: Option[spark.Partitioner] = Some(spark.HashPartitioner@1403c7f1)


+++++++++

I appreciate your feedback.

Reynold Xin

unread,
Aug 5, 2013, 9:02:21 PM8/5/13
to spark...@googlegroups.com
On Mon, Aug 5, 2013 at 5:52 PM, srikanth reddy <srikan...@gmail.com> wrote:
1)  cust1.partitioner shows as 'None' . Is it happening because my hdfs cluster does not share any nodes with spark cluster?
Does this behaviour change if I have both hdfs and spark running in same nodes?

No it doesn't. The partitions have "locality" specified (preferred locations), but don't have partitioner.
 

2)  Also partitionBy does not work cust1.  Looks like it always expects the RDD in (key, value) format for it to work.
each cust1 element represents a tab delimited text line and it does not have any (key,value) notion and I wanted to partition it so that the split on delimiters and filters on split fields on cust1 would be parallelized down the flow.  Is there anyway I can partition and parallalize cust1 based on linenumbers etc into 4 partitions? 

val cust1partitioned = cust1.map { line =>
   val fields = line.split("\t")
   (fields(0), /* whatever else you want */)
}.partitoinBy(...)

This is actually just a very basic Spark RDD transformation. You should go through some simple tutorials or the ampcamp exercises first.
 

3)  I do create a pair out of cust1 down the flow and use groupby. The resultanting group by RDD does show partitioner=hashpartitioner because of the groupby operation. But I am curious how many partitions (RDD cust) will it be creating based on the hash values of keys when it gets materialized. Please see below. 

You should go read the API documentation. Partitioner class has a method that tells you how many partitions there are.


srikanth reddy

unread,
Aug 5, 2013, 10:46:22 PM8/5/13
to spark...@googlegroups.com

The partitions have "locality" specified (preferred locations), but don't have partitioner.
++++
OK this was the missing link that clarifies my doubts.
Thanks for the answer. 
Yes, API documentation does contain good info when compared to exercises for these kind of questions.

taipe...@gmail.com

unread,
Sep 11, 2013, 4:14:30 AM9/11/13
to spark...@googlegroups.com
hi, Reynold

    since map and filter is narrow dependencies, so it will not change the "the co-locationed" relationship?

    in Python
############################
rdd = sc.textFile("test_file").partitionBy(50, hashfunc).cache()    #rdd is located on 50 different nodes
rdd2 = rdd.filter(mapfunc).map(mapfunc).cache()          #here rdd and rdd2 is co-locationed or co-partitioned?  
#rdd2.partitionBy(50, hashfunc)             #i do not need to partition it again.
result = rdd.join(rdd2).collect()               #it will work in co-patritioned mode.
#############################
    is it right?
Thanks

在 2013年8月2日星期五UTC+8上午8时45分35秒,Reynold Xin写道:
Reply all
Reply to author
Forward
0 new messages