Spark Cassandra connector doesn't push filters from the join condition


Amol Khanolkar

Oct 22, 2020, 8:42:57 AM10/22/20
to DataStax Spark Connector for Apache Cassandra
Hi

I have 2 dataframes representing cassandra tables
Dataframe A
root
 |-- d_date: string (nullable = false)
 |-- m_d_id: string (nullable = false)
 |-- m_b_key: integer (nullable = false)
 |-- d_next_date: timestamp (nullable = false)

Dataframe B
root
 |-- d_id: string (nullable = false)
 |-- b_key: integer (nullable = false)
 |-- timestamp: timestamp (nullable = true)
 |-- version: string (nullable = true)

select * from A a join B b where a.m_d_id=b.d_id and m_b_key=b.b_key
The explain plan is as expected:
Project[ cols ....]
   Cassandra Direct Join [m_d_id=d_id, m_b_key=b_key]... Pushed {}
    ..
   ..

If I want to filter with constant values:
select * from A a join B b where a.m_d_id=b.d_id and m_b_key=b.b_key and b.timestamp > to_date('2020-10-22') and b.timestamp < to_date('2020-10-23')
The explain plan is again as expected:
Project[ cols ....]
   Cassandra Direct Join [m_d_id=d_id, m_b_key=b_key] ... Pushed {("timestamp" > ?:2020-10-22 00:00:00.0), ("timestamp" < ?:2020-10-23 00:00:00.0)} 
    ..
   ..

Now, if I want to make the same filter conditions dynamic for each partition with the direct join:

select * from A a join B b where a.m_d_id=b.d_id and m_b_key=b.b_key and b.timestamp > a.d_date and b.timestamp < a.d_next_date
Here is where it goes wrong:
Project[ cols ....]
   Cassandra Direct Join [m_d_id=d_id, m_b_key=b_key]... Pushed {}
    ..
   ..
The query returns empty results, and I also don't see the filters being pushed down in the explain plan.

What would be the right way to filter data across partitions with different filter conditions? I don't want to read the whole partition and then filter in memory.


Regards
Amol

Russell Spitzer

Oct 22, 2020, 3:40:05 PM10/22/20
to DataStax Spark Connector for Apache Cassandra
I don't think I implemented dynamic pushdowns; as it is, the code is only allowed to do pushdowns that are presented by Catalyst here, which will not find predicates that have a dynamic value (I believe). While we could handle pushing down dynamic filters like this, I think it would be a bit of work.


Amol Khanolkar

Oct 22, 2020, 10:25:30 PM10/22/20
to spark-conn...@lists.datastax.com
Is there any alternative to achieve the same thing for now?

Seems like a good feature to have.

Regards
Amol

Russell Spitzer

Oct 22, 2020, 10:47:57 PM10/22/20
to DataStax Spark Connector for Apache Cassandra
You could write your own manual join code with an async executor. This is a lot simpler for a specific query than for the general case. It is basically what folks were doing before we added the join-with-Cassandra API. Doing it within Catalyst is significantly more difficult.

Amol Khanolkar

Oct 22, 2020, 10:53:53 PM10/22/20
to spark-conn...@lists.datastax.com
Sure, thanks.

Can you point me to the async executor example you are referring to?

Amol Khanolkar

Oct 26, 2020, 10:17:18 AM10/26/20
to spark-conn...@lists.datastax.com
Hey, sorry to ask about this again.
Regarding the async executor code you are referring to: can I run it in Spark? I am aware of the normal Java driver code where we use CqlSession.

I see in the Spark Cassandra connector docs that we have CassandraConnector, but there is no method to convert a ResultSet into a DataFrame.

From what I understand, for a manual join I will need to iterate over the DataFrame, get the individual result sets, and append them together to build my final DataFrame.
However, Spark won't let me run sc.sql("....") inside map or foreach.

Regards
Amol

Russell Spitzer

Oct 26, 2020, 10:30:18 AM10/26/20
to DataStax Spark Connector for Apache Cassandra
Yes, you have to do it manually; you cannot have Spark SQL do it for you.

See this blog post on the basics of doing concurrent requests in an RDD action.

Basically you would do:

sc.cassandraTable(Table1).mapPartitions(ConcurrentRequestsFromTable2)

where ConcurrentRequestsFromTable2 wraps a function that fires off requests based on the content of the rows returned from the first RDD.
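
[Editor's note: for illustration only, here is a minimal sketch of that shape. It assumes table A has already been loaded into an RDD of a simple case class (sc.cassandraTable would hand you CassandraRow objects instead), and the keyspace, table, column, and variable names (driveRdd, ks.table_b, etc.) are made up. It executes the lookups synchronously for clarity, while the blog post referenced above shows how to keep several executeAsync requests in flight at once.]

import scala.collection.JavaConverters._
import com.datastax.spark.connector.cql.CassandraConnector

// Hypothetical shape of one driving row from table A.
case class DriveRow(dId: String, bKey: Int, from: java.time.Instant, until: java.time.Instant)

val connector = CassandraConnector(sc.getConf)

val joined = driveRdd.mapPartitions { rows =>        // driveRdd: RDD[DriveRow], assumed already loaded
  connector.withSessionDo { session =>
    // The per-row time bounds become bind parameters, which is exactly the
    // dynamic filter that the Direct Join cannot push down for us.
    val ps = session.prepare(
      "SELECT version FROM ks.table_b WHERE d_id = ? AND b_key = ? AND timestamp > ? AND timestamp < ?")
    rows.flatMap { r =>
      val bound = ps.bind(r.dId, Int.box(r.bKey), r.from, r.until)
      // Materialise into plain Scala values so nothing unserializable escapes the partition.
      session.execute(bound).all().asScala.map(row => (r.dId, r.bKey, row.getString("version")))
    }
  }
}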

Amol Khanolkar

Oct 26, 2020, 3:16:18 PM10/26/20
to spark-conn...@lists.datastax.com
Thanks, this was useful.

One last question: I have trouble resolving getUninterruptibly. Assuming it's deprecated in newer versions of the driver, I changed it to
initIterator.map(futureBatch => futureBatch.head.toCompletableFuture.get()) ++
However, the Spark job still fails with:
java.io.NotSerializableException: com.datastax.oss.driver.internal.core.cql.DefaultAsyncResultSet
Serialization stack:
- object not serializable (class: com.datastax.oss.driver.internal.core.cql.DefaultAsyncResultSet, value: com.datastax.oss.driver.internal.core.cql.DefaultAsyncResultSet@161b4be)
- element of array (index: 0)
- array (class [Lcom.datastax.oss.driver.api.core.cql.AsyncResultSet;, size 10)

Is this some issue with the latest version of the connector? I am using 3.0.

Russell Spitzer

Oct 26, 2020, 3:20:53 PM10/26/20
to DataStax Spark Connector for Apache Cassandra
No, your usage is not correct. The AsyncResultSet cannot be serialized since it's just a pointer to an open request for results from the database, not the actual records.

In Java you can only move things between JVMs if they are serializable (or you do something else to serialize them). When working with Spark, that means any objects that go to the driver, or from executor to executor, must be serializable. So if you have iterables like AsyncResultSet that are not serializable, you must collect them into serializable objects before passing them over a serialization boundary.
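
[Editor's note: as a sketch of that idea; BVersion, toBVersion, and the column names are placeholders, not connector API.]

import scala.collection.JavaConverters._
import com.datastax.oss.driver.api.core.cql.Row

// A plain, serializable container for just the columns we actually need.
case class BVersion(dId: String, bKey: Int, version: String)

def toBVersion(row: Row): BVersion =
  BVersion(row.getString("d_id"), row.getInt("b_key"), row.getString("version"))

// e.g. for a synchronously executed statement:
//   session.execute(bound).all().asScala.map(toBVersion)
// Only BVersion instances cross the serialization boundary, never the result set itself.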

Amol Khanolkar

Oct 26, 2020, 3:49:39 PM10/26/20
to spark-conn...@lists.datastax.com
Hmm.

The only change w.r.t. your example code is that:

initIterator.map(futureBatch => futureBatch.head.getUninterruptibly) ++
      tailIterator.flatMap(lastBatch => lastBatch.map(_.getUninterruptibly))

was changed to the following, since getUninterruptibly is not getting resolved. Should I be doing something different?

initIterator.map(futureBatch => futureBatch.head.toCompletableFuture.get()) ++
      tailIterator.flatMap(lastBatch => lastBatch.map(_.toCompletableFuture.get()))

Here is the complete code, pretty much the same as the one you explained:
val b = sparkCtx.makeRDD(Seq(("partition1", 202043), ("partition2", 202043)))
val c = b.mapPartitions { it =>
  connector.withSessionDo { session =>
    val ps = session.prepare("select * from table where part1 = ? and part2 = ?")
    val boundStatementIterator = it.map(x => ps.bind(x._1: java.lang.String, x._2: java.lang.Integer))
    val resultSetFutureIterator = boundStatementIterator.map(session.executeAsync)
    val slidingIterator = resultSetFutureIterator.sliding(batchSize - 1)
    val (initIterator, tailIterator) = slidingIterator.span(_ => slidingIterator.hasNext)
    initIterator.map(futureBatch => futureBatch.head.toCompletableFuture.get()) ++
      tailIterator.flatMap(lastBatch => lastBatch.map(_.toCompletableFuture.get()))
  }
}

Russell Spitzer

Oct 26, 2020, 4:10:54 PM10/26/20
to DataStax Spark Connector for Apache Cassandra
The change in the driver means things are a bit more complicated than just changing a method. The driver no longer returns an iterable result set; the new return type is different, so the code will not behave the same way.

You can see how we do it internally here.

But mostly I was just trying to show you an example of writing async code; you will need to write a version for your specific use case.
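
[Editor's note: a rough sketch, not the connector's internal code. One way to adapt the snippet above to the 4.x driver's paged AsyncResultSet is a small helper (drainPages is a made-up name, and toBVersion is the placeholder from the earlier sketch) that drains every page into plain values before anything crosses a serialization boundary.]

import scala.collection.JavaConverters._
import com.datastax.oss.driver.api.core.cql.{AsyncResultSet, Row}

// Walk every page of an AsyncResultSet, converting each driver Row with `extract`
// and blocking on fetchNextPage() whenever the current page is exhausted.
def drainPages[T](first: AsyncResultSet)(extract: Row => T): Seq[T] = {
  val out = Seq.newBuilder[T]
  var rs = first
  var more = true
  while (more) {
    rs.currentPage().asScala.foreach(r => out += extract(r))
    if (rs.hasMorePages) rs = rs.fetchNextPage().toCompletableFuture.get()
    else more = false
  }
  out.result()
}

// For example, in the snippet above:
//   initIterator.flatMap(batch => drainPages(batch.head.toCompletableFuture.get())(toBVersion)) ++
//     tailIterator.flatMap(_.flatMap(f => drainPages(f.toCompletableFuture.get())(toBVersion)))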