Spark cassandra connector slowness as compared to cassandra java driver


SNEHA GHOSH

Nov 20, 2024, 9:16:23 AM11/20/24
to DataStax Spark Connector for Apache Cassandra

We are seeing an order-of-magnitude performance difference between the Spark Cassandra connector and the Cassandra Java driver, and need help achieving comparable performance with the connector.


Background:

We have the following tables in Cassandra:

table A: partition key (id, date), clustering columns (x, y)

table B: partition key (year, id), clustering column (date)


Q1 (query for table A): SELECT uid, date, x, y, ... FROM table A WHERE uid IN (uid1) AND date IN (d1, d2, ..., d360)


Q2 (query for table B): SELECT id, year, ... FROM table B WHERE id IN (id1, id2, ..., id10K) AND year IN (y1, y2) AND date IN (d1, d2, ..., d360)


Performance observation:

With the Cassandra Java driver's async API ( https://docs.datastax.com/en/developer/java-driver/4.13/manual/core/async/index.html ), Q1 takes < 1 sec and Q2 takes < 500 ms to fetch results from Cassandra. We fire parallel async queries against the cluster, split so that each query targets a single partition; for Q1 this means 360 queries of the form “SELECT uid, date, x, y, ... FROM table A WHERE uid = uid1 AND date = d1”.
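For context, the fan-out described above amounts to enumerating the partition-key cross product and firing one async query per key. This is an illustrative sketch, not our actual code; the uid/date values are placeholders and the driver call is shown only in a comment:

```scala
// Sketch of the per-partition fan-out for Q1: one single-partition query
// per (uid, date) pair. Values are placeholders.
val uids  = List("uid1")
val dates = (1 to 360).map(d => s"d$d").toList

// Cross product of partition-key values: one entry per query to fire.
val partitionKeys = for (u <- uids; d <- dates) yield (u, d)

// With the Java driver 4.x, each pair would be bound to a prepared statement
// and run via session.executeAsync(stmt.bind(u, d)); the resulting futures are
// collected, so all 360 single-partition reads proceed in parallel.
println(partitionKeys.size) // 360
```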


However, with the Spark Cassandra connector it takes > 10 secs to fetch the data for the same two queries.

Spark cluster details: executors = 39, executor-cores = 5.

For Q1, there are 360 partitions to retrieve, each returning < 18,000 rows.

For Q2, there are 20k partitions to retrieve, each returning < 200 rows.


For Q2, we see the connector fire only 195 queries in parallel to Cassandra (39 executors × 5 cores), repeating this synchronously over roughly 100 rounds to cover the 20k partitions, which causes the slowness.


Questions:

  • How can we parallelize the fetches?
  • Does the Spark Cassandra connector use the async API when fetching from Cassandra?
  • How should we configure the Spark properties for the above query patterns? Should we use a join-based fetch, an IN-clause fetch, or another strategy?

Happy to provide more details for further investigation.

Thanks
Sneha

Arvydas Jonusonis

Nov 22, 2024, 8:32:20 PM11/22/24
to spark-conn...@lists.datastax.com
Hi Sneha,

Spark operations are generally bulk-oriented. When you say you "fired off 360 queries", how are these being executed? Usually you would describe your workload in terms of Spark operations and the SCC would generate the necessary C* queries. Which SCC API are you using, the RDD API or the Dataset API?

The way to parallelize single-partition lookups with the RDD API is to join a generic RDD containing the IDs (are these being loaded from a file?) with the CassandraRDD of the table you are interested in querying. Have a look at the SCC docs for an example, and pay attention to what the repartitionByCassandraReplica function does.
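A minimal sketch of that shape, assuming a keyspace "test" and table "spark_join_example" (names are placeholders). The Spark/SCC calls are shown in comments since they need a live cluster; only the key-preparation part is plain Scala:

```scala
// Case class whose field name matches the table's partition key column.
case class TableId(id: String)

// Keys of interest; in practice these might be loaded from a file.
val ids  = List("id1", "id2", "id3")
val keys = ids.map(TableId(_))

// In a Spark shell with the SCC on the classpath, the lookup would then be:
//   import com.datastax.spark.connector._
//   sc.parallelize(keys)
//     .repartitionByCassandraReplica("test", "spark_join_example", partitionsPerHost = 10)
//     .joinWithCassandraTable("test", "spark_join_example")
//     .collect()
// repartitionByCassandraReplica groups the keys so that each Spark partition
// only talks to the replica nodes that actually own those keys.
```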

If you're wondering what actual C* queries the SCC is issuing, have a look at the contents of the system.prepared_statements table on one of the cluster nodes (search for the table in question).

Here's a simple example:


scala> case class TableId(id: String)
defined class TableId

scala> sc.parallelize(List("a", "b", "c")).map(TableId(_)).joinWithCassandraTable("test", "spark_join_example").collect.foreach(println)
(TableId(a),CassandraRow{id: a, col1: o, col2: x})
(TableId(b),CassandraRow{id: b, col1: p, col2: y})
(TableId(c),CassandraRow{id: c, col1: q, col2: z})


root@ip-10-166-64-247:~# cqlsh -e "select * from system.prepared_statements" | grep spark_join_example
 0xb0fb08352a0f8c57b7e5d067ce96d7c5 |            null |      SELECT "id", "col1", "col2" FROM "test"."spark_join_example" WHERE "id" = :id


To answer your second question: yes, the SCC mostly uses executeAsync.
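On your third question (connector tuning): the join path is throttled by a couple of connector properties. This is a hedged sketch, worth cross-checking names and defaults against the reference docs for your SCC version:

```scala
// Properties that influence read parallelism in the SCC (verify against
// the reference configuration for your connector version).
val conf = new org.apache.spark.SparkConf()
  // Max concurrent single-partition reads in joinWithCassandraTable.
  .set("spark.cassandra.concurrent.reads", "512")
  // Number of CQL rows fetched per driver page.
  .set("spark.cassandra.input.fetch.sizeInRows", "1000")
```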

Best Regards,

Arvydas


