Improve read performance using the connector - GC tuning but not only

1,373 views
Skip to first unread message

Davide Mandrini

unread,
Dec 6, 2017, 5:15:07 AM12/6/17
to DataStax Spark Connector for Apache Cassandra
Hello,

I am writing here to have some advices on how to improve read performance in a analytics job using the spark-cassandra connector.
I am facing are some timeout issues while performing lots of reads on several partition keys.

In cassandra logs such timeouts are something like:

WARN [ScheduledTasks:1] 2017-12-06 04:31:49,752 MonitoringTask.java:150 - 32 operations timed out in the last 20394 msecs, operation list available at debug log level
DEBUG [ScheduledTasks:1] 2017-12-06 04:31:49,753 MonitoringTask.java:155 - 32 operations timed out in the last 20394 msecs:
SELECT * FROM xyz WHERE ts = 2017-11-10 00:32+1100 LIMIT 5000: total time 15563 msec - timeout 5000 msec
SELECT * FROM xyz WHERE ts = 2017-10-20 00:31+1100 LIMIT 5000: total time 15597 msec - timeout 5000 msec
SELECT * FROM xyz WHERE ts = 2017-10-20 00:49+1100 LIMIT 5000: total time 15596 msec - timeout 5000 msec

The cluster
-----------
I am running an analytic Spark job in a cluster with this configuration:

- DC1: 2 nodes, both with 32 cores and 64GB of RAM
- DC2: 2 nodes with 32 and 24 cores, both with 64GB of RAM

On all nodes I run Cassandra and Spark.
As spark jobs, I have a streaming job and the the analytic job (running once a day).
The replication factor is 2, one per DC. This is configured to have one copy of each data per DC.
All nodes have C* 3.7 and Spark 2.1.1, connector version 2.0.5.

The spark streaming job is in charge of the writes in the cluster (it receives data from Flume agent). This means that my cluster have a heavy write workload throughout all the day, and a heavy read workload once a day.

The table read
--------------
The table I am reading data from has this schema:

CREATE TABLE xyz (
ts timestamp,
f1 text,
f2 text,
count counter,
PRIMARY KEY ((ts), f1, f2)
) WITH CLUSTERING ORDER BY (f1 ASC, f2 ASC)
AND bloom_filter_fp_chance = 0.01
AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
AND comment = ''
AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND crc_check_chance = 1.0
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 0
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99PERCENTILE';

As you can see, the partition key is ts, where each possible value represent a particular minute in time (for example, 2017-12-05 17:42+0000)

The analytics job
-----------------
I run the analytic job with 2 executors per node, 4 cores per executor, 5GB per executor (32 cores and 40GB of memory in total).

The analytical job is in charge of reading data from one C* table and write to another C* table (with a different partition key).

The analytic job reads 12 weeks of data in 24 batches reading each 5040 minutes, thus accessing 5040 partition keys.
For each batch, I prepare a RDD with the 5040 keys, and then I use `repartitionByCassandraReplica` with `joinWithCassandraTable` to ensure data locality.

From `nodetool tablestats` and `nodetool tablehistograms` I can see that a partition size is, on average, 422KB. This means that each batch need to read 5040*422KB ~= 2GB of (uncompressed?) data.

The jobs connects to the master node, as I do not specify the host in the C* connector.
For this reason, by default, data will be read only from DC1.

The problem
-----------
With the default Cassandra Heap space size of 8GB, the spark job fails due to many read timeouts errors in C*.
I was able to correlate these errors with big GC pauses due to FullGC.

I investigated a lot, and I was able to exclude peaks in disk access (with iostat command).
Tracing the same kind of queries in CQLSH I was able to see a reasonable response time (10-50ms) for more than a thousand request done at the same time.

The sole issue seems to be the Garbage Collector.

I tried to increase MAX_HEAP_MEMORY from 8 to 12GB, and then from 12GB to 16GB. It helped but still see some timeouts:

- With 8GB, as mentioned, the job stopped as there were too many timeouts that cause spark errors
- With 12GB the job completed in 48 minutes, with around 15-20 timeouts due to GC visible in Cassandra log only (no impact on spark job)
- With 16GB the job completed in 41 minutes, **still with** some timeouts due to GC (around 10 cases)

**First question:** would it be useful to override the connector parameter `spark.cassandra.connection.factory` to use a policy that will use all the nodes from both DC considering that my latency between DC is low (< 3ms)?

This will put less pressure on the nodes on DC1, as the reads will be spread on 4 nodes instead of 2.

**Second question:** considering the way the data is written on the table, I might decide to change the compaction strategy to `DateTieredCompactionStrategy`. Would it be useful to change the compaction strategy?

This is a costly operation in a production cluster, so I would like to be reasonably sure that it will be beneficial.
For information, I can see that the number of SSTables accessed on different nodes is between 5 and 12 for the 50% percentile, 7 and 14 for 99% and 7-14 again as max value (from `nodetool tablehistograms`)

**Third question:** is there another optimisation point I am missing here?

Russell Spitzer

unread,
Dec 6, 2017, 10:21:29 AM12/6/17
to spark-conn...@lists.datastax.com
You probably just need a larger heap on 2.0.6.

Or you can try running with SPARKC-507 in SPARKC 2.0.6 which changes the allocation pattern for JoinWithCassandraTable.
https://github.com/datastax/spark-cassandra-connector/pull/1145/commits/885c78627c275d0f8b7719d4863131da88472bd8

1) Probably not it sounds like your issue is GC related. If you do actually have server side timeouts you just need to increase that threshold or possibly check for tombstones.

2)Probably not in this case? 
3) Yes, see 2.0.6 you may also want to try without the "repartition" call just to see if that's a bit more efficient. But the 2.0.6 improvement is key

--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com.
--

Russell Spitzer
Software Engineer




DS_Sig2.png

Russell Spitzer

unread,
Dec 6, 2017, 10:21:47 AM12/6/17
to spark-conn...@lists.datastax.com
larger heap on 2.0.5 :( mistype

Davide Mandrini

unread,
Dec 6, 2017, 12:08:17 PM12/6/17
to DataStax Spark Connector for Apache Cassandra
> Russell Spitzer
> Software Engineer
>
>
>
>
>
>
> --
>
>
>
>
> Russell Spitzer
> Software Engineer

Thank you Russell for your quick reply!

About question 2, I finally update the compaction strategy and it did not improve the situation.

I will try to use the new version of the connector and try to play with the repartitionByCassandraReplica to see what happens.

Cheers,
Davide

Davide Mandrini

unread,
Dec 8, 2017, 11:03:14 AM12/8/17
to DataStax Spark Connector for Apache Cassandra
Hi Russell,

I tried with the new connector version (keeping the heap at 16GB), and the long GC pauses disappeared.
However, the job is starting with the timeouts even earlier.

In spark logs I can see:

17/12/09 02:31:14 WARN TaskSetManager: Lost task 6.0 in stage 1.0 (TID 15, <master_node_ip>, executor 0): java.util.concurrent.ExecutionException: com.datastax.driver.core.exceptions.OperationTimedOutException: [<master_node_ip>:9042] Timed out waiting for server response
at shade.com.datastax.spark.connector.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299)
at shade.com.datastax.spark.connector.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286)
at shade.com.datastax.spark.connector.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
at com.datastax.spark.connector.rdd.AbstractCassandraJoin$$anonfun$slidingPrefetchIterator$3.apply(AbstractCassandraJoin.scala:209)
at com.datastax.spark.connector.rdd.AbstractCassandraJoin$$anonfun$slidingPrefetchIterator$3.apply(AbstractCassandraJoin.scala:209)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.execution.aggregate.SortAggregateExec$$anonfun$doExecute$1$$anonfun$3.apply(SortAggregateExec.scala:80)
at org.apache.spark.sql.execution.aggregate.SortAggregateExec$$anonfun$doExecute$1$$anonfun$3.apply(SortAggregateExec.scala:77)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.exceptions.OperationTimedOutException: [<master_node_ip>:9042] Timed out waiting for server response
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.onTimeout(RequestHandler.java:772)
at com.datastax.driver.core.Connection$ResponseHandler$1.run(Connection.java:1374)
at io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:588)
at io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:662)
at io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:385)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
... 1 more

Few lines before, still in spark logs, I can see how long it took to complete the task on one executor for each of the 20 partitions (10 partitions per host by default in the connector times the 2 nodes in the DC I am connecting to):
17/12/09 02:28:21 INFO TaskSetManager: Finished task 8.0 in stage 1.0 (TID 18) in 86030 ms on <slave_node_ip> (executor 5) (1/20)
17/12/09 02:28:33 INFO TaskSetManager: Finished task 9.0 in stage 1.0 (TID 19) in 97932 ms on <slave_node_ip> (executor 6) (2/20)
17/12/09 02:28:34 INFO TaskSetManager: Finished task 18.0 in stage 1.0 (TID 20) in 99095 ms on <slave_node_ip> (executor 4) (3/20)
17/12/09 02:28:46 INFO TaskSetManager: Finished task 19.0 in stage 1.0 (TID 21) in 110752 ms on <slave_node_ip> (executor 7) (4/20)
17/12/09 02:29:10 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 5) in 138120 ms on <master_node_ip> (executor 1) (5/20)
17/12/09 02:29:11 INFO TaskSetManager: Finished task 3.0 in stage 1.0 (TID 9) in 139224 ms on <master_node_ip> (executor 1) (6/20)
17/12/09 02:29:12 INFO TaskSetManager: Finished task 7.0 in stage 1.0 (TID 17) in 140452 ms on <master_node_ip> (executor 1) (7/20)
17/12/09 02:29:16 INFO TaskSetManager: Finished task 5.0 in stage 1.0 (TID 13) in 143884 ms on <master_node_ip> (executor 1) (8/20)
17/12/09 02:29:42 INFO TaskSetManager: Finished task 15.0 in stage 1.0 (TID 12) in 170391 ms on <master_node_ip> (executor 3) (9/20)
17/12/09 02:29:46 INFO TaskSetManager: Finished task 17.0 in stage 1.0 (TID 16) in 174510 ms on <master_node_ip> (executor 3) (10/20)
17/12/09 02:29:47 INFO TaskSetManager: Finished task 11.0 in stage 1.0 (TID 4) in 175380 ms on <master_node_ip> (executor 3) (11/20)
17/12/09 02:29:48 INFO TaskSetManager: Finished task 13.0 in stage 1.0 (TID 8) in 176357 ms on <master_node_ip> (executor 3) (12/20)

In cassandra logs I can see a lot of
DEBUG [SharedPool-Worker-4] 2017-12-09 02:20:33,417 ReadCallback.java:126 - Timed out; received 0 of 1 responses
DEBUG [SharedPool-Worker-8] 2017-12-09 02:20:33,422 ReadCallback.java:126 - Timed out; received 0 of 1 responses
...

I have tried to run the job with spark.cassandra.concurrent.reads equals to 512 (the default), 1024 and even 5080 (my number of partitions to read). This did not improve the situation.

What are the parameters I can play with in order to solve the issue?

How is it possible that the new connector version solved the GC pauses but it is introducing early timeouts?

Regards,
Davide



Davide Mandrini

unread,
Dec 8, 2017, 11:18:25 AM12/8/17
to DataStax Spark Connector for Apache Cassandra
Forget the cassandra logs I put, they were from a previous execution.
Nothing relevant in cassandra logs during that execution.

Russell Spitzer

unread,
Dec 8, 2017, 11:32:03 AM12/8/17
to spark-conn...@lists.datastax.com
The upgrade fixed the way queries where being queued. It's probably not timing out earlier, it's probably timing out at similar amount of work but that work is completed faster because there are no GC issues on the executor. You next step is to *Lower* parallelism not increase it since you may be overwhelming your C* node? Make sure you have clean logs so you can see what's going on.

Davide Mandrini

unread,
Dec 12, 2017, 5:14:40 AM12/12/17
to DataStax Spark Connector for Apache Cassandra
Hello Russell,

what exactly spark.cassandra.concurrent.reads represents?

Let's say I have created an RDD with 1000 keys, keeping the default value of 10 for repartitionByCassandraReplica parameter partitionsPerHost.
If the DC I am connecting to have 2 nodes, the keys RDD will have 20 partitions, each partition "responsible" of 50 keys.

Let's suppose I specify for spark.cassandra.concurrent.reads = 15:
- does this mean that I will read 15 partitions in parallel, i.e. 750 keys in a single request?
- does this mean that I will read 15 keys in parallel in each partitions, i.e. 15 keys in a single request?
I suppose the correct answer is the second one, as Spark works per partition.

In this case we will need 4 requests (the first three with 15 keys, and the last one with 5 keys) to complete the processing of a partition. Am I understanding this parameter correctly?

Thank you,
Davide

Russell Spitzer

unread,
Dec 12, 2017, 1:15:27 PM12/12/17
to spark-conn...@lists.datastax.com
... I am not sure what you are saying

Concurrent reads is the number of concurrent queries that JWCT will run per executor core.

Concurrent reads set to 4 means in a 4 core spark executor means, 16 requests will run MAX at the same time. Each of these requests gets paged through in order though. So we never return records out of partition order.

So the conntector runs

Select * from TABLE where PK = value1
Select * from TABLE where PK = value2
Select * from TABLE where PK = value3
Select * from TABLE where PK = value4

then waits for at least value 1 to complete before buffering another set of requests. 

Davide Mandrini

unread,
Dec 19, 2017, 11:05:27 AM12/19/17
to DataStax Spark Connector for Apache Cassandra
Thanks, this clarify the parameter.
I still don't get a last point then.

My cassandra node is configured with concurrent_reads: 32
In my job I use 2 executors per node, with 4 cores each (8 cores used in a node).
Then I set concurrent.reads to 4. This means that in a specific moment in time I expect to have 32 reads coming from the connector.

In this configuration I am still able to see in Cassandra logs some timeouts:

INFO [ScheduledTasks:1] 2017-12-20 02:26:34,702 MessagingService.java:1035 - READ messages were dropped in last 5000 ms: 106 for internal timeout and 0 for cross node timeout. Mean internal dropped latency: 16172 ms and Mean cross-node dropped latency: 0 ms
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,703 MessagingService.java:1035 - REQUEST_RESPONSE messages were dropped in last 5000 ms: 59 for internal timeout and 0 for cross node timeout. Mean internal dropped latency: 17166 ms and Mean cross-node dropped latency: 0 ms
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,703 MessagingService.java:1035 - COUNTER_MUTATION messages were dropped in last 5000 ms: 9 for internal timeout and 0 for cross node timeout. Mean internal dropped latency: 15527 ms and Mean cross-node dropped latency: 0 ms
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,703 StatusLogger.java:52 - Pool Name Active Pending Completed Blocked All Time Blocked
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,710 StatusLogger.java:56 - MutationStage 0 0 12556449 0 0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,711 StatusLogger.java:56 - ViewMutationStage 0 0 0 0 0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,711 StatusLogger.java:56 - ReadStage 0 0 828656 0 0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,711 StatusLogger.java:56 - RequestResponseStage 0 0 318912944 0 0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,711 StatusLogger.java:56 - ReadRepairStage 0 0 1 0 0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,711 StatusLogger.java:56 - CounterMutationStage 0 0 205166223 0 0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,715 StatusLogger.java:56 - MiscStage 0 0 0 0 0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,715 StatusLogger.java:56 - CompactionExecutor 0 0 774043 0 0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,715 StatusLogger.java:56 - MemtableReclaimMemory 0 0 687 0 0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,716 StatusLogger.java:56 - PendingRangeCalculator 0 0 5 0 0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,716 StatusLogger.java:56 - GossipStage 0 0 2794567 0 0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,719 StatusLogger.java:56 - SecondaryIndexManagement 0 0 0 0 0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,719 StatusLogger.java:56 - HintsDispatcher 0 0 176 0 0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,720 StatusLogger.java:56 - MigrationStage 0 0 4558 0 0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,720 StatusLogger.java:56 - MemtablePostFlush 0 0 826 0 0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,720 StatusLogger.java:56 - PerDiskMemtableFlushWriter_0 0 0 687 0 0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,720 StatusLogger.java:56 - ValidationExecutor 0 0 0 0 0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,721 StatusLogger.java:56 - Sampler 0 0 0 0 0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,721 StatusLogger.java:56 - MemtableFlushWriter 0 0 687 0 0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,721 StatusLogger.java:56 - InternalResponseStage 0 0 71883 0 0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,721 StatusLogger.java:56 - AntiEntropyStage 0 0 0 0 0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,722 StatusLogger.java:56 - CacheCleanupExecutor 0 0 0 0 0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,722 StatusLogger.java:56 - Native-Transport-Requests 1 0 371637085 0 146675
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,722 StatusLogger.java:66 - CompactionManager 0 0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,722 StatusLogger.java:78 - MessagingService n/a 0/0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,722 StatusLogger.java:88 - Cache Type Size Capacity KeysToSave
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,722 StatusLogger.java:90 - KeyCache 104854608 104857600 all
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,722 StatusLogger.java:96 - RowCache 0 0 all
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,723 StatusLogger.java:103 - Table Memtable ops,data
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,723 StatusLogger.java:106 - system_distributed.parent_repair_history 0,0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,723 StatusLogger.java:106 - system_distributed.repair_history 0,0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,723 StatusLogger.java:106 - system_distributed.view_build_status 0,0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,723 StatusLogger.java:106 - system.compaction_history 0,0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,723 StatusLogger.java:106 - system.hints 0,0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,723 StatusLogger.java:106 - system.schema_aggregates 0,0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,723 StatusLogger.java:106 - system.IndexInfo 0,0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,724 StatusLogger.java:106 - system.schema_columnfamilies 0,0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,724 StatusLogger.java:106 - system.schema_triggers 0,0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,724 StatusLogger.java:106 - system.size_estimates 76545,1026131
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,724 StatusLogger.java:106 - system.schema_functions 0,0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,724 StatusLogger.java:106 - system.paxos 0,0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,724 StatusLogger.java:106 - system.views_builds_in_progress 0,0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,724 StatusLogger.java:106 - system.built_views 0,0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,724 StatusLogger.java:106 - system.peer_events 0,0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,724 StatusLogger.java:106 - system.range_xfers 0,0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,724 StatusLogger.java:106 - system.peers 0,0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,725 StatusLogger.java:106 - system.batches 0,0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,725 StatusLogger.java:106 - system.schema_keyspaces 0,0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,725 StatusLogger.java:106 - system.schema_usertypes 0,0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,725 StatusLogger.java:106 - system.local 0,0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,725 StatusLogger.java:106 - system.sstable_activity 4400,37740
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,725 StatusLogger.java:106 - system.available_ranges 0,0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,739 StatusLogger.java:106 - system.batchlog 0,0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,739 StatusLogger.java:106 - system.schema_columns 0,0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,739 StatusLogger.java:106 - system_schema.columns 0,0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,740 StatusLogger.java:106 - system_schema.types 0,0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,740 StatusLogger.java:106 - system_schema.indexes 0,0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,740 StatusLogger.java:106 - system_schema.keyspaces 0,0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,740 StatusLogger.java:106 - system_schema.dropped_columns 0,0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,740 StatusLogger.java:106 - system_schema.aggregates 0,0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,740 StatusLogger.java:106 - system_schema.triggers 0,0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,740 StatusLogger.java:106 - system_schema.tables 0,0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,740 StatusLogger.java:106 - system_schema.views 0,0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,740 StatusLogger.java:106 - system_schema.functions 0,0
[my keyspace table here]
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,750 StatusLogger.java:106 - system_auth.roles 0,0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,750 StatusLogger.java:106 - system_auth.role_members 0,0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,750 StatusLogger.java:106 - system_auth.resource_role_permissons_index 0,0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,750 StatusLogger.java:106 - system_auth.role_permissions 0,0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,750 StatusLogger.java:106 - system_traces.sessions 0,0
INFO [ScheduledTasks:1] 2017-12-20 02:26:34,750 StatusLogger.java:106 - system_traces.events 0,0
WARN [ScheduledTasks:1] 2017-12-20 02:26:34,751 MonitoringTask.java:150 - 23 operations timed out in the last 19979 msecs, operation list available at debug log level
DEBUG [ScheduledTasks:1] 2017-12-20 02:26:34,751 MonitoringTask.java:155 - 23 operations timed out in the last 19979 msecs:
SELECT * FROM keyspace.table WHERE ts = 2017-11-30 03:34+1100 LIMIT 5000: total time 15127 msec - timeout 5000 msec
SELECT * FROM keyspace.table WHERE ts = 2017-12-05 03:14+1100 LIMIT 5000: total time 15147 msec - timeout 5000 msec
SELECT * FROM keyspace.table WHERE ts = 2017-10-22 03:53+1100 LIMIT 5000: total time 15128 msec - timeout 5000 msec
SELECT * FROM keyspace.table WHERE ts = 2017-12-10 03:12+1100 LIMIT 5000: total time 15136 msec - timeout 5000 msec
SELECT * FROM keyspace.table WHERE ts = 2017-12-05 03:02+1100 LIMIT 5000: total time 15137 msec - timeout 5000 msec
SELECT * FROM keyspace.table WHERE ts = 2017-11-30 03:04+1100 LIMIT 5000: total time 15147 msec - timeout 5000 msec
SELECT * FROM keyspace.table WHERE ts = 2017-11-03 03:53+1100 LIMIT 5000: total time 15157 msec - timeout 5000 msec
SELECT * FROM keyspace.table WHERE ts = 2017-10-01 03:14+1100 LIMIT 5000: total time 15147 msec - timeout 5000 msec
SELECT * FROM keyspace.table WHERE ts = 2017-10-19 03:23+1100 LIMIT 5000: total time 15127 msec - timeout 5000 msec
SELECT * FROM keyspace.table WHERE ts = 2017-11-03 03:45+1100 LIMIT 5000: total time 15125 msec - timeout 5000 msec
SELECT * FROM keyspace.table WHERE ts = 2017-10-05 03:58+1100 LIMIT 5000: total time 15130 msec - timeout 5000 msec
SELECT * FROM keyspace.table WHERE ts = 2017-11-23 03:54+1100 LIMIT 5000: total time 15146 msec - timeout 5000 msec
SELECT * FROM keyspace.table WHERE ts = 2017-11-04 03:21+1100 LIMIT 5000: total time 15134 msec - timeout 5000 msec
SELECT * FROM keyspace.table WHERE ts = 2017-10-08 03:59+1100 LIMIT 5000: total time 15158 msec - timeout 5000 msec
SELECT * FROM keyspace.table WHERE ts = 2017-10-28 03:59+1100 LIMIT 5000: total time 15124 msec - timeout 5000 msec
SELECT * FROM keyspace.table WHERE ts = 2017-09-28 02:47+1000 LIMIT 5000: total time 15142 msec - timeout 5000 msec
SELECT * FROM keyspace.table WHERE ts = 2017-11-04 03:02+1100 LIMIT 5000: total time 15125 msec - timeout 5000 msec
SELECT * FROM keyspace.table WHERE ts = 2017-10-28 03:43+1100 LIMIT 5000: total time 15153 msec - timeout 5000 msec
SELECT * FROM keyspace.table WHERE ts = 2017-12-17 03:31+1100 LIMIT 5000: total time 15172 msec - timeout 5000 msec
SELECT * FROM keyspace.table WHERE ts = 2017-10-13 03:31+1100 LIMIT 5000: total time 15127 msec - timeout 5000 msec
SELECT * FROM keyspace.table WHERE ts = 2017-12-17 03:31+1100 LIMIT 5000: total time 15172 msec - timeout 5000 msec
SELECT * FROM keyspace.table WHERE ts = 2017-10-13 03:31+1100 LIMIT 5000: total time 15127 msec - timeout 5000 msec
SELECT * FROM keyspace.table WHERE ts = 2017-12-17 03:40+1100 LIMIT 5000: total time 15128 msec - timeout 5000 msec
SELECT * FROM keyspace.table WHERE ts = 2017-12-09 03:32+1100 LIMIT 5000: total time 15129 msec - timeout 5000 msec
SELECT * FROM keyspace.table WHERE ts = 2017-10-12 03:59+1100 LIMIT 5000: total time 15172 msec - timeout 5000 msec

(each partition, i.e. a minute, is 422KB on average)

The timeout we see here of 5000msec is the read_request_timeout_in_ms: 5000 in cassandra.yaml.
In my job I am using spark.cassandra.read.timeout_ms: 240000 (2min)

1) What could it be the reason for such timeouts (15 seconds!), considering that I am doing a number of parallel reads that matches exactly the cassandra configuration?

2) What is the meaning of spark.cassandra.read.timeout_ms? Is it somehow related to read_request_timeout_in_ms in cassandra config?

Thank you,
Davide

Davide Mandrini

unread,
Dec 19, 2017, 11:26:07 AM12/19/17
to DataStax Spark Connector for Apache Cassandra
About question number 2, I found the reply by looking at the code of the spark-cassandra-connector and the doc of the Java connector here:
http://docs.datastax.com/en/developer/java-driver/3.1/manual/socket_options/#driver-read-timeout

Russell Spitzer

unread,
Dec 19, 2017, 11:30:09 AM12/19/17
to spark-conn...@lists.datastax.com
15 seconds latency means that something is maybe not as it seems. Most likely some of the partitions are much much larger than 422kb or have some tombstone problems. Since you have the operations that timed out you can check those. You may also see that C* itself is in GC which may be an issue. 
Message has been deleted

Davide Mandrini

unread,
Dec 20, 2017, 3:22:53 AM12/20/17
to DataStax Spark Connector for Apache Cassandra
Il giorno martedì 19 dicembre 2017 17:30:09 UTC+1, Russell Spitzer ha scritto:
> 15 seconds latency means that something is maybe not as it seems. Most likely some of the partitions are much much larger than 422kb or have some tombstone problems. Since you have the operations that timed out you can check those. You may also see that C* itself is in GC which may be an issue. 
>
>

Just to close this post, the issue seems to be the GC indeed (the partition size was OK, with a max value of 1.3MB, and almost no tombstone, as we do not delete any data in our process).

Thank you Russell for all your replies!
Davide
Reply all
Reply to author
Forward
0 new messages