Understanding repartitionByCassandraReplica


CJ

Aug 4, 2015, 10:21:16 AM
to DataStax Spark Connector for Apache Cassandra
I'm having trouble understanding the number of partitions returned by repartitionByCassandraReplica.

Here's my table:

CREATE TABLE mykeyspace.positions (
objectid text,
hash int,
key text,
rowdata blob,
PRIMARY KEY ((objectid, hash), key)
)

I use the hash basically to split the object associated with each objectid into a chosen number of rows. In this case, hash takes on values 0 through 5.
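
To make the bucketing concrete, here is a minimal sketch of how such a split might look. The names PartitionKey, bucketFor and partitionKeys are hypothetical, and deriving the bucket from the clustering key's hashCode is an assumption; the post does not say how hash is actually chosen.

```scala
object Bucketing {
  // Mirrors the table's compound partition key (objectid, hash).
  final case class PartitionKey(objectid: String, hash: Int)

  // Assumed rule: map a clustering key to a bucket in [0, numBuckets).
  def bucketFor(key: String, numBuckets: Int): Int =
    java.lang.Math.floorMod(key.hashCode, numBuckets)

  // All partition keys that together hold one object.
  def partitionKeys(objectid: String, numBuckets: Int): Seq[PartitionKey] =
    (0 until numBuckets).map(h => PartitionKey(objectid, h))

  def main(args: Array[String]): Unit =
    partitionKeys("some-object", 6).foreach(println)
}
```

With numBuckets = 6 this yields the six partition keys with hash 0 through 5 that the Spark code in the post queries.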

Here is my Spark code:

import org.apache.spark.SparkContext
import com.datastax.spark.connector._

case class PositionData(objectid: String,
                        hash: Int,
                        key: String,
                        rowdata: Array[Byte])

case class Partition(objectid: String, hash: Int)

def test(sc: SparkContext, objectid: String) = {
  val hashes = List(0, 1, 2, 3, 4, 5)

  // One partition key per hash bucket of the object
  val partitions = sc.parallelize(hashes).map(x => Partition(objectid, x))

  println(partitions.partitions.length)

  val withReplica =
    partitions.repartitionByCassandraReplica("mykeyspace", "positions")

  println(withReplica.partitions.length)

  val partitionsAndData =
    withReplica
      .joinWithCassandraTable[PositionData]("mykeyspace", "positions")

  println(partitionsAndData.partitions.length)

  // deserialize is defined elsewhere
  partitionsAndData.values.map(x => x.rowdata).map(deserialize(_))
}

After I do repartitionByCassandraReplica, Spark tells me I have 30 partitions. Why? There are 3 objects in the table, each with 6 different partition keys. The keyspace has a replication factor of two.

I'm trying to do this so each Spark node can query its local Cassandra node.

Also, this code is really, really slow. 28 of the 30 tasks finish in a second, and then the last two take up to 30 seconds. Why?

Russell Spitzer

Aug 4, 2015, 12:02:04 PM
to DataStax Spark Connector for Apache Cassandra
I would highly recommend not using Spark for this small an amount of data. It's not optimized at all for a use case this tiny. You most likely will have much better performance just using a direct driver connection and token aware routing unless you need this data somehow parallelized for a future query.

Even in that case I would suggest not using repartitionByCassandraReplica, since this operation is not free (it requires a shuffle) and will not have a great deal of benefit for use cases in the tens of keys. Just use a plain joinWithCassandraTable and although you won't have data locality it really doesn't matter in such a small sample.

As to how repartitionByCassandraReplica works:
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md#repartitioning-rdds-based-on-a-cassandra-tables-replication

You'll notice that there is a third parameter, partitionsPerHost, which controls the number of Spark partitions to make per host. If you would like only 1 Spark partition for each machine in your cluster, you would pass in 1. The default is 10, and since you have 3 nodes that means you get 30 total partitions.
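
As a rough illustration of that arithmetic (a sketch, not connector code): the total is the number of hosts times partitionsPerHost. The helper expectedPartitions is hypothetical; the commented call at the end passes partitionsPerHost as the third argument, as described above, and needs a live cluster to run.

```scala
object PartitionCount {
  // Hypothetical helper: total Spark partitions produced by the repartition.
  def expectedPartitions(hosts: Int, partitionsPerHost: Int = 10): Int =
    hosts * partitionsPerHost

  def main(args: Array[String]): Unit = {
    println(expectedPartitions(hosts = 3))                        // 3 * 10 = 30
    println(expectedPartitions(hosts = 3, partitionsPerHost = 1)) // 3 * 1 = 3

    // Corresponding connector call, shown as a comment because it needs a cluster:
    // partitions.repartitionByCassandraReplica("mykeyspace", "positions", partitionsPerHost = 1)
  }
}
```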

As to why some of your tasks are slow and others are not, this is most likely because most of the tasks that the repartition command made are empty.

Please let me know if you have any more questions. I hope this was helpful,
Russ


 

CJ

Aug 4, 2015, 12:41:27 PM
to DataStax Spark Connector for Apache Cassandra
Ah, I see. So I changed my code to create 2 partitions per host, and I also changed my keyspace to have replication factor of 3, where all nodes have all data. But when I do repartitionByCassandraReplica, I get 6 partitions, but all of the keys in one partition. That partition could be sent to any of 3 machines, but on every run it seems that all keys go to one partition, with 5 empty partitions. This is why the last task takes a long time.

This seems like a bug: I read through the code for repartitionByCassandraReplica, and each key should be assigned to a random partition. I even did keyByCassandraReplica and printed the result, and it shows that each key has all 3 nodes to choose from. Yet each key ends up choosing the same node. Is this a bug?

Russell Spitzer

Aug 4, 2015, 1:25:11 PM
to DataStax Spark Connector for Apache Cassandra
Looks like a bug, I'll take a look.

Russell Spitzer

Aug 4, 2015, 1:32:47 PM
to DataStax Spark Connector for Apache Cassandra
I ran a few quick tests, and it seems that with too few keys the partitions simply don't end up well distributed. I think you should avoid repartitionByCassandraReplica until you have 100+ keys. I'll look into a better solution for smaller numbers of keys in the future, but since I don't really want to encourage using this function with small key counts, I'm not sure we should improve it.

Test Code
sc.parallelize(1 to N)
  .map(x => Tuple1(x.toString))
  .repartitionByCassandraReplica("test", "demo3")
  .mapPartitionsWithIndex { case (i, x) => for (e <- x) yield (i, e) }
  .countByKey

10 Keys :
res17: scala.collection.Map[Int,Long] = Map(4 -> 2, 9 -> 8)

100 Keys:
res19: scala.collection.Map[Int,Long] = Map(0 -> 16, 5 -> 8, 1 -> 8, 9 -> 16, 2 -> 8, 3 -> 16, 8 -> 12, 4 -> 16)

1000 Keys:
res20: scala.collection.Map[Int,Long] = Map(0 -> 104, 5 -> 144, 1 -> 96, 6 -> 80, 9 -> 112, 2 -> 144, 7 -> 48, 3 -> 88, 8 -> 88, 4 -> 96)

10000 Keys:
res21: scala.collection.Map[Int,Long] = Map(0 -> 1112, 5 -> 968, 1 -> 1008, 6 -> 856, 9 -> 1048, 2 -> 1064, 7 -> 1008, 3 -> 1040, 8 -> 944, 4 -> 952)
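
For comparison, a simplified model of random assignment can be simulated without a cluster. The sketch below assumes each key independently picks one partition uniformly at random; that is an assumption for illustration, not the connector's actual logic, and the names are hypothetical. With fewer keys than partitions some partitions must stay empty, and the relative spread tends to even out only as the key count grows.

```scala
import scala.util.Random

object RandomAssignment {
  // Simplified model: each key picks one of numPartitions uniformly at random.
  def keysPerPartition(numKeys: Int, numPartitions: Int, seed: Long): Vector[Int] = {
    val rand = new Random(seed)
    val counts = Array.fill(numPartitions)(0)
    for (_ <- 0 until numKeys) {
      val p = rand.nextInt(numPartitions)
      counts(p) += 1
    }
    counts.toVector
  }

  // Ratio of the fullest to the emptiest partition; None if any partition is empty.
  def skew(counts: Vector[Int]): Option[Double] =
    if (counts.min == 0) None else Some(counts.max.toDouble / counts.min)

  def main(args: Array[String]): Unit =
    for (n <- Seq(10, 100, 1000, 10000)) {
      val counts = keysPerPartition(n, numPartitions = 10, seed = 42L)
      println(s"$n keys: $counts empty=${counts.count(_ == 0)} skew=${skew(counts)}")
    }
}
```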


CJ

Aug 4, 2015, 1:45:23 PM
to DataStax Spark Connector for Apache Cassandra
Thanks, I increased the key count and see a little more distribution, but nothing anywhere near what I would want. Even with 100 keys, occasionally I have 48 going into one partition and some partitions being empty. It doesn't seem like java.util.Random is doing the best job here...

Also, I don't know why I would want a ton of keys, since Cassandra would seem to be able to read faster with fewer keys. I really just want one key, but with one key Spark reads the whole data set into one partition. I am basically tuning the number of keys for Spark.

Russell Spitzer

Aug 4, 2015, 2:02:29 PM
to DataStax Spark Connector for Apache Cassandra
The joinWithCassandraTable API is tailored to being used with a very large number of keys, so since you were using that API I assumed you had a huge number of partition keys you wanted loaded into Spark. Also, if you switch to RF = 3 then you shouldn't repartition at all: there are no "preferred_locations", because all of the nodes are equally good for execution.
The way the connector is currently set up provides several ways to load a single partition key.

// Generates an RDD with a single partition
sc.cassandraTable("ks", "ts").where("pk = ?", key)

// Makes the number of partitions set in the parallelize statement
sc.parallelize(clusteringKeyValues).map(cKV => ("partitionkey", cKV)).joinWithCassandraTable("ks", "ts")

and of course if you have some other way of referencing the data you can always try

// Create the connector on the driver; it is serializable and can be used inside the closure
val connector = CassandraConnector(sc.getConf)
sc.parallelize(clusteringKeyValues).mapPartitions(it => connector.withSessionDo { session => /* do some stuff */ })


What are you trying to accomplish using Spark with this code? Loading a single partition into Spark is almost always an anti-pattern (or at least not playing to Spark's strengths).

CJ

Aug 5, 2015, 10:04:19 AM
to DataStax Spark Connector for Apache Cassandra
I'm not trying to load into one partition; I'm trying to load one object in parallel. I use a compound partition key: (objectid,hash), where I set the range of the hash and determine how much the data is split. I think loading in parallel will give me faster load times, as well as have the data partitioned for the next step, which is a computationally intensive step. I would like to do this as opposed to loading it and then repartitioning.

I don't think using java.util.Random to pick a partition within repartitionByCassandraReplica is the best strategy, because I am seeing hotspot partitions, even with 10000 keys. A single partition can end up with triple or more the number of keys of another partition, and so does triple the amount of work. I can get an even distribution of work if I do joinWithCassandraTable without a repartitionByCassandraReplica, but then I don't get data locality. I would like to repartitionByCassandraReplica and have the data evenly spread across partitions. I want data locality AND even distribution of work.

Russell Spitzer

Aug 5, 2015, 10:38:56 AM
to DataStax Spark Connector for Apache Cassandra
If we end up with that kind of distribution with 10k keys then we have a problem. Let me look into some alternative distribution schemes. I'm thinking that we don't have to use random at all to decide which "replica" or internal "replica partition" we use, and could instead just keep incrementing indexes. Let me sketch up a quick patch.

CJ

Aug 5, 2015, 10:48:13 AM
to DataStax Spark Connector for Apache Cassandra
I mean, the distributions are not always that bad; it's more likely that I get large partitions that are 1.5-2 times as large as the smaller partitions. But there's nothing stopping the distributions from being bad. It would just be really nice if it was as even as possible, and if it was even for any keyset size. And yes, I was thinking a round-robin type scheme would work perfectly.

Russell Spitzer

Aug 5, 2015, 11:46:33 AM
to DataStax Spark Connector for Apache Cassandra
I looked back at the code and remembered why I didn't do round robin before. Basically every executor is running the same code on a different machine, so a true round robin is basically impossible without sharing state between them all the time. All the calling function of "getPartition" knows is the value of the current key, so any other shared state can at best only be kept at the VM level. This means at best we could do a "round robin" per executor VM, which may be close enough ... Testing now
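
A per-VM round robin along those lines could look roughly like the sketch below. RoundRobinAssigner and RoundRobinDemo are hypothetical names, and the model is simplified: it ignores replica selection and just cycles each simulated executor through all partitions with its own counter. Because each counter starts at zero independently, the totals come out close to even but not exactly even.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical assigner: one instance per executor VM, ignoring the key's value.
class RoundRobinAssigner(numPartitions: Int) {
  private val next = new AtomicInteger(0)
  def nextPartition(): Int =
    java.lang.Math.floorMod(next.getAndIncrement(), numPartitions)
}

object RoundRobinDemo {
  // Simulates several executors, each with its own counter, and sums
  // how many keys every partition receives overall.
  def totals(keysPerExecutor: Seq[Int], numPartitions: Int): Vector[Int] = {
    val counts = Array.fill(numPartitions)(0)
    for (numKeys <- keysPerExecutor) {
      val assigner = new RoundRobinAssigner(numPartitions) // fresh state per simulated VM
      for (_ <- 0 until numKeys) {
        val p = assigner.nextPartition()
        counts(p) += 1
      }
    }
    counts.toVector
  }

  def main(args: Array[String]): Unit =
    println(totals(Seq(34, 33, 33), numPartitions = 10))
}
```

In this model the largest and smallest partitions differ by at most the number of executors, regardless of how many keys there are.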

CJ

Aug 5, 2015, 12:44:56 PM
to DataStax Spark Connector for Apache Cassandra
Couldn't you also change keyByCassandraReplica to include some index information in the key? And then use that index information in getPartition?

Russell Spitzer

Aug 5, 2015, 12:52:07 PM
to DataStax Spark Connector for Apache Cassandra
There is only one "Partitioner" object, which has to have all the state that any executor would need to assign a partition to a key. The Spark Partitioner class basically needs to operate without specific state. So while we could generate other indexes or whatnot, we can't pass different ones depending on the executor.

Steve Ash

Aug 11, 2015, 10:21:36 AM
to DataStax Spark Connector for Apache Cassandra
Right now I am running into a problem where I really want the behavior of ReplicaPartitioner + HashPartitioner, such that I get data locality to the replica node and, within that Spark worker, key affinity per partition. I want to enable something like:
1- get my streaming minibatch
2- repartition on replica + key
3- do some perPartition transforms (like span, which seems broken if you have partitionsPerHost > 1)
4- save to Cassandra

But because the keys are randomly assigned to partitions _within_ a host, I get non-deterministic behavior.

I am going to try to just implement my own replica partitioner that basically delegates to HashPartitioner instead of rand.nextInt(), but this seems like a useful, optional feature of ReplicaPartitioner.

Russell Spitzer

Aug 11, 2015, 12:29:43 PM
to DataStax Spark Connector for Apache Cassandra
@Steve That sounds like a good idea. If, instead of using a pseudo-random assigner, we assigned based on some deterministic hash of the key, we would solve a bunch of issues at once. I wonder if it would be enough to select the replica and internal partition off of the hash rather than the random.nextInt.
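
One way this could look, as a sketch under stated assumptions rather than the connector's implementation: hosts are represented by integer indices, host i is assumed to own the contiguous partitions from i * partitionsPerHost up to (but not including) (i + 1) * partitionsPerHost, and the key's hashCode picks both the replica and the slot within it. HashReplicaChoice and choose are hypothetical names.

```scala
object HashReplicaChoice {
  // replicas: indices of the hosts holding the key, in a consistent order.
  // Returns a partition index in [0, numHosts * partitionsPerHost).
  def choose(key: Any, replicas: Vector[Int], partitionsPerHost: Int): Int = {
    require(replicas.nonEmpty && partitionsPerHost > 0)
    val h = key.hashCode
    // Pick the replica host from the hash instead of rand.nextInt.
    val host = replicas(java.lang.Math.floorMod(h, replicas.size))
    // Pick the partition within that host from the remaining hash bits.
    val slot = java.lang.Math.floorMod(h / replicas.size, partitionsPerHost)
    host * partitionsPerHost + slot
  }

  def main(args: Array[String]): Unit =
    for (k <- Seq("a", "b", "c", "d"))
      println(s"$k -> ${choose(k, Vector(0, 1, 2), partitionsPerHost = 2)}")
}
```

The same key always lands in the same partition, provided the replica list is passed in a consistent order. As noted in the thread, this gives determinism but does not by itself guarantee an even spread for a small number of keys.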


Steve Ash

Aug 11, 2015, 1:05:12 PM
to DataStax Spark Connector for Apache Cassandra
Do you want a PR for it? I haven't done it yet, but don't mind doing it later, unless you can see other problems. Seems pretty straightforward though.

Russell Spitzer

Aug 11, 2015, 1:10:22 PM
to DataStax Spark Connector for Apache Cassandra
Please do. Of course this won't necessarily fix the issue with a small number of partition keys, but having deterministic behavior would be nice. If you could make a ticket on the Jira as well I would really appreciate it. https://datastax-oss.atlassian.net/browse/SPARKC/


Steve Ash

Oct 20, 2015, 11:24:25 AM
to DataStax Spark Connector for Apache Cassandra