I'm very new to both Cassandra and Spark, and I'm having some trouble understanding what goes on under the hood in the connection between these two applications.
I need to query a large amount of datapoints (20 million) from a timeseries (timestamp, value) stored in Cassandra in order to aggregate them in groups. The problem is that, as a beginner, I can't work out from the examples on GitHub an efficient way to load this data.
My table's description looks like:
CREATE TABLE testing.timeseries (
    twid int,
    timestamp bigint,
    value double,
    PRIMARY KEY (twid, timestamp)
);
where timestamp and value are obvious fields and twid is a partition id with the semantics of "three-weeks id": it groups the datapoints into buckets of three weeks each.
I'm interested in all the data in the timeseries table, but I don't really know an efficient way to get it all. I was hoping to issue a query for each twid in parallel, but that idea faded when I tried to get the distinct values of twid via CassandraSQLContext.sql("select distinct twid from timeseries") and this statement actually made Spark iterate through all the datapoints (I expected Cassandra to resolve the distinct itself, so it could hit the cache or something).
How could I load this big table into Spark in a parallelized fashion so I can get more performance?
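One rough sketch of an answer, assuming nothing beyond the connector defaults: sc.cassandraTable already splits a full-table scan into one Spark task per token range, so the whole table can be read in parallel without first collecting the distinct twid values, and the grouping can then be an ordinary reduceByKey (the sum below is just a placeholder aggregate):

import com.datastax.spark.connector._

// Full-table scan; the connector creates one Spark task per token range.
val perGroup = sc.cassandraTable("testing", "timeseries")
  .map(row => (row.getInt("twid"), row.getDouble("value")))
  .reduceByKey(_ + _)
perGroup.collect()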
Best regards,
Alexandre Santana
I have already tried tinkering with "input.split.size_in_mb", but without success. My Spark job is split across several executors, most of which do nothing, while 5 or 6 executors take around 59 s to map about 26 MB of data.
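For reference, a sketch of how that option is normally passed, assuming an application built against connector 1.6 (the host and the 16 MB value are placeholders, not recommendations):

import org.apache.spark.{SparkConf, SparkContext}

// The connector reads the split size from the SparkConf; smaller values
// produce more, smaller tasks for the same table.
val conf = new SparkConf()
  .setAppName("timeseries-aggregation")
  .set("spark.cassandra.connection.host", "127.0.0.1")
  .set("spark.cassandra.input.split.size_in_mb", "16")
val sc = new SparkContext(conf)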
The relevant Cassandra configuration options are set as follows:
start_rpc: true
rpc_address: #public ip address
listen_address: #public ip address
Ports are left at their defaults and the native protocol is enabled. I'm taking care to divide the RAM between the two, leaving some extra room to cache this data. There is only a single C* node and a single Spark node, and they are co-located.
I can't figure out what is wrong with the application.
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._

// Create a small test keyspace and table, then load 2 million rows into it.
val c = CassandraConnector(sc.getConf)
c.withSessionDo { session =>
  val now = System.currentTimeMillis
  session.execute("CREATE KEYSPACE testing WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}")
  session.execute("CREATE TABLE testing.timeseries (timestamp bigint, value bigint, PRIMARY KEY (timestamp));")
  for (i <- 0 to 2000000) {
    session.execute(s"INSERT INTO testing.timeseries (timestamp, value) VALUES (${now + i}, $i)")
  }
}

// Key each row by its second column modulo 500 and sum the first column per key.
sqlContext.sql("select * from testing.timeseries")
  .map(row => (row.getLong(1) % 500, row.getLong(0)))
  .reduceByKey((a, b) => a + b)
  .collect()
Thanks for replying. I use Spark 1.6.1, Cassandra 2.2.6 and spark-cassandra-connector 1.6.0 for Scala 2.10. The Scala version I compile the application with is 2.10.5.
I couldn't run the last command of the code you showed; the error I received was:
org.apache.spark.sql.AnalysisException: Table not found: `testing`.`timeseries`;
I then replaced the last line with:
sc.cassandraTable("testing", "timeseries").map(row => (row.getLong(1) % 500, row.getLong(0))).reduceByKey((a, b) => a + b).collect()
The sc.cassandraTable version took 12 s to execute and generated 3 executors. I noticed something strange in the input size; it seemed a little too low:
Input Size / Records: 30.5 MB / 2000001
Pointing the same command at my table instead of the testing table generated 740 executors and took 1.4 minutes. The input size was:
Input Size / Records: 307.6 MB / 20161859
This is my table's description:
CREATE TABLE testing.atable (
    twid int,
    timestamp bigint,
    value double,
    PRIMARY KEY (twid, timestamp)
) WITH CLUSTERING ORDER BY (timestamp DESC)
AND bloom_filter_fp_chance = 0.01
AND caching = '{"keys":"ALL", "rows_per_partition":"65536"}'
AND comment = ''
AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'}
AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 0
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99.0PERCENTILE';
Just changing the tables was enough to make it take minutes instead of seconds.
import com.datastax.spark.connector.rdd.ReadConf

// splitCount caps the number of Spark partitions created for the scan.
implicit val readConf = ReadConf(splitCount = Some(20))
val rddWithADifferentConf = sc.cassandraTable(...)
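For comparison, a sketch of the same idea without the implicit, applied to the table discussed above (the sum-per-twid aggregate is just a placeholder, and withReadConf is assumed to be available on the RDD returned by sc.cassandraTable in connector 1.6):

import com.datastax.spark.connector._
import com.datastax.spark.connector.rdd.ReadConf

// Scan testing.atable in roughly 20 Spark partitions instead of 740 tasks,
// then aggregate the values per three-week group.
val perTwid = sc.cassandraTable("testing", "atable")
  .withReadConf(ReadConf(splitCount = Some(20)))
  .map(row => (row.getInt("twid"), row.getDouble("value")))
  .reduceByKey(_ + _)
perTwid.collect()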