We have one environment with 8 nodes, each running both Spark and Cassandra. There, our code that reads from Cassandra using data locality works fine.
The snippet is this:
// DataRecord here is our case class stored in the Cassandra table
// anchorList is a list of DataRecord(s) containing the partition key
val anchorRdd = sc.parallelize(anchorList).repartitionByCassandraReplica(keyspace, table)
// joinData is the actual data we get from Cassandra
val joinData = anchorRdd.joinWithCassandraTable[DataRecord](keyspace, dataTable, selectColumns, joinColumns)
Recently we started migrating to another environment where Spark and Cassandra are not collocated: Cassandra runs on its own cluster, Spark runs on its own, and the Spark cluster has more nodes.
Should we amend our code in this case? Should reads with data locality still work?
Without any changes, we run into the following issue:
[2016-10-28 07:58:56,555] WARN k.scheduler.TaskSetManager [] [akka://JobServer/user/context-supervisor/mpda] - Lost task 32.0 in stage 1.0 (TID 34, ip-10-188-134-141.novartis.net): java.util.concurrent.ExecutionException: com.datastax.driver.core.exceptions.OperationTimedOutException: [ip-10-188-134-15.novartis.net/10.188.134.15:9042] Timed out waiting for server response
at com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299)
at com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286)
at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
at com.datastax.spark.connector.rdd.CassandraJoinRDD$$anonfun$fetchIterator$1.apply(CassandraJoinRDD.scala:268)
at com.datastax.spark.connector.rdd.CassandraJoinRDD$$anonfun$fetchIterator$1.apply(CassandraJoinRDD.scala:268)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:189)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.exceptions.OperationTimedOutException: [ip-10-188-134-15.novartis.net/10.188.134.15:9042] Timed out waiting for server response
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.onTimeout(RequestHandler.java:766)
at com.datastax.driver.core.Connection$ResponseHandler$1.run(Connection.java:1267)
at io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:581)
at io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:655)
at io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:367)
... 1 more
Appreciate any ideas!
Increasing the read timeout does not help.
It is 120 seconds by default; we raised it to 240 seconds, with the same effect.
And there is not that much data, actually.
--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com.
I tried to set
input.join.throughput_query_per_sec=1000
but it did not help. The default value is very big, though.
And it is not 100% clear to me from the documentation what this parameter does.
I will try reducing it later and try again.
But, setting
spark.cassandra.input.fetch.size_in_rows=1000
helped.
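For reference, a minimal sketch of how these connector properties can be set on the SparkConf (property names are from the spark-cassandra-connector docs; the app name, contact point, and values are illustrative, not from our actual job):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical configuration sketch. The two tuning properties are the
// ones discussed in this thread; the values are the ones we tried.
val conf = new SparkConf()
  .setAppName("cassandra-join-job")                          // illustrative name
  .set("spark.cassandra.connection.host", "10.188.134.15")   // C* contact point
  .set("spark.cassandra.read.timeout_ms", "240000")          // raised read timeout (240 s)
  .set("spark.cassandra.input.fetch.size_in_rows", "1000")   // smaller pages: this is what helped

val sc = new SparkContext(conf)
```

The fetch size controls how many rows the driver pages back per round trip, so lowering it keeps each server response small enough to finish inside the timeout.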
I have a general question.
With collocated nodes we have data locality; without collocation we don't, that is clear to me.
But even without data locality, I would like to read the data in a distributed manner, so that every node reads some portion of the data. Maybe not very efficiently, but still without duplication.
I tried sc.cassandraTable(...).select(...).where(...)
but it appears to generate a single-partition read operation, meaning only one Spark node reads the whole dataset, which is too much for one node in my case.
Does the driver still read in a distributed manner if I use the
repartitionByCassandraReplica
joinWithCassandraTable
approach?
Or is it simply not relevant to use these calls in a non-collocated environment?
What would be the recommended approach in this case?
By default all `cassandraTable` calls are distributed. They only lose that property on single C* partitions. This means the `where` clause in your request only applies to a single C* partition, and since the connector has no way of breaking up a C* partition, a single C* partition will become a single Spark partition.
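To illustrate the difference (a hedged sketch; the keyspace, table, and column names are made up, and `sc` is assumed to be an existing SparkContext with the connector on the classpath):

```scala
import com.datastax.spark.connector._

// Full-table scan: the connector splits the table's token range into
// many Spark partitions, so every executor reads its own slice of the
// data, collocated or not.
val all = sc.cassandraTable("ks", "events")

// Restricting the scan to one C* partition key: only the rows of that
// single C* partition are read, and they arrive in a single Spark
// partition, i.e. one task on one executor does all the reading.
val one = sc.cassandraTable("ks", "events")
  .where("sensor_id = ?", "sensor-42")
```

So a `where` that pins the whole query to one partition key is what produces the single-node read described above, independent of collocation.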
The consistency level will control the number of replicas checked, but multiple replicas will not be used in concert to read a partition more quickly.