CassandraTable: restrict returned data set by token range query?


Stephan Kepser

Nov 17, 2016, 11:34:05 AM11/17/16
to DataStax Spark Connector for Apache Cassandra
Hi there,

using the Spark Cassandra connector I'd like to retrieve a large table from Cassandra. All rows are needed and processed, so I use cassandraTable(). It turns out that the table to fetch is sometimes so big that the resulting RDD is too large to process further. (We need to do a reduceByKey on the RDD.) So the idea is to process the table in batches. The primary key is of no help here, because it does not contain a clustering key component for a where() query.

In CQL, a way to divide a huge table into batches is to use token range queries like

select * from my_table where token(primary_key) > N and token(primary_key) < M.
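To make that concrete, here is a small standalone Scala sketch (all names are mine, not from the connector) that splits the full Murmur3 token range into N contiguous batches and emits one such CQL statement per batch, which could then be run one at a time:

```scala
// Standalone sketch: split the full token range into n contiguous,
// non-overlapping batches. Assumes the default Murmur3Partitioner, whose
// tokens span Long.MinValue .. Long.MaxValue. All names are illustrative.
object TokenBatches {
  private val MinToken = BigInt(Long.MinValue)
  private val MaxToken = BigInt(Long.MaxValue)

  /** n contiguous (low, high) token sub-ranges covering the whole ring. */
  def ranges(n: Int): Seq[(Long, Long)] = {
    val width = (MaxToken - MinToken + 1) / n
    (0 until n).map { i =>
      val lo = MinToken + width * i
      // The last range absorbs the remainder of the division.
      val hi = if (i == n - 1) MaxToken else MinToken + width * (i + 1) - 1
      (lo.toLong, hi.toLong)
    }
  }

  /** One CQL statement per batch, to be run sequentially. */
  def statements(table: String, pk: String, n: Int): Seq[String] =
    ranges(n).map { case (lo, hi) =>
      s"SELECT * FROM $table WHERE token($pk) >= $lo AND token($pk) <= $hi"
    }
}
```

Because the ranges are contiguous and disjoint, running the statements in sequence visits every row exactly once.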

As far as I know, the Spark Cassandra connector uses token range queries under the hood to distribute the RDD into partitions and ensure data locality. But it seems I can't do
cassandraTable("my keyspace", "my_table").where("token(primary_key) > K").limit(10000)

Is there a way to do some kind of token-range-restricted query using cassandraTable(), without using clustering keys in a where() clause?
Or is there an alternative strategy to divide a huge table into batches and load the batches one after another into RDDs?
Any hints would be very much appreciated.
Thanks a lot,

Stephan

Russell Spitzer

Nov 17, 2016, 11:42:42 AM11/17/16
to DataStax Spark Connector for Apache Cassandra
Basically, your description is exactly how the connector works. To get smaller pieces you simply need to increase the number of partitions.

There are two ways to do this:
  • In the ReadConf object you can directly choose the number of splits; see the numSplits parameter.
  • Alternatively, decrease the split_size_in_mb parameter, which increases the number of partitions, since the connector just does a division: estimated table size / split_size_in_mb = number of partitions.
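To illustrate that division, here is a back-of-envelope sketch of the arithmetic (a simplified model of the connector's behavior, not its actual code):

```scala
// Rough model of the connector's split arithmetic:
// number of Spark partitions ~ estimated table size / split size.
// Halving the split size roughly doubles the partition count.
def estimatedPartitions(tableSizeMB: Long, splitSizeMB: Long): Long =
  math.max(1L, (tableSizeMB + splitSizeMB - 1) / splitSizeMB) // ceiling division
```

So for a roughly 6400 MB table, a 64 MB split size yields about 100 partitions, and dropping it to 16 MB yields about 400, i.e. smaller batches per task.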

Both of these methods control the number of partitions. There is currently no way to break up token ranges more finely than this. If you need to break up data within partitions, you'll need to use some custom CQL and the CassandraConnector interface.

Also, it may be helpful to note that most reduceByKey usages will be much faster if you use
.keyBy in your cassandraTable call (as long as the key is a partition key/primary key). See
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/16_partitioning.md#keying-a-table-with-the-partition-key-produces-an-rdd-with-a-partitioner

Or you may just be looking for a spanByKey approach; see
https://github.com/datastax/spark-cassandra-connector/blob/75719dfe0e175b3e0bb1c06127ad4e6930c73ece/doc/3_selection.md#grouping-rows-by-partition-key
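For intuition, span-by-key grouping relies on rows arriving already ordered by partition key, so consecutive runs with the same key can be grouped without a shuffle. A rough standalone model of that behavior (my own toy code, not the connector's implementation):

```scala
// Toy model of span-by-key grouping: rows are assumed to arrive already
// sorted by key (as they do when read partition-by-partition from Cassandra),
// so each run of equal keys becomes one group, with no shuffle needed.
def spanByKeyModel[K, V](rows: Seq[(K, V)]): Seq[(K, Seq[V])] =
  rows.foldLeft(Vector.empty[(K, Vector[V])]) {
    // Same key as the previous row: extend the current group.
    case (init :+ ((k, vs)), (key, v)) if k == key => init :+ (k -> (vs :+ v))
    // New key: start a new group.
    case (acc, (key, v)) => acc :+ (key -> Vector(v))
  }
```

Note that, unlike a groupBy, only consecutive runs are merged; if a key reappears later it starts a new group, which is why the input ordering matters.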



Ashic Mahtab

Nov 17, 2016, 11:44:35 AM11/17/16
to DataStax Spark Connector for Apache Cassandra

You should be able to increase the number of partitions. 


Another approach would be to query just the partition keys, batch them up and process them, or repartition those ids and use foreachPartition.
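A hedged sketch of that idea (all names are mine): once the distinct partition-key values have been collected, group them into fixed-size batches and build one IN-restricted query per batch. Fetching the keys themselves would be a separate read through the connector.

```scala
// Sketch of key-batch processing: given already-collected partition-key
// values, group them into fixed-size batches and build one CQL query per
// batch. Illustrative only; assumes a single numeric partition-key column.
def keyBatchQueries(table: String, pkColumn: String,
                    keys: Seq[Long], batchSize: Int): Seq[String] =
  keys.grouped(batchSize).map { batch =>
    s"SELECT * FROM $table WHERE $pkColumn IN (${batch.mkString(", ")})"
  }.toSeq
```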



Jim Hatcher

Nov 17, 2016, 11:45:08 AM11/17/16
to spark-conn...@lists.datastax.com

Hi Stephan,


If you're not able to change the partition size (as Russell describes above) and you choose to go down the road of breaking things down further via CQL, a few things jump to mind:

  • You could put a C* secondary index on a field by which you want to batch. C* secondary indexes are discouraged in real-time apps (because they require scanning the whole cluster), but they're fine in Spark jobs where you plan to scan the whole cluster (or data center) anyway.
  • If you have a well-distributed integer column in your table, you can break the table into batches with a simple modulo. For instance, to split into 10 batches, you could do cassandraTable("my keyspace", "my_table").where("my_int_field % 10 = 1") for residue 1, and so on for the other residues.
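To see why the modulo trick partitions cleanly, here is a tiny standalone check over an in-memory range (one caveat: a plain CQL where clause may not accept % directly, so in practice the residue filter might need to run on the Spark side):

```scala
// The modulo trick: a well-distributed integer id split into 10 disjoint
// batches by residue. Each pass of the job would keep only the rows whose
// id has one particular residue; together the passes cover every row once.
val ids = 0 until 1000
val batches = ids.groupBy(_ % 10)
```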
Jim



