Filters over primary key columns are not pushed down


Andres de la Peña

Sep 15, 2016, 6:56:43 AM
to DataStax Spark Connector for Apache Cassandra
Hi all,

I have an indexed table with the following schema:

CREATE TABLE test(
pk1 int,
pk2 int,
ck1 int,
ck2 int,
lucene text,
rc1 bigint,
rc2 bigint,
PRIMARY KEY ((pk1, pk2), ck1, ck2)
);
CREATE INDEX ON test(rc1);

When I run the following queries in CQL:

SELECT * FROM test WHERE rc1=1 AND rc2=1 ALLOW FILTERING;
SELECT * FROM test WHERE rc1=1 AND pk2=1 ALLOW FILTERING;

Both queries use the index on rc1, and the other, non-indexed column is filtered during the scan. However, when I do the same with Spark 1.6.2 using spark-cassandra-connector_2.10:1.6.2:

val rdd = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "test", "table" -> "test"))
  .load()
rdd.filter("rc1=1 AND rc2=1").count
rdd.filter("rc1=1 AND pk2=1").count

Only the first query pushes down the filter on the non-indexed column, even though explain says that all the filters are going to be pushed down:

rdd.filter("rc1=1 AND pk2=1").explain
== Physical Plan ==
Filter (pk2#1 = 1)
+- Scan ... PushedFilters: [EqualTo(rc1,1), EqualTo(pk2,1)]

In general, it seems that filters on primary key columns are never pushed down when using the Spark connector; they never reach Cassandra.

Is this the expected behaviour? Am I doing something wrong? Filtering in memory in Spark is less efficient than filtering in Cassandra...

Thanks,

Russell Spitzer

Sep 15, 2016, 12:02:18 PM
to DataStax Spark Connector for Apache Cassandra
The first thing to know is that Spark is a liar. https://issues.apache.org/jira/browse/SPARK-12639

PushedFilters actually just means these filters were "shown" to the underlying source; it doesn't mean the underlying source did anything with them. So just because you see "pushed filters" doesn't mean anything happened.

So based on your explain output I can tell the following is happening:

(pk2=1) is filtered in Spark
(rc1=1) is getting pushed all the way down to Cassandra. 

I can tell this because the C* connector marks all of its filters as "handled", which means that any time you see Spark executing a Filter, C* did not handle it (1.6+).
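
To make that distinction concrete, here is a rough sketch of the Spark 1.6 data source contract; this is not the connector's actual code, and the "only rc1 equality" rule inside it is a made-up placeholder. Everything handed to buildScan is reported as PushedFilters in explain(), but only filters not returned from unhandledFilters count as handled by the source; Spark re-applies the rest itself as an in-memory Filter step.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, EqualTo, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.StructType

// Rough sketch of a Spark 1.6 data source, NOT the Cassandra connector's code.
class ExampleRelation(val sqlContext: SQLContext, val schema: StructType)
  extends BaseRelation with PrunedFilteredScan {

  // Hypothetical rule for this sketch: only equality on "rc1" is handled here.
  private def handledBySource(f: Filter): Boolean = f match {
    case EqualTo("rc1", _) => true
    case _                 => false
  }

  // Whatever we return here stays in the physical plan as a Spark-side Filter node,
  // even though all of these filters still show up under "PushedFilters".
  override def unhandledFilters(filters: Array[Filter]): Array[Filter] =
    filters.filterNot(handledBySource)

  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] =
    sqlContext.sparkContext.emptyRDD[Row] // a real source would scan Cassandra here
}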


So why isn't pk2 filtered in C*? Since this predicate doesn't match any of our current pushdown rules, it has to be skipped. I haven't checked in a while, but if this is now pushable (partial partition key specified together with a restriction on a secondary index), then please file a JIRA and we can update it.
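
As a rough illustration of that rule, here is a simplified model, not the connector's actual rule set (the names TableMeta and pushedToCassandra are made up, and clustering-column rules are omitted): partition key columns are only pushed when the whole partition key is restricted by equality, while regular columns ride along once an indexed column is restricted.

// Simplified model of the pushdown decision, NOT the connector's actual code.
// eqColumns is the set of columns restricted by equality in the Spark filter.
case class TableMeta(partitionKey: Seq[String], indexedColumns: Set[String])

def pushedToCassandra(column: String, eqColumns: Set[String], t: TableMeta): Boolean = {
  val fullPartitionKey = t.partitionKey.forall(eqColumns.contains)   // pk1 AND pk2 both restricted?
  val indexHit         = eqColumns.exists(t.indexedColumns.contains) // some indexed column restricted?
  if (t.partitionKey.contains(column)) fullPartitionKey // partition key columns: all-or-nothing
  else indexHit                                         // regular columns: need an index hit (ALLOW FILTERING)
}

val meta = TableMeta(partitionKey = Seq("pk1", "pk2"), indexedColumns = Set("rc1"))
pushedToCassandra("rc2", Set("rc1", "rc2"), meta) // true:  rc1=1 AND rc2=1 goes to Cassandra
pushedToCassandra("pk2", Set("rc1", "pk2"), meta) // false: pk2=1 stays in Spark, even though CQL would accept it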



Andres de la Peña

Sep 16, 2016, 7:03:06 AM
to spark-conn...@lists.datastax.com
Hi Russell,

I have just created SPARKC-425 about this. 

Thanks for your quick response. 

--
Andrés de la Peña

Vía de las dos Castillas, 33, Ática 4, 3ª Planta
28224 Pozuelo de Alarcón, Madrid