Error performing joins with spark cassandra connector


Marc Ramírez Invernon
Jan 28, 2021, 5:25:34 AM
to DataStax Spark Connector for Apache Cassandra

Hello community, I was trying to do the following.

Using:
```
Spark info: version 3.0.1-amzn-0
Cassandra connector: 3.0.0
```
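
For reference, the snippets below assume the connector's catalog support in SCC 3.0; a minimal sketch of the session setup (the catalog name `mycatalog` matches the reads below, while the contact point is a placeholder):

```
import org.apache.spark.sql.SparkSession

// Register the Cassandra catalog so spark.read.table("mycatalog.<ks>.<tbl>")
// resolves; the extensions also enable the connector's join optimizations.
val spark = SparkSession.builder()
  .config("spark.sql.extensions",
    "com.datastax.spark.connector.CassandraSparkExtensions")
  .config("spark.sql.catalog.mycatalog",
    "com.datastax.spark.connector.datasource.CassandraCatalog")
  .config("spark.cassandra.connection.host", "127.0.0.1") // placeholder
  .getOrCreate()
```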

Given dataframe A, read from Cassandra, with the following table schema:

```
primary_id, string
second_id, string
record, string (values concatenated by ,)
```

and dataframe B, read from CSV, with schema:

```
primary_id, string
second_id, string
```

Then:

Read data --> perform split --> join --> aggregation

```
import org.apache.spark.sql.functions.{col, count, split}

val csData = spark.read.table("mycatalog.platform.data")

val csvDf = spark.read.csv("...")

val flat_df = csData.withColumn("spplited", split(col("record"), ","))
val query = flat_df
  .select(
    col("primary_id"),   // keep the join keys in the projection
    col("second_id"),
    col("spplited").getItem(2).as("cluster_id"),
    col("spplited").getItem(3).as("count")
  )

val joinDf = query.join(csvDf, Seq("primary_id", "second_id"), "inner")

joinDf.groupBy(joinDf("cluster_id")).agg(count("count")).show
```

This produces the following error:

```
scala.MatchError: split(record#5, ,, -1)[2] AS cluster_id#23 (of class org.apache.spark.sql.catalyst.expressions.Alias)
  at org.apache.spark.sql.cassandra.execution.CassandraDirectJoinStrategy$.$anonfun$aliasMap$1(CassandraDirectJoinStrategy.scala:315)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
```
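
The stack trace points at `CassandraDirectJoinStrategy.aliasMap`, which seems to expect every projected column to be a simple attribute (or a direct alias of one) and fails on the derived expression `split(record, ",")[2] AS cluster_id`. A possible workaround, assuming the connector's per-Dataset direct-join switch (documented for SCC 2.5+) applies here, is to opt the Cassandra read out of the optimization so the join falls back to a regular Spark join:

```
import org.apache.spark.sql.cassandra._

// Sketch: disable the connector's direct-join optimization for this Dataset
// only; AlwaysOff (alongside AlwaysOn and Automatic) comes from the connector.
// Whether this sidesteps the MatchError is an assumption, not a tested fix.
val csDataPlain = spark.read.table("mycatalog.platform.data").directJoin(AlwaysOff)
```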

If I instead move the join to the first step, this is what happens:

Read data --> join --> perform split --> aggregation

```
val csData = spark.read.table("mycatalog.platform.data")
val csvDf = spark.read.csv("...")

val joinDf = csData.join(csvDf, Seq("primary_id", "second_id"), "inner")

val flat_df = joinDf.withColumn("spplited", split(col("record"), ","))
val query = flat_df
  .select(
    col("spplited").getItem(2).as("id"),
    col("spplited").getItem(3).as("count")
  )

query.groupBy(query("cluster_id")).agg(count("count")).show
```

This produces the following error:

```
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: makeCopy, tree:
.
.
.
Caused by: java.lang.reflect.InvocationTargetException: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: none#0
  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$makeCopy$7(TreeNode.scala:495)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$makeCopy$1(TreeNode.scala:494)
  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
  ... 167 more
```
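
Both failures appear to come from the connector's join planning (`CassandraDirectJoinStrategy`) rather than from Spark itself. One way to check is to look for a `Cassandra Direct Join` node in the physical plan; a minimal sketch:

```
// Print the extended plan; with CassandraSparkExtensions enabled, an
// optimized join shows up as "Cassandra Direct Join" instead of a scan + join.
joinDf.explain(true)
```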

Any ideas? Do you know if this could be a bug?

Marc Ramírez Invernon
Jan 28, 2021, 5:32:20 AM
to DataStax Spark Connector for Apache Cassandra, Marc Ramírez Invernon
There is a typo in the second snippet: `col("spplited").getItem(2).as("id")` should be `col("spplited").getItem(2).as("cluster_id")`.