Possible to delete records using the connector?


Matt Saunders

Nov 11, 2015, 6:40:23 PM
to DataStax Spark Connector for Apache Cassandra
Of course deleting records from Cassandra is discouraged because it adds overhead, and you should try to minimize deletion whenever possible.

Now that that's out of the way, is there a way to delete records using the Spark Cassandra connector? Or do you need to use the Java driver and execute "DELETE" statements directly?

Russell Spitzer

Nov 16, 2015, 2:16:39 PM
to DataStax Spark Connector for Apache Cassandra
To delete, you can just pass null for a column value; inserting a NULL is the same as deleting that value in C*.
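
As a rough sketch of what that looks like with saveToCassandra (keyspace, table, and column names are made up for illustration; this assumes an existing SparkContext sc and that None/null values are written through as CQL NULLs rather than skipped):

import com.datastax.spark.connector._

// Writing None for "value" inserts a NULL, which removes any existing value for
// that column (it tombstones the column, not the whole row).
case class Entry(id: Int, value: Option[String])

val toClear = sc.parallelize(Seq(Entry(1, None), Entry(2, None)))
toClear.saveToCassandra("ks", "tbl", SomeColumns("id", "value"))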


Xiaochuan Qin

Mar 18, 2016, 6:05:35 PM
to DataStax Spark Connector for Apache Cassandra
Why don't we have a function that supports tombstoning? My application needs a way to delete each item of an RDD from C*. If I only delete the non-primary-key columns, I have to do a lot of filtering when I load the data back. Wouldn't it be nicer to have a function that can fully delete the row?

Eric Meisel

Mar 18, 2016, 6:07:49 PM
to DataStax Spark Connector for Apache Cassandra

Seconded. I usually have to create a loop within an RDD and make raw connector calls.



Russell Spitzer

Mar 18, 2016, 6:11:29 PM
to DataStax Spark Connector for Apache Cassandra
So the request is to feed an RDD of partition key values into DELETE operations?


Eric Meisel

Mar 18, 2016, 6:13:59 PM
to DataStax Spark Connector for Apache Cassandra

Yes on my end.

Xiaochuan Qin

Mar 18, 2016, 6:14:00 PM
to DataStax Spark Connector for Apache Cassandra
Yes it is.

Russell Spitzer

Mar 18, 2016, 6:18:54 PM
to DataStax Spark Connector for Apache Cassandra
We can make a ticket for this, but I'm imagining we do something like

RDD.deleteFromCassandra(keyspace, table, columns = None)

which would execute statements like

DELETE columns.mkString(",") FROM keyspace.table WHERE pk = ? // Feed in values from the RDD here

where columns = None means wiping the full partition.
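
To make the proposal concrete, here is a hypothetical sketch of how such a call might be used (the method name and SomeColumns follow the proposal above; nothing here is a released API yet, and the keyspace, table, and column names are made up):

import com.datastax.spark.connector._

// Each element of keyRdd carries the primary key of a row to remove.
val keyRdd = sc.parallelize(Seq((1, "a"), (2, "b")))  // (partition key, clustering key)

// Delete the whole rows identified by those keys.
keyRdd.deleteFromCassandra("ks", "tbl")

// Or delete only the named non-key columns for those rows.
keyRdd.deleteFromCassandra("ks", "tbl", SomeColumns("value"))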



Eric Meisel

Mar 18, 2016, 6:20:20 PM
to DataStax Spark Connector for Apache Cassandra

That would be nice and flexible.

Russell Spitzer

Mar 18, 2016, 6:24:26 PM
to spark-conn...@lists.datastax.com
https://datastax-oss.atlassian.net/browse/SPARKC-349 Please add any other comments or ideas you have for the interface

Xiaochuan Qin

Mar 21, 2016, 12:24:00 PM
to DataStax Spark Connector for Apache Cassandra
I came up with a workaround: saving with a very short TTL, like

CassandraJavaUtil.javaFunctions(theRDD)
    .writerBuilder(keyspace, table, writer_factory)
    .withConstantTTL(1)
    .saveToCassandra();

Matt Chang

Jul 6, 2016, 1:41:22 AM
to DataStax Spark Connector for Apache Cassandra
Xiaochuan Qin wrote on Tuesday, March 22, 2016 at 00:24:00 UTC+8:
Hi, has anyone tried this?

I did, and now I get lots of "Scanned over 100000 tombstones" exceptions from SliceQueryFilter.java.
One of my tables has become unqueryable because spark-cassandra-connector reports a read timeout (consistency LOCAL_ONE, 0 replicas responded).

I think I shouldn't have done this, but...

Can anyone please help me? Thank you.

P.S. I tried running nodetool compact on the table, but to no avail.

Etienne

Jul 6, 2016, 4:43:26 AM
to spark-conn...@lists.datastax.com
Have you tried using nodetool repair?

Matt

Jul 6, 2016, 5:12:32 AM
to spark-conn...@lists.datastax.com
I use OpsCenter to manage my cluster, so there is already a repair process running (it will take 2 days though...).
If I try to repair manually using nodetool repair <keyspace> <table>, it says there is nothing to repair.

I'm also wondering what will happen when a big batch of data expires and gets deleted (I get billions of records per day with the same TTL).
Will the tombstone exception happen again? :p

Thank you.

Etienne

Jul 6, 2016, 5:28:36 AM
to spark-conn...@lists.datastax.com
You can manage your tombstones by ordering the rows by clustering key.

Imagine you have (PK, CK = insert timestamp).

If you order your rows so that the newest come first, you will never have to browse through the tombstoned rows.

Example with TTL = 2, current time = 5:

| PK | timestamp | tombstone (hidden) |
| 1  |         5 | no  |
| 1  |         4 | no  |
| 1  |         3 | yes |
| 1  |         2 | yes |
| 1  |         1 | yes |

If you then run "SELECT * FROM this.table WHERE timestamp >= 4;"
you will not have to browse tombstones, so you will get better performance.
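
A minimal CQL sketch of the idea (keyspace, table, and column names are placeholders, not taken from the thread):

CREATE TABLE ks.events (
    pk int,
    timestamp int,
    value text,
    PRIMARY KEY (pk, timestamp)
) WITH CLUSTERING ORDER BY (timestamp DESC);

-- With the newest rows stored first, a slice such as
SELECT * FROM ks.events WHERE pk = 1 AND timestamp >= 4;
-- can stop before it ever reaches the older, tombstoned rows.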

Matt

Jul 6, 2016, 5:44:34 AM
to spark-conn...@lists.datastax.com
I see. It will help when data expires via TTL normally. Thank you.

And about my case ( random data deletion), I will wait and see if repair can solve the problem.

Thanks again.

Russell Spitzer

Jul 6, 2016, 11:24:18 AM
to spark-conn...@lists.datastax.com
Your tombstones will not be removed until after the GC grace period has passed. Deletes create a tombstone for every piece of deleted data. This mechanism exists to prevent deleted data from being revived by network partitions or node failures. Only after gc_grace_seconds has elapsed will the data be removed in the next compaction. So if you want to remove all tombstones, you need to lower gc_grace_seconds and then trigger a compaction.
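
For example, the sequence might look roughly like this (keyspace and table names are placeholders; lowering gc_grace_seconds this aggressively is only sensible if you understand the repair implications described above):

-- Allow tombstones to be purged immediately.
ALTER TABLE ks.events WITH gc_grace_seconds = 0;

-- Then trigger a compaction from the shell:
-- nodetool compact ks events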

Russell Spitzer

Jul 6, 2016, 11:25:01 AM
to spark-conn...@lists.datastax.com
You can also adjust the thresholds at which queries warn or fail on overwhelming numbers of tombstones in your cassandra.yaml, although having lots of tombstones is bad for almost all queries.
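
For reference, the relevant cassandra.yaml settings look something like this (the values shown are the usual defaults; check your own cassandra.yaml before changing them):

# Warn when a single query scans more than this many tombstones.
tombstone_warn_threshold: 1000
# Abort the query when it scans more than this many tombstones.
tombstone_failure_threshold: 100000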

Matt

Jul 6, 2016, 10:18:02 PM
to spark-conn...@lists.datastax.com
Thanks Russell.

It's exactly what I need.
After updating gc_grace_seconds to 0, I ran compaction again.
Now I can query my table with no errors.

Thank you so much!

VD

Sep 13, 2016, 11:52:41 AM
to DataStax Spark Connector for Apache Cassandra
Hi Eric,

I am trying the option of doing a raw delete of each row by looping through an RDD using Java. The code snippet is below:

transient static CassandraConnector connector = null;
transient static Session session;
transient static PreparedStatement pstmt;
transient static BoundStatement delBoundStmt;

private static void deleteData(JavaStreamingContext jssc, JavaDStream<DataEvent> dataEvents) {

    connector = CassandraConnector.apply(jssc.sparkContext().getConf());

    dataEvents.map(new Function<DataEvent, Void>() {
        @Override
        public Void call(DataEvent dataEvent) throws Exception {

            session = connector.openSession();
            pstmt = session.prepare("DELETE FROM temp.DataEvent where id = '?' and seq = ? and tid = ? and dtm = ?");
            delBoundStmt = new BoundStatement(pstmt).bind(dataEvent.getId(), dataEvent.getSeq(), dataEvent.getTid(), dataEvent.getDtm());
            session.execute(delBoundStmt);
            session.close();

            return null;
        }
    });

}

But the records are not getting deleted...

Can you please help with this?


Thanks
Vinayak

Arvydas Jonusonis

Sep 14, 2016, 4:24:38 AM
to spark-conn...@lists.datastax.com
Not sure why the records are not getting deleted, but you should be caching/re-using your session and prepared statement objects.

Arvydas


Russell Spitzer

Sep 14, 2016, 3:02:50 PM
to spark-conn...@lists.datastax.com
Arvydas is correct about the caching and reusing. But the reason your code does nothing is that Spark is lazy and `map` on its own does not trigger any execution. This kind of code really calls for a `foreach` or `foreachPartition`.


VD

Sep 15, 2016, 7:53:59 AM
to DataStax Spark Connector for Apache Cassandra
Hi Russell,

Based on your suggestion I am using foreach now; the modified code snippet is below.

After executing it, I get a NullPointerException while binding the dynamic values to the query, i.e. at delBoundStmt = new BoundStatement(pstmt).bind(DataEvent.getId(), DataEvent.getSeq(), DataEvent.getTid(), DataEvent.getDtm());

I'm not sure whether I'm binding it incorrectly. Can you please suggest what I might be missing?


===============================================================
transient static CassandraConnector connector = null;
transient static Session session;
transient static PreparedStatement pstmt;
transient static BoundStatement delBoundStmt;

private static void deleteDataEvents(JavaStreamingContext jssc, JavaDStream<DataEvent> DataEvents) {

    DataEvents.print();

    connector = CassandraConnector.apply(jssc.sparkContext().getConf());
    session = connector.openSession();
    pstmt = session.prepare("DELETE FROM emc.DataEvent where id = ? and seq = ? and tid = ? and dtm = ?");

    DataEvents.foreachRDD(new Function2<JavaRDD<DataEvent>, Time, Void>() {
        @Override
        public Void call(JavaRDD<DataEvent> DataEventJavaRDD, Time time) throws Exception {
            DataEventJavaRDD.foreach(new VoidFunction<DataEvent>() {
                @Override
                public void call(DataEvent DataEvent) {
                    System.out.println("SID = " + DataEvent.getSid());
                    System.out.println("ESEQ = " + DataEvent.getEseq());
                    System.out.println("ETID = " + DataEvent.getEtid());
                    System.out.println("EVDTM = " + DataEvent.getEvdtm());
                    delBoundStmt = new BoundStatement(pstmt).bind(DataEvent.getId(), DataEvent.getSeq(), DataEvent.getTid(), DataEvent.getDtm());
                    session.execute(delBoundStmt);
                }
            });

            return null;
        }
    });
    session.close();
}

Russell Spitzer

Sep 15, 2016, 11:20:33 AM
to DataStax Spark Connector for Apache Cassandra
Time for a quick lesson on serializability and Spark :) I should probably write this up as a real blog post at some point...

Anything inside a map, filter, foreach, or other Spark transformation will be serialized and executed remotely. Anything outside these functions is handled on the driver.


With this in mind, let's look at your code.

The pstmt is prepared outside the serialized functions, and it's marked as transient. This means that while it is defined on the driver, it is set to null when the code is serialized. This is why you get an NPE.

Now, how can we fix this? We want one prepared statement per executor JVM, and we need it to be created on the remote executor. Note that you will have the same issue with the `session` variable.

The connector comes with a basic tool for this called CassandraConnector:
https://github.com/datastax/spark-cassandra-connector/blob/b1.6/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/CassandraConnector.scala

This is a serializable reference to a C* connection. It provides a method `withSessionDo`, which uses a JVM-local session pool and handles all of this for you. It also provides a `SessionProxy` that intercepts calls to `prepare` and caches the prepared statements. You can access these from Java, but Scala is much easier.
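
A minimal Scala sketch of that pattern, assuming a hypothetical RDD of events with id/seq/tid/dtm fields and made-up keyspace/table names (not the poster's actual schema):

import java.util.Date
import com.datastax.spark.connector.cql.CassandraConnector

case class Event(id: String, seq: Long, tid: String, dtm: Date)

// CassandraConnector is serializable, so the closure below can capture it safely.
val connector = CassandraConnector(sc.getConf)
val eventsRdd = sc.parallelize(Seq(Event("a", 1L, "t1", new Date())))

eventsRdd.foreachPartition { events =>
  connector.withSessionDo { session =>
    // prepare() goes through the connector's session proxy, so the statement is
    // effectively prepared once per executor JVM rather than once per row.
    val stmt = session.prepare(
      "DELETE FROM ks.events WHERE id = ? AND seq = ? AND tid = ? AND dtm = ?")
    events.foreach(e => session.execute(stmt.bind(e.id, e.seq, e.tid, e.dtm)))
  }
}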

Eric Meisel

Dec 8, 2016, 2:15:06 PM
to DataStax Spark Connector for Apache Cassandra
Hey Russell, I saw that you guys are pretty close to releasing this. Do you have insight as to when it will be released and if it would be released in 1.6+ libraries?

Russell Spitzer

Dec 8, 2016, 2:53:08 PM
to DataStax Spark Connector for Apache Cassandra
We don't usually backport features so currently there is no plan for that.

Eric Meisel

Dec 9, 2016, 12:07:17 PM
to DataStax Spark Connector for Apache Cassandra
Had a feeling. I'm wondering if you can help me out with the implementation of an efficient deletion process. I've been at this for a while but all of my solutions pale in comparison to standard spark-cassandra-connector functions. As an example, I had refactored a piece of code today in which the performance went from 12 hours to about 3, simply by removing a foreachPartition call and replacing it with standard saveToCassandra calls (which are great!).

Since deleteFromCassandra won't be available in the 1.6.x version tree, I'm stuck with a loop (I think). Let's say I need to delete persons from a database. Here's what I've got so far (getDeletionsStatements simply maps a person to some prepared CQL queries). What can I do to boost the performance here?

def deleteRecords(deletionRdd: RDD[Person],
                    cassandraConnector: CassandraConnector,
                    deletionTable: DeletionTable,
                    table: String,
                    keyspace: String,
                    retries: Int,
                    timeout: Long,
                    timeUnit: TimeUnit): Unit =
    deletionRdd.foreachPartition { partition =>
      cassandraConnector.withSessionDo { session =>
        partition.foreach { person: Person =>
          getDeletionsStatements(
            person = person,
            session = session,
            table = table,
            keyspace = keyspace
          ).foreach { statement: BoundStatement =>
            executeAsyncWithRetries(
              session = session,
              statement = statement,
              retries = retries,
              timeout = timeout,
              timeUnit = timeUnit
            ) match {
              case Success(x) => x
              case Failure(e) => throw e
            }
          }
        }
      }
    }

  def executeAsyncWithRetries(session: Session, statement: BoundStatement, retries: Int = 5, timeout: Long = 30, timeUnit: TimeUnit = SECONDS): Try[ResultSet] =
    retry(retries) {
      session.executeAsync(statement).get(timeout, timeUnit)
    }


  @annotation.tailrec
  private def retry[K](n: Int)(fn: => K): Try[K] = {
    Try { fn } match {
      case x: Success[K] => x
      case x if n > 1 => retry(n - 1)(fn)
      case x => x
    }
  }

Eric Meisel

Dec 16, 2016, 9:40:02 AM
to DataStax Spark Connector for Apache Cassandra
I just want to mention that I've added a Pull Request to backport this delete functionality as well as left-joins to 1.6. I'm hopeful that you will accept it! 

To solve my issue, I ended up making my own version of the connector for 1.6.2 that has these backported functions. 

- Eric

Russell Spitzer

Dec 16, 2016, 11:48:28 AM
to DataStax Spark Connector for Apache Cassandra
I think the 1.6 left join is waiting on a rebase? But we voted that we would merge it :)

Eric Meisel

Dec 16, 2016, 11:55:04 AM
to DataStax Spark Connector for Apache Cassandra
From what I saw in the PR, it was waiting on the developer's signature on the CLA. The last update to it was a little over a month ago. I decided to include it in this PR to help the effort along faster.