Always one partition if filtering by partition key

219 views
Skip to first unread message

Artur

unread,
Mar 6, 2017, 4:22:52 PM3/6/17
to DataStax Spark Connector for Apache Cassandra
If there is a filter by some partition key, then CassandraPartitionGenerator is created with splitCount == Some(1):

https://github.com/datastax/spark-cassandra-connector/blob/v1.6.5/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraTableScanRDD.scala#L220

Thus, even if my C* partition is much bigger than spark.cassandra.input.split.size_in_mb, it still produces RDD with only one partition.

What I did wrong?

Artur

unread,
Mar 6, 2017, 4:24:19 PM3/6/17
to DataStax Spark Connector for Apache Cassandra

Forgot to say that I use 1.6.5 connector

Russell Spitzer

unread,
Mar 6, 2017, 4:35:06 PM3/6/17
to DataStax Spark Connector for Apache Cassandra
That's simply the way the connector is designed. Since partition bounds must be made prior to selecting rows there is no way to subdivide a single Cassandra partition. To do so would require foreknowledge of the distribution of data within the Cassandra Partition. For all but the most trivial clustering keys this would be a difficult exercise but we are welcome to suggestions.

--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com.
--

Russell Spitzer
Software Engineer




DS_Sig2.png

Artur R

unread,
Mar 6, 2017, 6:12:09 PM3/6/17
to spark-conn...@lists.datastax.com
Furthermore, if filter by IN clause then it still returns only one partitions and obvious it's not the best behavior :)

I can try to create a patch for this, what do you think?


And to the filter by partition key with Equals clause: Is there any way to get range of clustering tokens within some partition?
If yes, then one can just distribute them among Spark executors.


Thanks, Russell!

To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-user+unsub...@lists.datastax.com.
--

Russell Spitzer
Software Engineer




DS_Sig2.png

--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-user+unsub...@lists.datastax.com.

Russell Spitzer

unread,
Mar 6, 2017, 6:23:04 PM3/6/17
to spark-conn...@lists.datastax.com
IN Clauses are just pushed down directly to C*. If you have many many partitions, or very large partitions, use joinWithCassandraTable which uses a distributed set. We try to do direct pushdowns whenever possible. I'm not sure I really want to allow folks to run distributed IN's by using a "where IN" because that will encourage people to pass very very long Strings as queries. For example running "where x IN (1M entries here)" which would lead folks to doing patterns like.

// Huge collection and serialization of string
val partitionsToRead = rdd.collect
sc.cassandraTable().where("""x in $(partitionsToRead.mkString(","))""") 

When they should be doing

rdd.joinWithCassandraTable()

As for clustering *tokens* the issue is there is no clustering token. The clustering key can be any combination of fields which leaves us with the issue of determining a distribution of what is essentially a composition of various fields. If Cassandra gave us some Max and Min values for each field we might be able to do some sort of estimation but to my knowledge this information is not known and cannot be determined without full partition reconciliation on the server. Ideally if we had medians (quarter, half, 3/4) we could do a nice little estimation and break up partitions.

On Mon, Mar 6, 2017 at 3:12 PM Artur R <ar...@gpnxgroup.com> wrote:
Furthermore, if filter by IN clause then it still returns only one partitions and obvious it's not the best behavior :)

I can try to create a patch for this, what do you think?


And to the filter by partition key with Equals clause: Is there any way to get range of clustering tokens within some partition?
If yes, then one can just distribute them among Spark executors.


Thanks, Russell!
On Mon, Mar 6, 2017 at 10:34 PM, Russell Spitzer <rus...@datastax.com> wrote:
That's simply the way the connector is designed. Since partition bounds must be made prior to selecting rows there is no way to subdivide a single Cassandra partition. To do so would require foreknowledge of the distribution of data within the Cassandra Partition. For all but the most trivial clustering keys this would be a difficult exercise but we are welcome to suggestions.
On Mon, Mar 6, 2017 at 1:24 PM Artur <ar...@gpnxgroup.com> wrote:
On Monday, March 6, 2017 at 10:22:52 PM UTC+1, Artur wrote:
> If there is a filter by some partition key, then CassandraPartitionGenerator is created with splitCount == Some(1):
>
> https://github.com/datastax/spark-cassandra-connector/blob/v1.6.5/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraTableScanRDD.scala#L220
>
> Thus, even if my C* partition is much bigger than spark.cassandra.input.split.size_in_mb, it still produces RDD with only one partition.
>
> What I did wrong?

Forgot to say that I use 1.6.5 connector

--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com.
--

Russell Spitzer
Software Engineer




DS_Sig2.png

--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com.

--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com.

Artur R

unread,
Mar 6, 2017, 6:53:04 PM3/6/17
to spark-conn...@lists.datastax.com
What if to add some option that limits number of tokens in IN clause OR option that limits size of text of serialized query?

So it'll prevent one to shoot his foot but allow it in special cases.

In particular, I have timeseried table with primary key (day, time) and I just can't transparently filter it by some range of days because the connector returns only one partition for, say 10 days. Sure I can repartition it after all, but it is slow.

To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-user+unsub...@lists.datastax.com.
--

Russell Spitzer
Software Engineer




DS_Sig2.png

--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-user+unsub...@lists.datastax.com.

--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-user+unsub...@lists.datastax.com.
--

Russell Spitzer
Software Engineer




DS_Sig2.png

--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-user+unsub...@lists.datastax.com.

Artur R

unread,
Mar 6, 2017, 6:54:13 PM3/6/17
to spark-conn...@lists.datastax.com
And sure I can use joinWithCassandraTable() internally but it's not transparent for the end user.

To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-user+unsubscrib...@lists.datastax.com.
--

Russell Spitzer
Software Engineer




DS_Sig2.png

--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-user+unsubscrib...@lists.datastax.com.

--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-user+unsubscrib...@lists.datastax.com.
--

Russell Spitzer
Software Engineer




DS_Sig2.png

--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-user+unsubscrib...@lists.datastax.com.


Russell Spitzer

unread,
Mar 6, 2017, 6:59:49 PM3/6/17
to spark-conn...@lists.datastax.com
I think if you bounded the number of token in the clause to say 30 or less that would probably be ok. Really getting larger than that is not a great idea. But the other difficulty here is how do you want to specify the parallelization of those tokens? Do you define it in the clause itself? Having the Spark Partitions just based on the previous RDD just seems so much simpler to me, it allows the most flexibility.

For filtering within the partition what you should do is multiple calls, each with different clustering key bounds, then union the results together. IE

val partitions = sc.parallelize( your list, control your partitioning here) // Ideally this comes from some already distributed source
val allRecords = sc.union( for (range in ranges) yield { partitions.joinWithCassandraTable().where(ckey in range) )





To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com.
--

Russell Spitzer
Software Engineer




DS_Sig2.png

--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com.

--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com.
--

Russell Spitzer
Software Engineer




DS_Sig2.png

--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com.


--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com.

Artur R

unread,
Mar 6, 2017, 7:18:12 PM3/6/17
to spark-conn...@lists.datastax.com
Well, ideally connector could just evenly distribute list of C* partitions in the same way as it currently distributes all C* partitions if no filtering at all.

But if you think the idiomatic way is to use joinWithCassandraTable then I'll extend connector's relation (I use dataframes) and internally convert query with IN clauses to query with joinWithCassandraTable. 

7 Мар 2017 г. 4:59 пользователь "Russell Spitzer" <rus...@datastax.com> написал:
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-user+unsub...@lists.datastax.com.
--

Russell Spitzer
Software Engineer




DS_Sig2.png

--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-user+unsub...@lists.datastax.com.

--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-user+unsub...@lists.datastax.com.
--

Russell Spitzer
Software Engineer




DS_Sig2.png

--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-user+unsub...@lists.datastax.com.


--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-user+unsub...@lists.datastax.com.
--

Russell Spitzer
Software Engineer




DS_Sig2.png

--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-user+unsub...@lists.datastax.com.

Russell Spitzer

unread,
Mar 6, 2017, 7:29:22 PM3/6/17
to spark-conn...@lists.datastax.com
I would be really interested in any dataframe related additions. I think you are right with DF we can't really differentiate so we should probably on receiving those in clauses do a JWCT instead. In DF I would just probably do the sc.parallelize(shuffleParallelism? or sc.defaultParallelism(default arg)).

The current distribution of Spark Partitions is done on size estimates of the whole table so i'm not sure that it would really translate. Like what is the right size_in_mb to get 1 C* partition per Spark Partition? The partitions are also composed of multiple vnode ranges so that would also kinda be complicated. It's probably best to either somehow specify explicitly a number of spark partitions and have the incoming set divided.

To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com.
--

Russell Spitzer
Software Engineer




DS_Sig2.png

--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com.

--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com.
--

Russell Spitzer
Software Engineer




DS_Sig2.png

--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com.


--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com.
--

Russell Spitzer
Software Engineer




DS_Sig2.png

--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com.

--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com.

Artur R

unread,
Mar 6, 2017, 7:51:29 PM3/6/17
to spark-conn...@lists.datastax.com
So there is no way to retrieve size of some partition from C* somehow? Maybe ask in C* mailing list? 

7 Мар 2017 г. 5:29 пользователь "Russell Spitzer" <russell...@gmail.com> написал:
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-user+unsub...@lists.datastax.com.
--

Russell Spitzer
Software Engineer




DS_Sig2.png

--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-user+unsub...@lists.datastax.com.

--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-user+unsub...@lists.datastax.com.
--

Russell Spitzer
Software Engineer




DS_Sig2.png

--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-user+unsub...@lists.datastax.com.


--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-user+unsub...@lists.datastax.com.
--

Russell Spitzer
Software Engineer




DS_Sig2.png

--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-user+unsub...@lists.datastax.com.

--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-user+unsub...@lists.datastax.com.

--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-user+unsub...@lists.datastax.com.

Russell Spitzer

unread,
Mar 6, 2017, 7:55:18 PM3/6/17
to spark-conn...@lists.datastax.com
You can ask, but basically without reading the partition there is no way to know it's size. I'll ask on my side too since it's been a while since I asked.

To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com.
--

Russell Spitzer
Software Engineer




DS_Sig2.png

--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com.

--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com.
--

Russell Spitzer
Software Engineer




DS_Sig2.png

--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com.


--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com.
--

Russell Spitzer
Software Engineer




DS_Sig2.png

--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com.

--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com.

--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com.

--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com.

Russell Spitzer

unread,
Mar 7, 2017, 1:00:24 PM3/7/17
to spark-conn...@lists.datastax.com
Sounds like while partition stats would be possible to add they currently do not exist.

Artur R

unread,
Mar 7, 2017, 2:29:22 PM3/7/17
to spark-conn...@lists.datastax.com
Don't get it. Please, explain, what do you mean? 

7 Мар 2017 г. 11:00 PM пользователь "Russell Spitzer" <rus...@datastax.com> написал:
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-user+unsub...@lists.datastax.com.
--

Russell Spitzer
Software Engineer




DS_Sig2.png

--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-user+unsub...@lists.datastax.com.

--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-user+unsub...@lists.datastax.com.
--

Russell Spitzer
Software Engineer




DS_Sig2.png

--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-user+unsub...@lists.datastax.com.


--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-user+unsub...@lists.datastax.com.
--

Russell Spitzer
Software Engineer



...

Russell Spitzer

unread,
Mar 7, 2017, 2:30:37 PM3/7/17
to spark-conn...@lists.datastax.com
The C* code currently doesn't store any statistics like the ones we would need to calculate partition info without scanning the whole partition. It would be possible to add this information (or approximates) to the sstables and I think that would probably be sufficient for our purposes. This would require a C* patch.

To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com.
--

Russell Spitzer
Software Engineer




DS_Sig2.png

--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com.

--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com.
--

Russell Spitzer
Software Engineer


Artur R

unread,
Mar 7, 2017, 2:45:57 PM3/7/17
to spark-conn...@lists.datastax.com
Got it. Well, cool. 
I am not familiar with C* development process, won't they reject PR with such patch just because the use case is extremely specific? Sure one can just fork C* but it's not the big deal. 

Anyway, I am happy to contribute. 
BTW, I solved my issues with IN filtering using joinWithCassandraTable and the boost is amazing (say I have 1000 cores cluster and without the hack I need to wait forever until all C* partitions arrive to the only one Spark partition, and then repartition is extremely expensive in Spark), but it's a hack. The idiomatic way of using Spark collections is to distribute things as even as possible and I am 100% sure that it should be absolutely transparently for the end user and such fine grained distribution that additional C* partition stats enables would be killer feature (we spent few days in our team and figured out that such automatic partitioning is almost unachievable in Hbase so dropped it) . 

8 Мар 2017 г. 12:30 AM пользователь "Russell Spitzer" <rus...@datastax.com> написал:
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-user+unsub...@lists.datastax.com.
--

Russell Spitzer
Software Engineer




DS_Sig2.png

--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to
...

Russell Spitzer

unread,
Mar 7, 2017, 3:01:15 PM3/7/17
to spark-conn...@lists.datastax.com
Did you try with the union setup I suggested? That's basically what we would want to happen automatically.

To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com.

Artur R

unread,
Mar 7, 2017, 3:08:48 PM3/7/17
to spark-conn...@lists.datastax.com
To be honest I didn't understand why do we need union :) 

I just sc.parallelize(desiredPartitionKeys, desiredParallelism).map(_ -> "" // here just creating key value pairs, looks like joinWithCassandraTable doesn't work otherwise).joinWithCassandraTable(..)
 and that's -  I get roughly evenly distributed RDD (sure if keys are skewed then not so evenly) and then convert it to Dataframe. 

8 Мар 2017 г. 1:01 AM пользователь "Russell Spitzer" <russell...@gmail.com> написал:
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-user+unsub...@lists.datastax.com.
--
...

Russell Spitzer

unread,
Mar 7, 2017, 3:16:14 PM3/7/17
to spark-conn...@lists.datastax.com
The union is what you could do to subdivide very long partitions, Basically you would have multiple jWCT, each selecting
a different portion of each long C* partition.

IE

jwct.where(first hour) ++ jwct.where(second hour) ++ ...... +++ jwct.wheter(tenthhour)

This would give us 10 Spark partitions for each C* partition, so you could cut up very very large partitions this way.
Reply all
Reply to author
Forward
0 new messages