Cassandra Partitioner

Manny Bacolas

May 30, 2018, 12:51:15 PM
to DataStax Spark Connector for Apache Cassandra
I'm trying to understand whether, and why, there is an advantage to using keyBy when creating a Cassandra RDD. The partitioning doc (16) you have written uses two tables with the same partition key. I assume that even if you don't key the data by the partition key, it is still loaded into Spark in exactly the same way: one or many C* partitions are mapped to a single Spark partition. During a join, I assume there should be no data shuffle between Spark executors, since all data with the same partition key should be colocated within the same executor (though not necessarily in the same Spark partition).

The two execution plans below differ slightly because of the additional map/keyBy step, but neither indicates a shuffle (examples below).

//with a Cassandra partitioner
(12) MapPartitionsRDD[82] at join at <console>:48 []
| MapPartitionsRDD[81] at join at <console>:48 []
| CoGroupedRDD[80] at join at <console>:48 []
| CassandraTableScanRDD[76] at RDD at CassandraRDD.scala:19 []
+-(12) CassandraTableScanRDD[79] at RDD at CassandraRDD.scala:19 []

//without a Cassandra partitioner
(12) MapPartitionsRDD[100] at join at <console>:48 []
| MapPartitionsRDD[99] at join at <console>:48 []
| CoGroupedRDD[98] at join at <console>:48 []
+-(12) MapPartitionsRDD[95] at keyBy at <console>:42 []
| | CassandraTableScanRDD[94] at RDD at CassandraRDD.scala:19 []
+-(12) MapPartitionsRDD[97] at keyBy at <console>:42 []
| CassandraTableScanRDD[96] at RDD at CassandraRDD.scala:19 []



Also, why use applyPartitionerFrom when dealing with two RDDs/Cassandra tables with identical partitions (I assume you can only use it when dealing with RDDs with the same C* partitions)? In the example below, I assume the two execution plans are essentially identical, even though the first indicates an additional level of parallelism. So is there no need to apply the partitioner from the first RDD to the second? Does calling applyPartitionerFrom force some kind of localized data shuffle across Spark partitions within the same executor, so that you can then use groupByKey or spanByKey? The two execution plans look pretty much identical.


I also assume that when using joinWithCassandraTable there is no need to key the first Cassandra RDD by the partition key. Are there no optimizations to be had from applying a partitioner to the first RDD, since joinWithCassandraTable queries Cassandra using CQL with a WHERE clause?


//keying both RDDs by the partition key, so each gets a Cassandra partitioner
import com.datastax.spark.connector._
case class User(userId: String, name: String, zipCode: Int)
case class Purchase(userId: String, purchaseId: String, objectId: String, amount: Int)

val purchasesRdd = sc.cassandraTable[Purchase]("test", "purchases").keyBy[Tuple1[String]]("user_id")
val userRdd = sc.cassandraTable[User]("test", "users").keyBy[Tuple1[String]]("user_id")

val results = purchasesRdd.join(userRdd)

scala> results.toDebugString
res59: String =
(12) MapPartitionsRDD[82] at join at <console>:48 []
| MapPartitionsRDD[81] at join at <console>:48 []
| CoGroupedRDD[80] at join at <console>:48 []
| CassandraTableScanRDD[76] at RDD at CassandraRDD.scala:19 []
+-(12) CassandraTableScanRDD[79] at RDD at CassandraRDD.scala:19 []

//using applyPartitionerFrom
val results2 = purchasesRdd.join(userRdd.applyPartitionerFrom(purchasesRdd))

scala> results2.toDebugString
res60: String =
(12) MapPartitionsRDD[86] at join at <console>:48 []
| MapPartitionsRDD[85] at join at <console>:48 []
| CoGroupedRDD[84] at join at <console>:48 []
| CassandraTableScanRDD[76] at RDD at CassandraRDD.scala:19 []
| CassandraTableScanRDD[83] at RDD at CassandraRDD.scala:19 []


//without using a Cassandra partitioner
val purchasesWithoutPartitionerRdd = sc.cassandraTable[Purchase]("test", "purchases").keyBy(_.userId)
val userWithoutPartitionerRdd = sc.cassandraTable[User]("test", "users").keyBy(_.userId)

purchasesWithoutPartitionerRdd.partitioner
res63: Option[org.apache.spark.Partitioner] = None

val results3 = purchasesWithoutPartitionerRdd.join(userWithoutPartitionerRdd)
scala> results3.toDebugString
res64: String =
(12) MapPartitionsRDD[100] at join at <console>:48 []
| MapPartitionsRDD[99] at join at <console>:48 []
| CoGroupedRDD[98] at join at <console>:48 []
+-(12) MapPartitionsRDD[95] at keyBy at <console>:42 []
| | CassandraTableScanRDD[94] at RDD at CassandraRDD.scala:19 []
+-(12) MapPartitionsRDD[97] at keyBy at <console>:42 []
| CassandraTableScanRDD[96] at RDD at CassandraRDD.scala:19 []

Russell Spitzer

May 30, 2018, 1:01:32 PM
to spark-conn...@lists.datastax.com
If the Spark partitioners on two RDDs are equivalent, shuffles can be avoided for many operations. The partitioners must be exactly equivalent, which is why you must apply the partitioner from one RDD to the other: it ensures both are read with the same Partitioner object.

Similar benefits accrue if you have a Partitioner on the key of a PairRDD and then do an aggregation based on that key.
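The effect of equal partitioners can be tried with plain Spark, without Cassandra. The sketch below is only an illustration under stated assumptions: it uses a HashPartitioner and partitionBy as a stand-in for an RDD that already carries a partitioner, and the object name, the sample data, and the local[2] master are made up for the example. It is not how the connector builds its own partitioner.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object CoPartitionedJoin {
  // Returns true when the joined RDD reuses the partitioner shared by both inputs.
  def joinKeepsPartitioner(sc: SparkContext): Boolean = {
    val partitioner = new HashPartitioner(4)

    // partitionBy shuffles once up front; it only stands in here for an RDD
    // that already has a partitioner (hypothetical sample data).
    val purchases = sc.parallelize(Seq("u1" -> 10, "u2" -> 20)).partitionBy(partitioner)
    val users = sc.parallelize(Seq("u1" -> "Ann", "u2" -> "Bob")).partitionBy(partitioner)

    // Both sides report an equal partitioner, so the join can reuse it.
    val joined = purchases.join(users)

    // Inspect the lineage: the only shuffles come from partitionBy, not from the join.
    println(joined.toDebugString)
    joined.partitioner.contains(partitioner)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("copartitioned-join"))
    try println(joinKeepsPartitioner(sc)) finally sc.stop()
  }
}
```

If one side is built with a different partitioner, or with none, the lineage printed by toDebugString should show an extra shuffle stage for the join itself.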

A Partitioner is only assigned with *keyBy* because a partitioner must have a way of mapping a Record to a Partition based on a key.

Together this means

sc.cassandraTable("ks","tab").keyBy("partitionKey").groupByKey(...) // does not require a shuffle
sc.cassandraTable("ks","tab").map(row => (row.get("partitionKey"), row)).groupByKey // requires a shuffle, because map drops the partitioner
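The reason a plain map loses the benefit is general Spark behavior: map may change the key, so Spark does not carry the partitioner over, while mapValues keeps it. A minimal sketch with plain Spark, again using HashPartitioner as a stand-in and hypothetical names and data:

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object MapDropsPartitioner {
  // Reports whether a partitioner is defined
  // (after partitionBy, after map, after mapValues).
  def partitionerFlags(sc: SparkContext): (Boolean, Boolean, Boolean) = {
    val keyed = sc.parallelize(Seq("u1" -> 1, "u2" -> 2))
      .partitionBy(new HashPartitioner(4))

    // map could rewrite the key, so the resulting RDD has no partitioner.
    val remapped = keyed.map { case (k, v) => (k, v) }

    // mapValues cannot touch the key, so the partitioner is preserved.
    val revalued = keyed.mapValues(_ + 1)

    (keyed.partitioner.isDefined,
     remapped.partitioner.isDefined,
     revalued.partitioner.isDefined)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("map-drops-partitioner"))
    try println(partitionerFlags(sc)) finally sc.stop()
  }
}
```

Checking `.partitioner` on an RDD, as done earlier in this thread, is a quick way to see whether a later groupByKey or join can skip the shuffle.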


Russell Spitzer
Software Engineer



