Partition key predicate must include all partition key columns

Mohammed Guller

Jun 6, 2016, 6:54:48 PM
to spark-conn...@lists.datastax.com

We have run into a strange problem using SCC version 1.5 with Spark 1.5.1. Assume that there is a table with four columns in the partition key: pc1, pc2, tbl, and pc4. If a Spark SQL query has only pc1 and pc2 in the WHERE clause, the query works fine. However, if a query has pc1, pc2 and “tbl” in the WHERE clause, the query throws this exception:

 

java.lang.UnsupportedOperationException: Partition key predicate must include all partition key columns. Missing columns: pc1, pc2, tbl, pc4

 

I looked at the SCC source code to find the cause. The exception appears to be thrown by the containsPartitionKey method in CassandraRDDPartitioner.scala. According to that code (shown below), the exception should also be thrown when only pc1 and pc2 appear in the WHERE clause, but it is not. The query works fine when the WHERE clause contains any of the following combinations:

1)      pc1 (e.g. SELECT x FROM t WHERE pc1='somevalue')

2)      pc1 and pc2 (e.g. SELECT x FROM t WHERE pc1='somevalue' AND pc2='somevalue2')

3)      pc1, pc2 and pc4

 

However, if the “tbl” column is present in the WHERE clause and any of the other partition key columns is missing, the exception above is thrown. The other strange thing is that the exception lists pc1 and pc2 as missing even when they are present in the WHERE clause.

 

Any idea what is going on?

 

Here is the Spark Cassandra Connector code that seems to be throwing the exception.

================

  private def containsPartitionKey(clause: CqlWhereClause) = {
    val pk = tableDef.partitionKey.map(_.columnName).toSet
    val wherePredicates: Seq[Predicate] = clause.predicates.flatMap(CqlWhereParser.parse)

    val whereColumns: Set[String] = wherePredicates.collect {
      case EqPredicate(c, _) if pk.contains(c) => c
      case InPredicate(c) if pk.contains(c) => c
      case InListPredicate(c, _) if pk.contains(c) => c
      case RangePredicate(c, _, _) if pk.contains(c) =>
        throw new UnsupportedOperationException(
          s"Range predicates on partition key columns (here: $c) are " +
            s"not supported in where. Use filter instead.")
    }.toSet

    if (whereColumns.nonEmpty && whereColumns.size < pk.size) {
      val missing = pk -- whereColumns
      throw new UnsupportedOperationException(
        s"Partition key predicate must include all partition key columns. Missing columns: ${missing.mkString(",")}"
      )
    }

    whereColumns.nonEmpty
  }

================
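
For readers following along, the set logic at the end of the quoted method can be modelled without any connector classes. The sketch below is only an illustration: PartitionKeyCheck, missingColumns and the plain Set[String] inputs are hypothetical stand-ins, not part of the connector's API.

```scala
// Hypothetical stand-in for the quoted check. It models only the set logic,
// using plain column names instead of the connector's CqlWhereClause/Predicate types.
object PartitionKeyCheck {
  // Partition key columns that have no predicate in the WHERE clause.
  def missingColumns(partitionKey: Set[String], whereColumns: Set[String]): Set[String] =
    partitionKey -- (whereColumns intersect partitionKey)

  // Mirrors the final `if` of the quoted method: a partial match throws,
  // no match returns false, a full match returns true.
  def containsPartitionKey(partitionKey: Set[String], whereColumns: Set[String]): Boolean = {
    val matched = whereColumns intersect partitionKey
    if (matched.nonEmpty && matched.size < partitionKey.size) {
      val missing = partitionKey -- matched
      throw new UnsupportedOperationException(
        "Partition key predicate must include all partition key columns. " +
          s"Missing columns: ${missing.toSeq.sorted.mkString(",")}")
    }
    matched.nonEmpty
  }
}
```

With a partition key of pc1, pc2, tbl and pc4 and predicates on only pc1 and pc2, this model throws and reports only pc4 and tbl as missing. Under this logic alone, a column that reaches the check as a predicate cannot also appear in the missing list, which suggests that the predicates arriving at the real method differ from those written in the SQL query.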

 

Thanks,

Mohammed

 

Russell Spitzer

Jun 6, 2016, 7:07:14 PM
to spark-conn...@lists.datastax.com
Can you give the C* schema and queries that you run to replicate this?


Mohammed Guller

Jun 6, 2016, 8:18:27 PM
to spark-conn...@lists.datastax.com

Here is a simplified version of the C* table:

 

CREATE TABLE data_tbl (
    mfr     text,
    prod    text,
    sch     text,
    ec      text,
    sysid   text,
    tbl     text,
    obs_dt  timestamp,
    obs_ts  timestamp,
    row     int,
    x       text,
    y       text,
    ... // other columns
    PRIMARY KEY ((mfr, prod, sch, ec, sysid, tbl, obs_dt), obs_ts, row)
) WITH CLUSTERING ORDER BY (obs_ts DESC, row ASC);

 

The following Spark SQL query throws an exception:

select x,y from data_tbl where mfr='m1' and prod='p1' and sch='s1' and ec='ec1' and tbl='tbl1' and sysid='sys1'

 

The following queries work:

 

select x,y from data_tbl where mfr='m1' and prod='p1' and sch='s1' and ec='ec1' and sysid='sys1' and obs_dt='2016-05-29 00:00:00'  

select x,y from data_tbl where mfr='m1' and prod='p1' and sch='s1' and ec='ec1' and sysid='sys1'

select x,y from data_tbl where mfr='m1' and prod='p1' and sch='s1' and ec='ec1'

 

Essentially, as soon as we add tbl='tbl1' to the WHERE clause, we get an exception. However, if all the partition key columns are provided, the query works even with tbl='tbl1'. So this query also works:

select x,y from data_tbl where mfr='m1' and prod='p1' and sch='s1' and ec='ec1' and tbl='tbl1' and sysid='sys1' and obs_dt='2016-05-29 00:00:00'

 

It seems like a totally different code path inside SCC is executed if tbl='tbl1' is included in the WHERE clause.

 

Mohammed

Russell Spitzer

Jun 6, 2016, 8:21:14 PM
to spark-conn...@lists.datastax.com
Yeah, I see. It shouldn't be attempting to push down at all unless all the PK elements are present. That's definitely a bug.
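
The rule stated in this reply (push partition key predicates down only when every partition key column is covered) can be sketched in a few lines. This is a hypothetical model, not the connector's pushdown code: PushdownRule and splitPartitionKeyPredicates are invented names, column names stand in for equality predicates, and clustering columns and secondary indexes are deliberately ignored.

```scala
// Hypothetical model of the rule described in the reply; not the connector's
// actual pushdown code. Each column name stands for an equality predicate on it.
object PushdownRule {
  // Splits WHERE-clause columns into (pushed down to Cassandra, left for Spark to filter).
  def splitPartitionKeyPredicates(
      partitionKey: Set[String],
      whereColumns: Set[String]): (Set[String], Set[String]) = {
    val onPartitionKey = whereColumns intersect partitionKey
    if (onPartitionKey == partitionKey)
      (onPartitionKey, whereColumns -- onPartitionKey) // full key: push it down
    else
      (Set.empty[String], whereColumns)                // partial key: push nothing
  }
}
```

Applied to the failing query from this thread, which has predicates on every partition key column except obs_dt, this model pushes nothing down and leaves all six predicates for Spark to evaluate, so the partial-key check would never be reached.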

Mohammed Guller

Jun 6, 2016, 8:37:42 PM
to spark-conn...@lists.datastax.com

So why does this bug manifest only when tbl='tbl1' is present in the WHERE clause? It is as if the tbl column has a special meaning and its presence in the WHERE clause triggers a totally different code path.

 

Looking at the code flow, the getPartitions method in CassandraTableScanRDD.scala calls the partitions method, which calls the containsPartitionKey method (both in CassandraRDDPartitioner.scala). I don't see anything that would trigger a different code path for tbl='tbl1'.

 

Another strange thing is that when the exception is thrown for this query:

select x,y from data_tbl where mfr='m1' and prod='p1' and sch='s1' and ec='ec1' and tbl='tbl1' and sysid='sys1'

the missing columns list includes mfr, prod, sch, ec, tbl and sysid, even though they are present.

Russell Spitzer

Jun 6, 2016, 8:42:04 PM
to spark-conn...@lists.datastax.com

Not sure, but I think the problem is in the data source pushdown logic. Basically, you should be unable to trigger that exception when using Spark SQL; it should only affect RDD API users. I'll be at Spark Summit for the next two days, but I'll ask my teammates to look into it.

Jaroslaw Grabowski

Jun 7, 2016, 3:58:19 AM
to spark-conn...@lists.datastax.com
Could you please look at SPARKC-348 and see whether it helps in your case? 1.6.0 is coming, but until it is released please use the master branch for testing.

JAROSLAW GRABOWSKI
Software Engineer

Mohammed Guller

Jun 7, 2016, 9:50:27 PM
to spark-conn...@lists.datastax.com

Thanks. It does look related to SPARKC-348.

 

I haven’t tried the patch yet, but I see a secondary index created on the tbl column.

 

Mohammed

Mohammed Guller

Jun 7, 2016, 11:06:24 PM
to spark-conn...@lists.datastax.com

Thanks, Russell.

 

I suspected it must be happening somewhere upstream in the code. I guess that the data source API is eventually calling the partitions method.

Russell Spitzer

Jun 8, 2016, 12:31:52 PM
to spark-conn...@lists.datastax.com
Thank Jaroslaw for that fix :) 

Mohammed Guller

Jun 12, 2016, 11:10:39 AM
to spark-conn...@lists.datastax.com

Thank you, Jaroslaw!
