We have run into a strange problem using the Spark Cassandra Connector (SCC) version 1.5 with Spark 1.5.1. Assume that there is a table with four columns in the partition key: pc1, pc2, tbl, and pc4. If a Spark SQL query has only pc1 and pc2 in the WHERE clause, the query works fine. However, if a query has pc1, pc2, and “tbl” in the WHERE clause, it throws this exception:
java.lang.UnsupportedOperationException: Partition key predicate must include all partition key columns. Missing columns: pc1, pc2, tbl, pc4
I looked at the SCC source code to find the problem. It looks like this exception is thrown in the containsPartitionKey method in the CassandraRDDPartitioner.scala file. As per the code (shown below), this exception should be thrown even if only pc1 and pc2 are provided in the WHERE clause. However, it is not. So if the WHERE clause contains any of the following combinations, it works fine:
1) pc1 (e.g. SELECT x from t where pc1='somevalue')
2) pc1 and pc2 (e.g. SELECT x from t where pc1='somevalue' and pc2='somevalue2')
3) pc1, pc2, and pc4
However, if the “tbl” column is present in the WHERE clause and any of the other partition key columns is missing, the above-mentioned exception is thrown. The other strange thing is that even when pc1 and pc2 are present in the WHERE clause, the exception says that they are missing.
Any idea what is going on?
Here is the Spark Cassandra Connector code that seems to be throwing the exception.
================
private def containsPartitionKey(clause: CqlWhereClause) = {
  val pk = tableDef.partitionKey.map(_.columnName).toSet
  val wherePredicates: Seq[Predicate] = clause.predicates.flatMap(CqlWhereParser.parse)
  val whereColumns: Set[String] = wherePredicates.collect {
    case EqPredicate(c, _) if pk.contains(c) => c
    case InPredicate(c) if pk.contains(c) => c
    case InListPredicate(c, _) if pk.contains(c) => c
    case RangePredicate(c, _, _) if pk.contains(c) =>
      throw new UnsupportedOperationException(
        s"Range predicates on partition key columns (here: $c) are " +
        s"not supported in where. Use filter instead.")
  }.toSet
  if (whereColumns.nonEmpty && whereColumns.size < pk.size) {
    val missing = pk -- whereColumns
    throw new UnsupportedOperationException(
      s"Partition key predicate must include all partition key columns. Missing columns: ${missing.mkString(",")}"
    )
  }
  whereColumns.nonEmpty
}
================
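To make the behaviour of the check above concrete, here is a minimal, self-contained Scala model of it. It is a sketch, not the connector's code: it assumes the predicates arrive as a small hypothetical ADT (EqPredicate, InPredicate, RangePredicate carrying only a column name) instead of strings parsed by CqlWhereParser, and it folds InListPredicate into InPredicate. It shows that the method throws only when some, but not all, partition key columns reach it; when none reach it, it simply returns false. So if a query restricting only pc1 and pc2 succeeds, those predicates are presumably not reaching this method at all.

```scala
// Simplified, self-contained model of the containsPartitionKey check quoted above.
// Assumption: predicates arrive as this small hypothetical ADT; the real connector
// parses CQL strings with CqlWhereParser and also has an InListPredicate case.
sealed trait Predicate
final case class EqPredicate(column: String) extends Predicate
final case class InPredicate(column: String) extends Predicate
final case class RangePredicate(column: String) extends Predicate

object PartitionKeyCheck {
  /** true: every partition key column is restricted (partition-key query).
    * false: no partition key column is restricted (full scan).
    * Throws if only some partition key columns are restricted, or on a range predicate. */
  def containsPartitionKey(partitionKey: Set[String], predicates: Seq[Predicate]): Boolean = {
    val whereColumns: Set[String] = predicates.collect {
      case EqPredicate(c) if partitionKey.contains(c) => c
      case InPredicate(c) if partitionKey.contains(c) => c
      case RangePredicate(c) if partitionKey.contains(c) =>
        throw new UnsupportedOperationException(
          s"Range predicates on partition key columns (here: $c) are not supported.")
    }.toSet
    if (whereColumns.nonEmpty && whereColumns.size < partitionKey.size) {
      val missing = partitionKey -- whereColumns
      throw new UnsupportedOperationException(
        s"Partition key predicate must include all partition key columns. " +
          s"Missing columns: ${missing.mkString(",")}")
    }
    whereColumns.nonEmpty
  }

  def main(args: Array[String]): Unit = {
    val pk = Set("pc1", "pc2", "tbl", "pc4")
    println(containsPartitionKey(pk, Seq.empty)) // prints false: nothing to check
    println(containsPartitionKey(pk, pk.toSeq.map(c => EqPredicate(c)))) // prints true
    try {
      containsPartitionKey(pk, Seq(EqPredicate("pc1"), EqPredicate("pc2")))
      println("no exception")
    } catch {
      // Only pc1 and pc2 reach the check, so the partial-key exception is raised.
      case e: UnsupportedOperationException => println(e.getMessage)
    }
  }
}
```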
Thanks,
Mohammed
--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
Here is a simplified version of the C* table:
CREATE TABLE data_tbl (
  mfr text,
  prod text,
  sch text,
  ec text,
  sysid text,
  tbl text,
  obs_dt timestamp,
  obs_ts timestamp,
  row int,
  x text,
  y text,
  ... // other columns
  PRIMARY KEY ((mfr, prod, sch, ec, sysid, tbl, obs_dt), obs_ts, row)
) WITH CLUSTERING ORDER BY (obs_ts DESC, row ASC);
The following Spark SQL query throws the exception:
select x,y from data_tbl where mfr='m1' and prod='p1' and sch='s1' and ec='ec1' and tbl='tbl1' and sysid='sys1'
The following queries work:
select x,y from data_tbl where mfr='m1' and prod='p1' and sch='s1' and ec='ec1' and sysid='sys1' and obs_dt='2016-05-29 00:00:00'
select x,y from data_tbl where mfr='m1' and prod='p1' and sch='s1' and ec='ec1' and sysid='sys1'
select x,y from data_tbl where mfr='m1' and prod='p1' and sch='s1' and ec='ec1'
Essentially, as soon as we add tbl='tbl1' to the WHERE clause, we get an exception. However, if all the partition key columns are provided, it works even with tbl='tbl1'. So this query also works:
select x,y from data_tbl where mfr='m1' and prod='p1' and sch='s1' and ec='ec1' and tbl='tbl1' and sysid='sys1' and obs_dt='2016-05-29 00:00:00'
It seems like a totally different code path inside SCC is executed if tbl='tbl1' is included in the WHERE clause.
Mohammed
So why does this bug manifest only when tbl='tbl1' is present in the WHERE clause? It is as if the tbl column has a special meaning and its presence in the WHERE clause triggers a totally different code path.
Looking at the code flow, the getPartitions method in the CassandraTableScanRDD.scala file calls the partitions method, which calls the containsPartitionKey method (both in the CassandraRDDPartitioner.scala file). I don’t see anything that would trigger a different code path for tbl='tbl1'.
Another strange thing is that when the exception is thrown for this query:
select x,y from data_tbl where mfr='m1' and prod='p1' and sch='s1' and ec='ec1' and tbl='tbl1' and sysid='sys1'
the missing columns list includes mfr, prod, sch, ec, tbl and sysid, even though they are present.
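For comparison, the sketch below takes the partition key from the simplified data_tbl schema and computes which partition key columns each query in this thread leaves unrestricted. It is only a set difference over hand-transcribed WHERE-clause columns (no SQL parsing, no connector code); the object and method names are made up for illustration.

```scala
// Which partition key columns does each query in this thread restrict?
// The WHERE clauses are transcribed by hand as column sets (no SQL parsing).
object MissingPartitionKeyColumns {
  // Partition key of data_tbl, in declaration order.
  val partitionKey: Seq[String] =
    Seq("mfr", "prod", "sch", "ec", "sysid", "tbl", "obs_dt")

  /** Partition key columns not restricted by the given WHERE-clause columns. */
  def missing(whereColumns: Set[String]): Seq[String] =
    partitionKey.filterNot(whereColumns.contains)

  def main(args: Array[String]): Unit = {
    val common = Set("mfr", "prod", "sch", "ec")
    val queries = Seq(
      "fails: common + tbl, sysid" -> (common ++ Set("tbl", "sysid")),
      "works: common + sysid, obs_dt" -> (common ++ Set("sysid", "obs_dt")),
      "works: common + sysid" -> (common + "sysid"),
      "works: common only" -> common,
      "works: all seven" -> partitionKey.toSet
    )
    for ((label, cols) <- queries) {
      val m = missing(cols)
      println(s"$label -> missing: ${if (m.isEmpty) "(none)" else m.mkString(", ")}")
    }
  }
}
```

The failing query restricts every partition key column except obs_dt, so if its six predicates reached containsPartitionKey unchanged, the message would list only obs_dt, not the columns reported above. Note also that the three working partial queries leave columns unrestricted too, which suggests their predicates never reach the check.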
Not sure, but I think the problem is in the data source pushdown logic. Basically, you should not be able to trigger that exception when using Spark SQL; it should only affect RDD API users. I'll be at Spark Summit for the next two days, but I'll ask my teammates to look into it.
Thanks. It does look related to SPARKC-348.
I haven’t tried the patch yet, but I see a secondary index created on the tbl column.
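One hypothesis that fits all the symptoms can be modeled in a few lines. This is a hypothetical rule, NOT SCC's actual pushdown code: it assumes equality predicates on partition key columns are sent to Cassandra only when they cover the whole partition key, or when an equality predicate on a secondary-indexed column (here tbl) is also present; in every other case nothing is pushed and Spark filters the rows itself. The object and method names are invented for illustration.

```scala
// Hypothetical pushdown rule, NOT the connector's actual code.
// Assumption: equality predicates on partition key columns are sent to Cassandra
// when they cover the whole partition key, or when an equality predicate on a
// secondary-indexed column is also present; otherwise Spark filters the rows.
object PushdownHypothesis {
  val partitionKey: Set[String] =
    Set("mfr", "prod", "sch", "ec", "sysid", "tbl", "obs_dt")
  val indexedColumns: Set[String] = Set("tbl") // the index reported on tbl

  /** Equality-predicate columns that would be pushed to Cassandra under this model. */
  def pushedDown(eqColumns: Set[String]): Set[String] = {
    val pkCols = eqColumns intersect partitionKey
    val indexed = eqColumns intersect indexedColumns
    if (pkCols == partitionKey) pkCols
    else if (indexed.nonEmpty) pkCols ++ indexed // a partial key slips through
    else Set.empty // nothing pushed; Spark applies the filters itself
  }

  /** Same outcome as containsPartitionKey: a partial partition key is rejected. */
  def passesPartitionKeyCheck(pushed: Set[String]): Boolean = {
    val pk = pushed intersect partitionKey
    pk.isEmpty || pk == partitionKey
  }

  def main(args: Array[String]): Unit = {
    val base = Set("mfr", "prod", "sch", "ec", "sysid")
    val cases = Seq(
      "without tbl" -> base,
      "with tbl, without obs_dt" -> (base + "tbl"),
      "full partition key" -> partitionKey
    )
    for ((label, cols) <- cases) {
      val pushed = pushedDown(cols)
      println(s"$label: pushed=${pushed.toSeq.sorted.mkString(",")} passes=${passesPartitionKeyCheck(pushed)}")
    }
  }
}
```

Under this model only the query that adds tbl but omits obs_dt pushes a partial key to the check; the other queries push either nothing or the full key. Whether SCC 1.5 actually behaves this way, and whether the SPARKC-348 patch changes it, would need to be confirmed against the connector source.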
Mohammed
Thanks, Russell.
I suspected it must be happening somewhere upstream in the code. I guess that the data source API is eventually calling the partitions method.
Thank you, Jaroslaw!