Is there a good way to load Cassandra partitions in parallel?

Alexandre Santana

unread,
Jun 24, 2016, 3:47:59 PM6/24/16
to DataStax Spark Connector for Apache Cassandra
Hello,

I'm very new to both Cassandra and Spark, and I'm having some trouble understanding what is going on under the hood with the connection between these two applications.

I need to query a large amount of datapoints (20 million) from a time series (timestamp, value) stored in Cassandra in order to aggregate them in groups. The problem is that, as a beginner, I can't work out from the examples on GitHub how to build an efficient query to load this data.

My table's description looks like:

CREATE TABLE testing.timeseries (
    twid int,
    timestamp bigint,
    value double,
    PRIMARY KEY (twid, timestamp)
)

where timestamp and value are the obvious fields and twid is a partition id with the semantics of a "three-weeks id", grouping datapoints into three-week buckets in time.
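
(Purely for illustration, the bucketing presumably looks something like the sketch below; the 21-day constant comes from the description above, and the exact formula is my assumption, not code from the actual application.)

// Hypothetical sketch of the "three-weeks id" bucketing; not the application's actual code.
val threeWeeksInMs = 21L * 24 * 60 * 60 * 1000
def twidFor(timestampMs: Long): Int = (timestampMs / threeWeeksInMs).toInt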

I'm interested in all the data in the timeseries table, but I don't really know an efficient way to get it all. I was hoping to issue a query for each twid in parallel, but that idea faded when I tried to get the distinct values of twid via CassandraSQLContext.sql("select distinct twid from timeseries") and this statement actually made Spark iterate through all the datapoints (I had expected Cassandra to handle it, so it could hit the cache or something).

How could I load this big table into Spark in a parallelized fashion so I can get more performance?

Regards,
Alexandre Santana

Russell Spitzer

unread,
Jun 30, 2016, 10:43:11 AM6/30/16
to DataStax Spark Connector for Apache Cassandra
Do a select * and then register the resulting table in Spark. You could cache this and end up with an in-memory representation, which may be what you are looking for.
people.registerTempTable("people")
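
(As a minimal sketch of that suggestion, assuming Spark 1.6 with the connector's DataFrame source and the testing.timeseries table from the first message, run in a spark-shell where sc and sqlContext already exist:)

// Load the Cassandra table as a DataFrame via the connector's data source.
val timeseries = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "testing", "table" -> "timeseries"))
  .load()

timeseries.registerTempTable("timeseries")   // register for SQL queries
sqlContext.cacheTable("timeseries")          // keep an in-memory copy for repeated queries
sqlContext.sql("SELECT twid, count(*) FROM timeseries GROUP BY twid").show()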

Alexandre Santana

unread,
Jul 1, 2016, 8:56:52 AM7/1/16
to DataStax Spark Connector for Apache Cassandra
Actually, I may have expressed myself poorly. I'm having an issue getting my data from Cassandra into Spark. Even though both of them are in a shared-memory environment, the time to load 600 MB of data is 80 seconds. At first I thought I was querying the data the wrong way, but now I suspect that the data is just not being transported right.

I have already tried tinkering with "input.split.size_in_mb", but had no success. My Spark job is being split across several executors, and most of them do nothing, while 5 or 6 executors take times like 59 s to map 26 MB of data.
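
(For reference, that setting is normally passed through the SparkConf used to build the SparkContext; a minimal sketch with illustrative values, not a recommendation:)

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative values only; the host assumes a single co-located node.
val conf = new SparkConf()
  .setAppName("timeseries-load")
  .set("spark.cassandra.connection.host", "127.0.0.1")
  .set("spark.cassandra.input.split.size_in_mb", "64")  // target amount of data per Spark partition
val sc = new SparkContext(conf)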

Alexandre Santana

unread,
Jul 1, 2016, 7:53:04 PM7/1/16
to DataStax Spark Connector for Apache Cassandra
I took a screenshot to better show the problem. I'm querying the whole table, which is 300 MB of compressed data, but somehow it takes a lot of time (80 s) to gather the data, map it, and reduceByKey it. The table is as simple as a timeseries (timestamp, value).

The important Cassandra configuration settings are:
start_rpc: true
rpc_address: #public ip address
listen_address: #public ip address

Ports are the defaults and the native protocol is enabled. I'm taking care to divide the RAM between them and leave some extra space to cache the data. There is only a single C* node and a single Spark node, and both are co-located.

I can't figure out what is wrong with the application.

[Attachment: Screenshot from 2016-07-01 18-37-38.png]

Jaroslaw Grabowski

unread,
Jul 4, 2016, 5:04:58 AM7/4/16
to spark-conn...@lists.datastax.com
I tried to reproduce this on a single machine with the following dummy spark-shell code:

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._

val c = CassandraConnector(sc.getConf)

c.withSessionDo { session =>
  val now = System.currentTimeMillis

  session.execute("CREATE KEYSPACE testing WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}")
  session.execute("CREATE TABLE testing.timeseries (timestamp bigint, value bigint, PRIMARY KEY (timestamp));")

  // insert ~2M dummy rows
  for (i <- 0 to 2000000) {
    session.execute(s"INSERT INTO testing.timeseries (timestamp, value) VALUES(${now + i}, $i)")
  }
}

sqlContext.sql("select * from testing.timeseries").map(row => (row.getLong(1) % 500, row.getLong(0))).reduceByKey((a, b) => a + b).collect()

without success. Could you please modify it so that it reproduces your problem? Also, which Cassandra/Spark/connector versions do you use?


--

JAROSLAW GRABOWSKI
Software Engineer

Alexandre Santana

unread,
Jul 4, 2016, 1:50:01 PM7/4/16
to DataStax Spark Connector for Apache Cassandra
Hi Jaroslaw,

Thanks for replying. I use Spark 1.6.1, Cassandra 2.2.6, and spark-cassandra-connector 1.6.0 for Scala 2.10. The Scala version I use to compile the application is 2.10.5.

I couldn't run the last command of the code you showed; the error I received was:
org.apache.spark.sql.AnalysisException: Table not found: `testing`.`timeseries`;

I then replaced the last line with:
sc.cassandraTable("testing", "timeseries").map(row => (row.getLong(1) % 500, row.getLong(0))).reduceByKey((a, b) => a + b).collect()

The process then took 12 s to execute and generated 3 executors. I noticed something strange in the input size; it was a little too low:
Input Size / Records: 30.5 MB / 2000001

Changing the testing table to my table, the same command generated 740 executors and took 1.4 minutes. The input size was:
Input Size / Records: 307.6 MB / 20161859

This is my table's description:

create table testing.atable (
    twid int,
    timestamp bigint,
    value double,
    PRIMARY KEY (twid, timestamp)
) WITH CLUSTERING ORDER BY (timestamp DESC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = '{"keys":"ALL", "rows_per_partition":"65536"}'
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'}
    AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99.0PERCENTILE';


Just changing the tables was enough to make it take minutes instead of seconds.

Jaroslaw Grabowski

unread,
Jul 4, 2016, 4:13:35 PM7/4/16
to spark-conn...@lists.datastax.com
Ah right, I inserted only 2M rows; it should have been 20M. Nevertheless, still no luck reproducing. Could you please run the following query for the troublesome table?

SELECT range_start, range_end, partitions_count, mean_partition_size FROM system.size_estimates WHERE keyspace_name = 'your keyspace' AND table_name = 'your table';



Artem Aliev

unread,
Jul 4, 2016, 4:26:59 PM7/4/16
to spark-conn...@lists.datastax.com
My guess is:
Your C* partitions have really different sizes. Some time series with a given 'twid' are really big, while a lot of them are small or empty.
The connector reads one partition key range in a single thread into one Spark partition, so the long-running executors are the ones reading the long time series.
It looks like all 300 MB mostly consists of 5 long C* rows.

Alexandre Santana

unread,
Jul 4, 2016, 8:12:05 PM7/4/16
to spark-conn...@lists.datastax.com
Well, I'm still clueless as well.

The query you asked me to execute yielded the following results:

range_start:  -9223372036854775808
range_end: -9223372036854775808
partitions_count: 640
mean_partition_size:    73833670

Artem, there are 18 distinct twids and they are about the same size. The data is basically measurements from a sensor every second over a year, and the twid is just an identifier that groups the data into three-week intervals. Either way, gathering 300 MB of data from a local C* node shouldn't take that long. I'm starting to believe that there is a misconfiguration in the resources, making Spark and Cassandra compete with each other.

Jaroslaw Grabowski

unread,
Jul 5, 2016, 3:11:53 AM7/5/16
to spark-conn...@lists.datastax.com
It looks like the size estimates are incorrect, which causes an excessive number of tasks.
Could you please run nodetool refreshsizeestimates and verify whether the estimated partition size changed and whether this changed the number of tasks?

Artem Aliev

unread,
Jul 5, 2016, 4:48:51 AM7/5/16
to spark-conn...@lists.datastax.com
Most of the time is spent creating 770 empty tasks, then, while at most 18 of them read real data (you have only 18 partitions).
So I recommend decreasing the number of partitions to ~20 or so.
The easy way to play with the number of partitions is to specify it in ReadConf:
implicit val readConf = ReadConf(Some(20))
then read with it:
val rddWithADifferentConf = sc.cassandraTable(...)
And yes, those long rows could cause some performance degradation on the C* side too, but not much.

Alexandre Santana

unread,
Jul 5, 2016, 4:08:50 PM7/5/16
to spark-conn...@lists.datastax.com
Jaroslaw, I did that but nothing changed; the estimated sizes stayed the same and so did the times.

Artem, I tried setting the ReadConf as you suggested, but the time to pull the data didn't change.

I guess the problem lies in some kind of misconfiguration in the environment. Is there a way I can check whether spark-cassandra-connector knows that both applications are co-located, so it uses the proper communication transport? If that's not the problem, then I guess Cassandra and Spark may be competing for resources.

Russell Spitzer

unread,
Jul 5, 2016, 5:10:52 PM7/5/16
to spark-conn...@lists.datastax.com
I think this is solely an issue with the long rows. I would try increasing the input.fetch.size_in_rows parameter (see the read tuning parameters at https://github.com/datastax/spark-cassandra-connector/blob/master/doc/reference.md#read-tuning-parameters).

The connector doesn't currently have a way to pull a single partition in parallel, so this may be your only way of increasing the rate at which a large partition is pulled.
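
(A minimal sketch of that tuning, using the table names from this thread and an illustrative value rather than a recommendation:)

import com.datastax.spark.connector._
import com.datastax.spark.connector.rdd.ReadConf

// Larger CQL page size, so each wide twid partition is streamed in fewer round trips.
implicit val readConf = ReadConf(fetchSizeInRows = 10000)
val rdd = sc.cassandraTable("testing", "timeseries")

// The same knob can also be set globally via the SparkConf property
// spark.cassandra.input.fetch.size_in_rows.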
--

Russell Spitzer
Software Engineer



Alexandre Santana

unread,
Jul 5, 2016, 5:28:37 PM7/5/16
to spark-conn...@lists.datastax.com
I tried increasing that, but with no luck on the results. I will try to restructure the table so that it has more partitions.

But after realizing that transferring 30 MB of data takes 12 s (with the example code that Jaroslaw sent), I am inclined to believe that either Cassandra is not realizing that it is co-located with Spark, or they are competing for resources. I'm sure all the suggestions you gave me are improvements, but even with simple tables the connector is failing to fetch small amounts of data from Cassandra. There has to be something wrong with the way they share the machine's resources.