Datastax Deletefromcassandra throwing error


revathi p

Jun 1, 2020, 6:59:04 PM
to DataStax Spark Connector for Apache Cassandra
Hi Everyone,

I am trying to use rdd.deleteFromCassandra and getting the error below. I have the datastax:spark-cassandra-connector:2.4.2-s_2.12 library installed on my Databricks cluster.

command-3863946755109700:12: error: object deleteFromCassandra is not a member of package com.datastax.spark.connector.rdd
  rdd.deleteFromCassandra(
      ^
command-3863946755109700:14: error: not found: value writeConf
  writeConf = WriteConf(hupExDf.count > 0))


Below is the code I am trying to use (basically trying to delete a number of rows from Cassandra which are saved in a Delta table):


import com.datastax.spark.connector._
import com.datastax.spark.connector.writer._

val hupExDf = sqlContext.sql(s"select * from $auditExDbName")
if(hupExDf.count > 0){
  val hupExSchema = Seq("msd_code","effective_date","sys_creation_date","sys_update_date","bpr_tier","company_name","expiration_date")
  hupExDf.toDF(hupExSchema: _*)
//    sc.parallelize(Seq(("animal", "trex"), ("animal", "mammoth")))
//   rdd.deleteFromCassandra(cassKeyspace, "hup_corp_msd_reference") 
 
  rdd.deleteFromCassandra(
  cassKeyspace,"hup_corp_msd_reference",
  writeConf = WriteConf(hupExDf.count > 0))
  display(hupExDf)
}

Russell Spitzer

Jun 2, 2020, 11:22:49 AM
to DataStax Spark Connector for Apache Cassandra
I'm not sure what api you are using there but the "WriteConf" object definitely doesn't take a single boolean as an argument...

http://datastax.github.io/spark-cassandra-connector/ApiDocs/2.4.2/spark-cassandra-connector/index.html#com.datastax.spark.connector.RDDFunctions@deleteFromCassandra(keyspaceName:String,tableName:String,deleteColumns:com.datastax.spark.connector.ColumnSelector,keyColumns:com.datastax.spark.connector.ColumnSelector,writeConf:com.datastax.spark.connector.writer.WriteConf)(implicitconnector:com.datastax.spark.connector.cql.CassandraConnector,implicitrwf:com.datastax.spark.connector.writer.RowWriterFactory[T]):Unit

http://datastax.github.io/spark-cassandra-connector/ApiDocs/2.4.2/spark-cassandra-connector/index.html#com.datastax.spark.connector.writer.WriteConf

So your main issue is that you aren't using the correct APIs for either of these functions, and that needs to be fixed.


You may also have a classpath issue, but I can't really tell. Make sure in Databricks you are including the "Maven Library" and not a single jar when setting up your job.

To actually do what you want:
 rdd.deleteFromCassandra(cassKeyspace, "hup_corp_msd_reference")   <<--- This should be all you need, assuming the RDD has the right mapping for primary key values

For other information and usage examples, check our integration suites.
https://github.com/datastax/spark-cassandra-connector/blob/7cc35770c77a7da59b5fae362231d6dc0fbcb962/connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraRDDSpec.scala#L1246-L1351
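Putting the pieces together, a minimal sketch of that whole-row delete path, using the names from the original post (`hupExDf`, `cassKeyspace`) and assuming `effective_date` reads as a date; only the primary key columns are needed:

```scala
// Sketch only: assumes hupExDf and cassKeyspace exist as in the original
// post, with the 2.4.x connector's implicits in scope on the classpath.
import com.datastax.spark.connector._

// Build an RDD carrying just the primary key columns; deleteFromCassandra
// then issues a whole-row delete for each key tuple.
val keysToDelete = hupExDf
  .select("msd_code", "effective_date")
  .rdd
  .map(row => (row.getString(0), row.getDate(1)))

keysToDelete.deleteFromCassandra(cassKeyspace, "hup_corp_msd_reference")
```

This relies on the connector mapping the tuple fields positionally onto the table's primary key columns, as in the integration suite linked above.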



revathi p

Jun 11, 2020, 2:38:51 PM
to spark-conn...@lists.datastax.com
Hi Russell,

Thanks for your response. What I am trying to do: I have a couple of rows in a Databricks Delta table which have to be deleted from Cassandra, and I am using the code below. The issue I am facing is that I am storing everything in a DataFrame and am unable to delete those values from Cassandra. Below is my code, where msd_code and effective_date are the primary keys:


import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
val hupCorpMsdDf = spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "hup_corp_msd_reference", "keyspace" -> cassKeyspace))
  .load
val msd_code = sqlContext.sql(s"select msd_code , effective_date from dalauditqa01.hup_corp_msd_reference_casextra_audit_report")
val dfToDelete = hupCorpMsdDf.join(msd_code, hupCorpMsdDf("msd_code") === msd_code("msd_code") )
dfToDelete.show();
   val cdbConnector = CassandraConnector(sc)
   hupCorpMsdDf.foreachPartition(partition => {
     cdbConnector.withSessionDo(session =>
       //partition.foreach { msd =>
        //val deletequeryDF = s"DELETE FROM qa01_oneviewdashboard.hup_corp_msd_reference where msd_code =" + msd_code + ";";
       session.execute(dfToDelete)
     //})
     )
   })

Arvind Agarwal

Jun 11, 2020, 2:41:42 PM
to spark-conn...@lists.datastax.com
How do I write a timestamp (yyyy-MM-dd hh:mm:ss) column and a set collection to a Cassandra table using a Spark Scala DataFrame?



Russell Spitzer

Jun 11, 2020, 3:20:36 PM
to DataStax Spark Connector for Apache Cassandra
That would be a similar manual approach, but I don't think you actually need to do the join, since you want to remove the records: if they exist they will be deleted, and if they don't it won't do anything. You could still use the deleteFromCassandra API if you wanted as well.

I would warn you that your current implementation would be synchronous within each executor thread since you are using session.execute, rather than some async-future handling method.
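A hedged sketch of what an asynchronous variant could look like with the driver bundled in the 2.4.x connector (`executeAsync` returns a future you can collect and wait on). The keyspace, table, and key columns come from the code above; treating `effective_date` as a Timestamp is an assumption:

```scala
// Sketch only: assumes sc, dfToDelete, and the qa01_oneviewdashboard
// keyspace from the thread; effective_date is assumed to be a timestamp.
import com.datastax.spark.connector.cql.CassandraConnector

val cdbConnector = CassandraConnector(sc)

dfToDelete.select("msd_code", "effective_date").rdd.foreachPartition { part =>
  cdbConnector.withSessionDo { session =>
    val stmt = session.prepare(
      "DELETE FROM qa01_oneviewdashboard.hup_corp_msd_reference " +
        "WHERE msd_code = ? AND effective_date = ?")
    // executeAsync lets the driver pipeline requests instead of blocking
    // on every row; collect the futures and wait before the session closes.
    val futures = part.map { row =>
      session.executeAsync(stmt.bind(row.getString(0), row.getTimestamp(1)))
    }.toList
    futures.foreach(_.getUninterruptibly())
  }
}
```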

Russell Spitzer

Jun 11, 2020, 3:21:25 PM
to DataStax Spark Connector for Apache Cassandra
A Spark SQL Array<Timestamp> would be compatible. So you should take your strings and convert them so that your DataFrame row has a column of Array<Timestamp> instead of Array<String>. That should do it.
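To make the string-to-timestamp step concrete, here is a plain-Scala sketch of the conversion (in a DataFrame you would more likely use `org.apache.spark.sql.functions.to_timestamp`, but the target JVM type is the same `java.sql.Timestamp`):

```scala
import java.sql.Timestamp
import java.text.SimpleDateFormat

// Parse a "yyyy-MM-dd HH:mm:ss" string into java.sql.Timestamp, the JVM
// type behind Spark's TimestampType and the connector's CQL timestamp.
def parseTs(s: String): Timestamp = {
  val fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  new Timestamp(fmt.parse(s).getTime)
}

// An Array[Timestamp] is what a Spark array<timestamp> column carries.
val stamps = Array("2020-06-11 15:20:36", "2020-06-11 15:21:25").map(parseTs)
println(stamps.head) // 2020-06-11 15:20:36.0
```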

Arvind Agarwal

Jun 11, 2020, 3:31:23 PM
to spark-conn...@lists.datastax.com
The timestamp is a separate column I am saving with TimestampType in the Spark StructType; it is giving me an error.

CREATE TABLE practice.names (
    name text PRIMARY KEY,
    insert_date timestamp,
    set_col set<text>
)

I want to save a list of string values into a set collection in Cassandra. I am trying to write an Array of String, but it is giving me an error. Could you please advise?
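Under the assumption that the goal is to append rows matching the practice.names table above, a hedged sketch of the write (a Spark `array<string>` column maps onto a CQL `set<text>`; `spark` is assumed to be an active SparkSession with the connector on the classpath, and the sample row values are made up):

```scala
// Sketch only: assumes a SparkSession named spark with the Cassandra
// connector available, and the practice.names table defined above.
import org.apache.spark.sql.functions.{col, to_timestamp}
import spark.implicits._

val df = Seq(("alice", "2020-06-11 15:31:23", Seq("red", "blue")))
  .toDF("name", "insert_date_str", "set_col")
  // TimestampType column for the CQL timestamp
  .withColumn("insert_date",
    to_timestamp(col("insert_date_str"), "yyyy-MM-dd HH:mm:ss"))
  .drop("insert_date_str")

// array<string> in the DataFrame is written as set<text> in Cassandra.
df.write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "names", "keyspace" -> "practice"))
  .mode("append")
  .save()
```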

Russell Spitzer

Jun 11, 2020, 4:51:16 PM
to DataStax Spark Connector for Apache Cassandra
Can you give an example of what you are running and the error?