Marc Ramírez Invernon
Jan 28, 2021, 5:25:34 AM
to DataStax Spark Connector for Apache Cassandra
Hello all, I am trying to do the following, using:
```
Spark info: version 3.0.1-amzn-0
Cassandra connector: 3.0.0
```
Given dataframe A, read from Cassandra with this table schema:
```
primary_id  string
second_id   string
record      string (comma-separated values)
```
and dataframe B, read from a CSV file with schema:
```
primary_id, string
second_id, string
```
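(To illustrate what the split is doing: `record` holds comma-separated fields, and `getItem(2)`/`getItem(3)` pick the 3rd and 4th fields, 0-based. The field values below are made up for the example.)

```scala
// Hypothetical `record` value; the real data just needs >= 4 comma-separated fields.
val record = "fieldA,fieldB,cluster42,7"

// Plain-Scala equivalent of split(col("record"), ",") followed by getItem(2)/getItem(3).
val parts = record.split(",")
val clusterId = parts(2) // "cluster42"
val countStr  = parts(3) // "7"

println(s"cluster_id=$clusterId count=$countStr")
```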
Then:
Read data ---> perform split --> join --> aggregation
```
import org.apache.spark.sql.functions.{col, count, split}

val csData = spark.read.table("mycatalog.platform.data")
val csvDf = spark.read.csv("...")
val flat_df = csData.withColumn("spplited", split(col("record"), ","))
val query = flat_df
  .select(
    col("primary_id"),
    col("second_id"),
    col("spplited").getItem(2).as("cluster_id"),
    col("spplited").getItem(3).as("count"))
val joinDf = query.join(csvDf, Seq("primary_id", "second_id"), "inner")
joinDf.groupBy(joinDf("cluster_id")).agg(count("count")).show
```
This fails with the following error:
```
scala.MatchError: split(record#5, ,, -1)[2] AS cluster_id#23 (of class org.apache.spark.sql.catalyst.expressions.Alias)
at org.apache.spark.sql.cassandra.execution.CassandraDirectJoinStrategy$.$anonfun$aliasMap$1(CassandraDirectJoinStrategy.scala:315)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
```
If I instead move the join to the first step, this is what happens:
Read data ---> join --> perform split --> aggregation
```
import org.apache.spark.sql.functions.{col, count, split}

val csData = spark.read.table("mycatalog.platform.data")
val csvDf = spark.read.csv("...")
val joinDf = csData.join(csvDf, Seq("primary_id", "second_id"), "inner")
val flat_df = joinDf.withColumn("spplited", split(col("record"), ","))
val query = flat_df
  .select(
    col("spplited").getItem(2).as("cluster_id"),
    col("spplited").getItem(3).as("count"))
query.groupBy(query("cluster_id")).agg(count("count")).show
```
This time the error is:
```
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: makeCopy, tree:
.
.
.
Caused by: java.lang.reflect.InvocationTargetException: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: none#0
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$makeCopy$7(TreeNode.scala:495)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$makeCopy$1(TreeNode.scala:494)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
... 167 more
```
Any ideas? Could this be a bug in the connector?
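For what it's worth, one workaround I am considering (assuming the `directJoinSetting` option from the connector reference still applies to 3.0.0 — I have not verified this against the MatchError above) is turning off the direct-join optimization so Spark falls back to a regular join:

```scala
// Assumption: the Spark Cassandra Connector's directJoinSetting accepts
// "on" | "off" | "auto"; "off" should make the planner skip
// CassandraDirectJoinStrategy entirely and use a plain Spark join.
spark.conf.set("directJoinSetting", "off")
```

If anyone can confirm whether this sidesteps the `CassandraDirectJoinStrategy` crash, that would help narrow down whether the bug is in the alias handling of the direct-join planner.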