union and spanBy on a cassandra RDD

273 views
Skip to first unread message

Manny Bacolas

unread,
Feb 28, 2017, 5:32:50 PM2/28/17
to DataStax Spark Connector for Apache Cassandra
I am calling union() on multiple cassandra RDDs, all loaded from different cassandra tables which all have the same partition key (but different clustering keys)… followed by a spanBy on the union-ed RDD. But because the order of the data is not sequentially ordered (different clustering keys) the spanBy does not group all the rows with the same partition key together unless I call a sortBy on the RDD prior to the spanBy which kinda kills the whole point of co-locating the data sets on the same cassandra/spark node. Calling repartitionByCassandraReplica prior to spanBy does not help either…

Since all the data is certainly within the same spark executor (not sure about the same spark partition) does a shuffle actually occur when a sortBy is called on the partition key columns?

How can I union multiple RDDs together and group them when the sequential order is not guaranteed even though the co-location of the data is guaranteed. I don’t use the keyBy on the cassandra RDD because I need to load only a subset of a table and join it to a series of other tables. I don’t want the entire set loaded at any point. Is there any way I can set the RDD partitioner without calling a keyBy on the load? Would that even help if the unioned-RDD did have a cassandra partitioner set?

Russell Spitzer

unread,
Feb 28, 2017, 6:11:38 PM2/28/17
to spark-conn...@lists.datastax.com
On Tue, Feb 28, 2017 at 2:32 PM Manny Bacolas <manny....@gmail.com> wrote:
I am calling union() on multiple cassandra RDDs, all loaded from different cassandra tables which all have the same partition key (but different clustering keys)… followed by a spanBy on the union-ed RDD.  But because the order of the data is not sequentially ordered (different clustering keys) the spanBy does not group all the rows with the same partition key together unless I call a sortBy on the RDD prior to the spanBy which kinda kills the whole point of co-locating the data sets on the same cassandra/spark node.  Calling repartitionByCassandraReplica prior to spanBy does not help either…

If there is no Partitioner or the RDD's have different partitioners Union basically just makes a giant mishmash of all your tasks.  https://github.com/apache/spark/blob/v2.0.2/core/src/main/scala/org/apache/spark/SparkContext.scala#L1218-L1226 . This means you now have tons of Spark partitions, some of which have data for the same C* partition key. For example Spark Partition A can have entries for Cassandra Key A but Spark Partition B can ALSO have entires for Cassandra Key A. Since spanBy requires all of the values of A to be contiguous you get odd behavior, i'm assuming multiple groups for the same partition key.

Using the sortBy regroups together your partitions based on Cassandra partition key this is a full shuffle. RepartitionByCassandraReplica is yet another shuffle which only sorts based on InetAddress and not PK value.

 
Since all the data is certainly within the same spark executor (not sure about the same spark partition) does a shuffle actually occur when a sortBy is called on the partition key columns?

How can I union multiple RDDs together and group them when the sequential order is not guaranteed even though the co-location of the data is guaranteed.   I don’t use the keyBy on the cassandra RDD because I need to load only a subset of a table and join it to a series of other tables.  I don’t want the entire set loaded at any point.   Is there any way I can set the RDD partitioner without calling a keyBy on the load?  Would that even help if the unioned-RDD did have a cassandra partitioner set?
 
The only way to set the partitioner is to use keyBy on load. Unfortunately this won't really solve your problem. What you really want is a not only
your union to merge partitions on partition key (a shared partitioner IS required for this since there is no guarentee that the Token Ranges represented by Spark Partitions will match up 1 to 1) but to also do a merge join on their combination. Without this you will still end up with your rows out of partition key order. We actually wrote code for accomplishing this in the Enterprise distribution for dealing with some Graph features but there isn't an open source copy of it. 

Basically what you need to for optimial performance is

Make a single RDD that reads from multiple tables simultaneously
Grab an iterator for the same token range on each table
Merge join the output form the iterators

--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com.
--

Russell Spitzer
Software Engineer




DS_Sig2.png

Reply all
Reply to author
Forward
0 new messages