Cassandra joinWithCassandraTable is not working on a Cassandra cluster but works in standalone


Amol Khanolkar

Aug 2, 2020, 2:21:13 AM
to DataStax Spark Connector for Apache Cassandra

I have two Cassandra setups: a standalone single-node local instance, and a 3-node cluster.

I created a table as below:

CREATE TABLE dev.test (a text, b int, c text, PRIMARY KEY ((a, b), c)) WITH CLUSTERING ORDER BY (c ASC);

and inserted this data:

 a | b | c
---+---+---
 A | 1 | C
 B | 2 | D

I connect to the Spark shell using the command below. For the cluster, the host is the IP of one of the machines in the cluster. Then I run the individual commands that follow.

./spark-shell --conf spark.cassandra.connection.host=127.0.0.1 --packages com.datastax.spark:spark-cassandra-connector_2.12:3.0.0-beta 

scala> import com.datastax.spark.connector._ 
import com.datastax.spark.connector._ 

 scala> case class PrimaryKey(a: String,b: Int)
 defined class PrimaryKey 

scala> val idsOfInterest = sc.parallelize(Seq(PrimaryKey("B", 2)))
idsOfInterest: org.apache.spark.rdd.RDD[PrimaryKey] = ParallelCollectionRDD[0] at parallelize at <console>:29

scala> val repartitioned = idsOfInterest.repartitionByCassandraReplica("dev", "test")
repartitioned: com.datastax.spark.connector.rdd.partitioner.CassandraPartitionedRDD[PrimaryKey] = CassandraPartitionedRDD[6] at RDD at CassandraPartitionedRDD.scala:18

Now I see a difference at this step. **On my local instance:**

scala> repartitioned.partitions
 res1: Array[org.apache.spark.Partition] = Array(ReplicaPartition(0,[Ljava.lang.String;@6d4693ee), ReplicaPartition(1,[Ljava.lang.String;@75038f62), ReplicaPartition(2,[Ljava.lang.String;@31bb5652), ReplicaPartition(3,[Ljava.lang.String;@77b46743), ReplicaPartition(4,[Ljava.lang.String;@2eb453b8), ReplicaPartition(5,[Ljava.lang.String;@4725a193), ReplicaPartition(6,[Ljava.lang.String;@2c4766e3), ReplicaPartition(7,[Ljava.lang.String;@781d9257), ReplicaPartition(8,[Ljava.lang.String;@7438377f), ReplicaPartition(9,[Ljava.lang.String;@1e7b3952))

The same steps, when performed connecting to the cluster (via one of the contact points):

scala> repartitioned.partitions
 res0: Array[org.apache.spark.Partition] = Array()

Because of this, when I do joinWithCassandraTable on the cluster, it doesn't return any records.

Do I need to do any other configuration on my cluster for this to work?

I have a lot of partition keys that I want to fetch and run in-memory computations on, using the architecture below. If I understand correctly, repartitionByCassandraReplica should place keys according to their replica locations in the cluster.

[Attachment: spark-deployment-1.png (Spark deployment diagram)]

Regards

Amol

Russell Spitzer

Aug 2, 2020, 9:44:34 AM
to DataStax Spark Connector for Apache Cassandra
Repartition by replica is probably not going to help you if your spark nodes are not colocated with your Cassandra nodes. As to why your code isn't working, I could imagine there may be a bug in how repartition handles your hostnames? I would just not use that function. Feel free to file a jira though with more info.


Amol Khanolkar

Aug 2, 2020, 9:51:47 AM
to spark-conn...@lists.datastax.com
Thanks 

As mentioned in the diagram above:
- Each worker node shares a machine with a Cassandra node.
- All workers connect to a Spark master which is not running on a Cassandra machine (I am not using a cluster manager like YARN or Mesos). Do I need to use one?
- I am submitting my application with the same code I use in the spark-shell script:
   ./spark-submit --master spark://master:7077 file:///home/ubuntu/hello-world_2.12-1.0.jar
  (Do I need to submit it in cluster mode?)

Regards

Russell Spitzer

Aug 2, 2020, 11:19:41 AM
to DataStax Spark Connector for Apache Cassandra
That's all correct, you don't have to use cluster mode. My guess would still be that it's a bug and you should just not use repartition. To really figure it out you'll need to check that it works without the repartition call. If it does, post the sample data and layout with IPs in a JIRA.
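
(A sketch of that check against the dev.test table above, assuming the same spark-shell session as earlier; joinWithCassandraTable issues one direct lookup per key, while the repartitioned variant first moves keys onto executors colocated with their Cassandra replicas:)

    import com.datastax.spark.connector._

    case class PrimaryKey(a: String, b: Int)
    val idsOfInterest = sc.parallelize(Seq(PrimaryKey("B", 2)))

    // Without repartitioning: one direct lookup per key.
    idsOfInterest.joinWithCassandraTable("dev", "test").collect.foreach(println)

    // With repartitioning: keys are grouped onto executors colocated with
    // their replicas; if no colocated executors are found, the RDD can end
    // up with zero partitions, and the join then returns nothing.
    val repartitioned = idsOfInterest.repartitionByCassandraReplica("dev", "test")
    repartitioned.joinWithCassandraTable("dev", "test").collect.foreach(println)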

Amol Khanolkar

Aug 2, 2020, 12:46:44 PM
to spark-conn...@lists.datastax.com
Sure, I will do that. What would be the correct way to verify that aspect?
I am using connector 3.0.0-beta with Spark 3.0.




Russell Spitzer

Aug 2, 2020, 1:14:06 PM
to DataStax Spark Connector for Apache Cassandra
Collect the result of the commands with and without repartition

Amol Khanolkar

Aug 2, 2020, 1:16:07 PM
to spark-conn...@lists.datastax.com
Oh.

When I call repartition it doesn't work and gives an empty result; I have already verified that.

Russell Spitzer

Aug 2, 2020, 1:40:32 PM
to DataStax Spark Connector for Apache Cassandra
Does that give zero records? I only saw the zero partitions.

Amol Khanolkar

Aug 2, 2020, 1:49:54 PM
to spark-conn...@lists.datastax.com
Yes, the next step gives 0 records too. That's why I started debugging backwards and noticed this (I am new to Cassandra).
Actually, I have a list of partition keys (composite -> (String, Int)) and I want to fetch data only for those partitions, as a DataFrame or some other Spark structure.

But even Spark's DataFrame route also seems to scan over key ranges instead of directly loading rows for an RDD/DataFrame of partition keys. If there is something obvious I have missed, please let me know.


Russell Spitzer

Aug 2, 2020, 2:09:33 PM
to DataStax Spark Connector for Apache Cassandra
Not the next step. That step specifically.

repartitioned.collect

vs. just collect

DataFrames support a direct join (the equivalent of joinWithCassandraTable) in 3.0 and 2.5, as long as the key column types match and cover the whole partition key. They also require the key set to be much smaller than the Cassandra table. This is all documented in the docs and in some blog posts.

Again, I would just remove repartition from your codebase.
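
(A sketch of that DataFrame route with the OSS connector, assuming its Catalyst extensions are loaded, e.g. --conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions, and that keysDf stands in for any small DataFrame covering the full partition key:)

    import spark.implicits._

    // Read the Cassandra table as a DataFrame.
    val cassDf = spark.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> "dev", "table" -> "test"))
      .load()

    // A small DataFrame holding the partition keys of interest.
    val keysDf = Seq(("B", 2)).toDF("a", "b")

    // With the extensions loaded and keysDf much smaller than the table,
    // the planner should swap the full scan for a direct join.
    val result = keysDf.join(cassDf, Seq("a", "b"))
    result.explain()   // look for the Cassandra direct join node in the plan
    result.show()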

Amol Khanolkar

Aug 2, 2020, 2:33:34 PM
to spark-conn...@lists.datastax.com
Yes.

repartitioned.collect is empty (when I run the code from the driver machine, not on a Cassandra machine) using spark-submit or spark-shell.

Regarding DataFrames: I am reading a CSV as a DataFrame (it has 2 columns which together form the primary key of the Cassandra table), and I then want to join the CSV DataFrame to the Cassandra DataFrame. Is there a way I can achieve that?
My hope was that since I know the primary keys, we could just fire queries with filters and projections (where pk1=? and pk2=?) and load the data faster.

Regards
Amol



Russell Spitzer

Aug 2, 2020, 2:59:46 PM
to DataStax Spark Connector for Apache Cassandra

Amol Khanolkar

Aug 2, 2020, 3:56:44 PM
to spark-conn...@lists.datastax.com
Thanks for the links.

Can we do a direct join (i.e., by specifying partition keys) in OSS Spark as well, or does it only work on DSE?

Russell Spitzer

Aug 2, 2020, 4:04:15 PM
to DataStax Spark Connector for Apache Cassandra
The first link talks about how it was added to the OSS Spark Cassandra Connector.

Amol Khanolkar

Aug 2, 2020, 4:48:56 PM
to spark-conn...@lists.datastax.com
Yes, I read that. I was referring more to the CSV examples in http://www.russellspitzer.com/2018/05/23/DSEDirectJoin/

When one side is a DataFrame (from CSV) and the other is a Cassandra table, will it be a complete scan or a Cassandra direct join (in OSS Spark)?
Currently, when one side is a CSV, it is not doing a Cassandra direct join for me.
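
(One thing worth checking in the CSV case: CSV columns are read as strings by default, and the direct join requires the key types to match the Cassandra columns. A sketch, with a hypothetical file path, casting b to int before the join:)

    val csvDf = spark.read
      .option("header", "true")
      .csv("file:///home/ubuntu/keys.csv")       // hypothetical path
      .selectExpr("a", "cast(b as int) as b")    // b must be an int to match dev.test

    val cassDf = spark.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> "dev", "table" -> "test"))
      .load()

    // Same criteria as before: full partition key covered, matching types,
    // and a key set much smaller than the Cassandra table.
    csvDf.join(cassDf, Seq("a", "b")).explain()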


Russell Spitzer

Aug 2, 2020, 4:57:39 PM
to DataStax Spark Connector for Apache Cassandra
It depends on the settings, as described in the various articles. The join works on all data sources regardless of origin.
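
(For reference, the knobs the articles describe in SCC 2.5+/3.0 are directJoinSetting, with values on/off/auto, and directJoinSizeRatio. A sketch of forcing it on for a single read, assuming the option is passed on the Cassandra reader:)

    val cassDf = spark.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map(
        "keyspace" -> "dev",
        "table" -> "test",
        // "auto" (the default) decides based on directJoinSizeRatio;
        // "on" forces the direct join, "off" disables it
        "directJoinSetting" -> "on"
      ))
      .load()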

Alex Ott

Aug 3, 2020, 1:48:49 AM
to spark-conn...@lists.datastax.com
I can add one more link to that list: 
With best wishes,
Alex Ott
http://alexott.net/
Twitter: alexott_en (English), alexott (Russian)

Amol Khanolkar

Aug 3, 2020, 3:32:33 AM
to spark-conn...@lists.datastax.com
Thanks a lot, both of you. This has really helped me a lot. Last question:
- In Scala, with the same code and the same tables, the joins do use the Cassandra direct join.

However, Python code with PySpark 3 (Spark connector 3.0.0-beta) does not use the Cassandra direct join. Is there any additional config needed to enable it?

Regards
Amol

Alex Ott

Aug 3, 2020, 4:12:47 AM
to DataStax Spark Connector for Apache Cassandra
Look in the archives of this list: there was a problem reported with PySpark, but I thought it was only for 2.4. You can try the workaround described in those emails.

Amol Khanolkar

Aug 3, 2020, 12:19:51 PM
to spark-conn...@lists.datastax.com
Sure, I found that link.

Here is what I have done:
- Added these lines (sc here is the PySpark SparkSession):

    from pyspark.sql import SparkSession

    # apply the connector's Catalyst extensions to the underlying Java session,
    # then rebuild the Python session on a clone so they take effect
    sc.sparkContext._jvm.com.datastax.spark.connector.CassandraSparkExtensions().apply(sc._jsparkSession.extensions())
    sc = SparkSession(sc.sparkContext, sc._jsparkSession.cloneSession())

However, I am using Spark 3.0 with Spark connector 3.0.0-beta on OSS Spark.
The link refers to the BYOS JAR. Which would be the correct version of the library for Spark 3 with Spark connector 3.0.0-beta?

Regards
Amol


Alex Ott

Aug 3, 2020, 12:45:34 PM
to DataStax Spark Connector for Apache Cassandra
The underlying code should be the same; only the extensions class name is different... The author of that email thread reported that he made it work with SCC 2.5.0, which has the same code as SCC 3.0.0-beta.
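
(On Spark 3.0 it may also be enough to pass the extensions at launch rather than cloning the session, since PySpark honors spark.sql.extensions there; a sketch, assuming OSS SCC 3.0.0-beta:)

    ./pyspark --packages com.datastax.spark:spark-cassandra-connector_2.12:3.0.0-beta --conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions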

Amol Khanolkar

Aug 4, 2020, 12:05:05 AM
to spark-conn...@lists.datastax.com
Sure.

However, the old chain refers to using the BYOS DSE 6.7.7 JAR. It would be great if you could point me to the appropriate JAR, if one is available.

Regards
Amol