Data locality reads on non-collocated Cassandra and Spark nodes


Opentaps onJava

Oct 28, 2016, 8:16:59 AM10/28/16
to DataStax Spark Connector for Apache Cassandra
Hello everyone,

We have one environment with 8 nodes, each running both a Spark and a Cassandra node.
There, our code that reads from Cassandra using data locality works fine.
The code snippet is this:
// DataRecord here is our case class stored in a Cassandra table
// anchorList is a list of DataRecord(s) carrying the partition key
val anchorRdd = sc.parallelize(anchorList).repartitionByCassandraReplica(keyspace, table)
// joinData is the actual data fetched from Cassandra
val joinData = anchorRdd.joinWithCassandraTable[DataRecord](keyspace, dataTable, selectColumns, joinColumns)


Recently we started migrating to another environment where Spark and Cassandra are not collocated: Cassandra runs on its own cluster, Spark on its own, and Spark has more nodes.
Should we amend our code somehow in this case? Should reads with data locality still work?

Without making any changes, we are running into the following issue:
[2016-10-28 07:58:56,555] WARN k.scheduler.TaskSetManager [] [akka://JobServer/user/context-supervisor/mpda] - Lost task 32.0 in stage 1.0 (TID 34, ip-10-188-134-141.novartis.net): java.util.concurrent.ExecutionException: com.datastax.driver.core.exceptions.OperationTimedOutException: [ip-10-188-134-15.novartis.net/10.188.134.15:9042] Timed out waiting for server response
at com.google.common.mpda.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299)
at com.google.common.mpda.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286)
at com.google.common.mpda.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
at com.datastax.spark.connector.rdd.CassandraJoinRDD$$anonfun$fetchIterator$1.apply(CassandraJoinRDD.scala:268)
at com.datastax.spark.connector.rdd.CassandraJoinRDD$$anonfun$fetchIterator$1.apply(CassandraJoinRDD.scala:268)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:189)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.exceptions.OperationTimedOutException: [ip-10-188-134-15.novartis.net/10.188.134.15:9042] Timed out waiting for server response
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.onTimeout(RequestHandler.java:766)
at com.datastax.driver.core.Connection$ResponseHandler$1.run(Connection.java:1267)
at io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:581)
at io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:655)
at io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:367)
... 1 more


Appreciate any ideas!

Opentaps onJava

Oct 28, 2016, 8:59:50 AM10/28/16
to DataStax Spark Connector for Apache Cassandra
What is also interesting: sometimes the same request on the same data works fine, but sometimes it starts to fail.
It looks like, at some point, the data-locality read stops working.
So far we have not been able to track down the root cause.

Increasing the read timeout does not help.
It is 120 s by default; we made it 240 s, with the same effect.
And there is not that much data, actually.

Opentaps onJava

Oct 28, 2016, 9:01:58 AM10/28/16
to DataStax Spark Connector for Apache Cassandra
Forgot to mention, we are on:
* Spark 1.6.1
* Spark Cassandra Connector 3.1.0
* Cassandra 3.0.8

Russell Spitzer

Oct 28, 2016, 12:06:33 PM10/28/16
to DataStax Spark Connector for Apache Cassandra
There is no data locality if the processes aren't colocated. Most likely you are just overloading the cluster. There seems to be a pretty prevalent belief that you can somehow increase throughput by increasing the number of spark nodes. This is simply not true. The reading bottleneck is almost always Cassandra.

The best thing you can do in your use case is slow down the read rate. See 
input.join.throughput_query_per_sec
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/reference.md
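
For reference, this kind of setting is applied through SparkConf with the `spark.cassandra.` prefix. The sketch below is a hypothetical setup, not a recommended value: the property name follows the connector's reference.md of this era and may differ between connector versions, and the host is a placeholder.

```scala
import org.apache.spark.SparkConf

// Hypothetical configuration sketch; property names and defaults vary by
// connector version -- check reference.md for the version you actually run.
val conf = new SparkConf()
  .set("spark.cassandra.connection.host", "cassandra-host")           // placeholder contact point
  .set("spark.cassandra.input.join.throughput_query_per_sec", "500")  // throttle joinWithCassandraTable reads
```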

--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com.

Opentaps onJava

Oct 31, 2016, 1:21:07 PM10/31/16
to DataStax Spark Connector for Apache Cassandra
Hi Russell,
Thank you for the suggestion to slow down the read rate!

I tried setting
input.join.throughput_query_per_sec=1000
but it did not help. The default value is very large, though,
and it is not 100% clear to me from the documentation what this parameter does.
I will try reducing it further and try again.


But setting
spark.cassandra.input.fetch.size_in_rows=1000
helped.

I have a general question.
With collocated nodes, we have data locality.
Without collocation, we don't have data locality; that is clear to me.
But even without data locality, I would like to read the data in a distributed manner, so that every node reads some portion of it. Maybe not very efficiently, but still without duplication.

I tried sc.cassandraTable(...).select(...).where(...)
but it appears to generate a single-partition read operation, meaning only one Spark node reads the whole dataset, which is too much for one node in my case.

Does the driver still read in a distributed manner if I use the
repartitionByCassandraReplica
joinWithCassandraTable
approach?
Or is it simply not relevant to use these calls in a non-collocated environment?
What would be the recommended approach in this case?


Russell Spitzer

Oct 31, 2016, 1:30:29 PM10/31/16
to DataStax Spark Connector for Apache Cassandra
By default all `cassandraTable` calls are distributed. They only lose that property on single C* partitions. This means the "where" clause in your request applies to only a single C* partition. The Connector has no way of breaking up a C* partition, so a single C* partition becomes a single Spark partition.

The other methods are completely different operations. `repartitionByCassandraReplica` changes the number of partitions in a generic RDD based on the number of C* nodes. `joinWithCassandraTable` takes the partitioning of the source RDD and uses it to distribute queries, issuing one query per row in the source.

Here is a brief use case guide:

`cassandraTable` : When you need to perform a full table scan of C*
`repartitionByCassandraReplica`: Used to align an RDD of Key values with a Cassandra Cluster
`joinWithCassandraTable`: Used to select a subset of keys from Cassandra like a SQL `IN` clause
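
To make the division of labor concrete, here is a toy model in plain Scala (no Spark or connector needed). The ring layout, hash function, and node names are invented for illustration; the real connector uses Murmur3 tokens and live cluster metadata.

```scala
// Toy token ring: 4 hypothetical nodes, each owning a slice of a 0..99 ring.
val ring = Seq(("node-a", 0, 24), ("node-b", 25, 49),
               ("node-c", 50, 74), ("node-d", 75, 99))

def token(key: Int): Int = math.abs(key.hashCode) % 100  // stand-in for Murmur3

def replicaFor(key: Int): String =
  ring.collectFirst { case (node, lo, hi) if token(key) >= lo && token(key) <= hi => node }.get

// repartitionByCassandraReplica (conceptually): group keys by the node that
// owns them, so each Spark partition holds keys local to one replica.
def repartitionByReplica(keys: Seq[Int]): Map[String, Seq[Int]] =
  keys.groupBy(replicaFor)

// joinWithCassandraTable (conceptually): one lookup per key, like a SQL IN clause.
def joinWithTable(keys: Seq[Int], table: Map[Int, String]): Seq[(Int, String)] =
  keys.flatMap(k => table.get(k).map(v => k -> v))
```

In the real connector, the grouping is done against actual replica ownership so each task's keys are local to the node it runs on, and the join issues per-key CQL queries instead of local map lookups.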

kant kodali

Dec 3, 2016, 3:30:41 AM12/3/16
to spark-conn...@lists.datastax.com
@Russell With AWS EBS-backed SSD storage, disk throughput is slightly below or equal to network throughput (m4.xlarge). In that case I don't see how colocation can help. And with AWS instance store we can't persist data, since it is ephemeral, and I assume Cassandra needs storage that persists, so I don't know if colocation makes sense there either. Please clarify!


David Mitchell

Dec 3, 2016, 8:23:53 AM12/3/16
to spark-conn...@lists.datastax.com
On Mon, Oct 31, 2016 at 1:30 PM, Russell Spitzer <russell...@gmail.com> wrote:
By default all `cassandraTable` calls are distributed. They only lose that property on single C* partitions. This means the "where" clause in your request applies to only a single C* partition. The Connector has no way of breaking up a C* partition, so a single C* partition becomes a single Spark partition.


Since a single Spark partition is processed by a single Spark task, wouldn't a fat C* partition (with millions of rows) be broken up into chunks of data based on spark.cassandra.input.split.size? So, to clarify what you are saying above: a single fat C* partition (with millions of rows) would be processed on a single Spark worker node, and that worker would use many tasks to process it. Is this correct?

If my understanding is correct, namely that a Spark worker would use many tasks to process the single fat C* partition, then logically it should be possible to distribute a single fat C* partition across Spark worker nodes. However, I understand that the Spark Cassandra Connector processes a single C* partition on a single Spark worker node.

Russell Spitzer

Dec 3, 2016, 11:04:42 AM12/3/16
to spark-conn...@lists.datastax.com
A single C* partition is processed by a single Spark Task. All those millions of rows within a C* partition cannot be split up. That means 1 executor core deals with 1 task which deals with 1 Cassandra partition.

All the input split configs change the way the full token range is broken up, so they only move complete partitions around.

We currently have no way to automatically distribute a fat C* partition across Spark partitions (doing so would also break a few other features), for a few reasons.

Consider the following.

Spark partition contents (or bounds) have to be established BEFORE any data has been read from Cassandra. Once the RDD calls getPartitions, we need to have all the metadata each task will require.

So let's say I have a fat Cassandra partition with an Int clustering key:
(key Int, cluster Int, value Int, PRIMARY KEY (key, cluster))

The range of integers that could possibly exist is Integer.MinValue to Integer.MaxValue, but the distribution of values within the Cassandra partition is unknown. So how do you break up this range?

We could break it into equal portions based on the full range of the type, range / num_partitions, but what happens if the Cassandra partition has 100,000 values that are all contiguous, 1-100000, or are in some other way not randomly distributed over the range? We basically end up making a bunch of empty tasks and one task that has all the data.

Basically, what I'm trying to show is that without foreknowledge of the underlying distribution of the data within the partition, there is no way to break it up into chunks.
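
This can be demonstrated with a quick back-of-the-envelope simulation in plain Scala (no Spark needed): split the full Int range into 8 equal-width chunks and count where 100,000 contiguous clustering values land.

```scala
// Equal-width splits over the full Int range vs. contiguous clustering keys.
val values = (1 to 100000).toVector                       // contiguous keys 1..100000
val numChunks = 8
val span = Int.MaxValue.toLong - Int.MinValue.toLong + 1  // 2^32 possible values
val chunkWidth = span / numChunks

def chunkOf(v: Int): Int = ((v.toLong - Int.MinValue) / chunkWidth).toInt

val counts = values.groupBy(chunkOf).map { case (chunk, vs) => chunk -> vs.size }
// counts == Map(4 -> 100000): every value falls into the single chunk covering
// the small positive integers, so the other 7 tasks would be empty.
```

Seven of the eight chunks are empty, so the equal-width split buys no parallelism at all for this distribution.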

A user who does know how their data is distributed, though, could accomplish this manually by doing something like:

// cc is a CassandraConnector; clusteringRanges is a user-built Seq of ranges
sc.parallelize(clusteringRanges)
  .mapPartitions(ranges => ranges.map(range =>
    cc.withSessionDo(session => session.execute(...))))


David Mitchell

Dec 5, 2016, 11:27:26 AM12/5/16
to spark-conn...@lists.datastax.com
To clarify further: since there is a one-to-one mapping of C* partition to Spark partition, even if a C* partition is replicated three times, only one replica of a given C* partition will be read in a Spark job.

Thank you for your help.






--
### Confidential e-mail, for recipient's (or recipients') eyes only, not for distribution. ###

Russell Spitzer

Dec 5, 2016, 11:29:11 AM12/5/16
to spark-conn...@lists.datastax.com

Consistency level will control the number of replicas checked, but multiple replicas will not be used in concert to read a partition more quickly.





