Dataframe aggregation

Anand Nalya

Jun 25, 2015, 6:29:11 AM
to spark-conn...@lists.datastax.com
Hi,

I'm using Spark 1.4.0 with Cassandra connector 1.4.0-M1.

I'm able to read data using DataFrames, but when I try to do an aggregation I get an exception. Following is the relevant code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.sum

val conf = new SparkConf(true)
  .set("spark.cassandra.connection.host", "localhost")
  .setMaster("local[2]")

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

val df = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "abc", "keyspace" -> "dev_analytics", "spark.cassandra.input.split.size" -> "500"))
  .load()

df.show() gives the following output:

+-+-+-+
|a|b|c|
+-+-+-+
|a|a|1|
|c|a|1|
|b|a|2|
+-+-+-+

df.groupBy("a", "b").agg(sum("c")).show() throws the following exception:

=== Result of Batch Resolution ===
!'Aggregate [a#0,b#1], [a#0,b#1,SUM('c) AS SUM(c)#3] Aggregate [a#0,b#1], [a#0,b#1,SUM(CAST(c#2, LongType)) AS SUM(c)#3L]
Relation[a#0,b#1,c#2] org.apache.spark.sql.cassandra.CassandraSourceRelation@166c2c17 Relation[a#0,b#1,c#2] org.apache.spark.sql.cassandra.CassandraSourceRelation@166c2c17

15:54:32.789 [main] INFO o.a.s.s.c.CassandraSourceRelation - filters:
15:54:32.790 [main] INFO o.a.s.s.c.CassandraSourceRelation - pushdown filters: ArrayBuffer()
15:54:32.793 [main] DEBUG org.apache.spark.util.ClosureCleaner - +++ Cleaning closure <function1> (org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1}) +++
15:54:32.797 [main] DEBUG org.apache.spark.util.ClosureCleaner - + declared fields: 2
15:54:32.797 [main] DEBUG org.apache.spark.util.ClosureCleaner - public static final long org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1.serialVersionUID
15:54:32.798 [main] DEBUG org.apache.spark.util.ClosureCleaner - private final scala.collection.Seq org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1.outputTypes$2
15:54:32.798 [main] DEBUG org.apache.spark.util.ClosureCleaner - + declared methods: 2
15:54:32.798 [main] DEBUG org.apache.spark.util.ClosureCleaner - public final java.lang.Object org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1.apply(java.lang.Object)
15:54:32.798 [main] DEBUG org.apache.spark.util.ClosureCleaner - public final scala.collection.Iterator org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1.apply(scala.collection.Iterator)
15:54:32.798 [main] DEBUG org.apache.spark.util.ClosureCleaner - + inner classes: 2
15:54:32.798 [main] DEBUG org.apache.spark.util.ClosureCleaner - org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$2
15:54:32.798 [main] DEBUG org.apache.spark.util.ClosureCleaner - org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$2
15:54:32.798 [main] DEBUG org.apache.spark.util.ClosureCleaner - + outer classes: 0
15:54:32.798 [main] DEBUG org.apache.spark.util.ClosureCleaner - + outer objects: 0
15:54:32.799 [main] DEBUG org.apache.spark.util.ClosureCleaner - + populating accessed fields because this is the starting closure
15:54:32.803 [main] DEBUG org.apache.spark.util.ClosureCleaner - + fields accessed by starting closure: 0
15:54:32.804 [main] DEBUG org.apache.spark.util.ClosureCleaner - + there are no enclosing objects!
15:54:32.804 [main] DEBUG org.apache.spark.util.ClosureCleaner - +++ closure <function1> (org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1) is now cleaned +++
15:54:32.825 [main] DEBUG o.a.s.s.execution.EnsureRequirements - Valid distribution,required: UnspecifiedDistribution current: UnknownPartitioning(0)
15:54:32.827 [main] DEBUG o.a.s.s.execution.EnsureRequirements - Invalid distribution,required: ClusteredDistribution(List(a#0, b#1)) current: UnknownPartitioning(0)
Exception in thread "main" java.lang.NoSuchMethodError: org.json4s.jackson.JsonMethods$.render(Lorg/json4s/JsonAST$JValue;)Lorg/json4s/JsonAST$JValue;
at org.apache.spark.sql.types.Metadata.json(Metadata.scala:81)
at org.apache.spark.sql.types.Metadata.toString(Metadata.scala:83)
at java.lang.String.valueOf(String.java:2982)
at scala.collection.mutable.StringBuilder.append(StringBuilder.scala:197)
at scala.collection.TraversableOnce$$anonfun$addString$1.apply(TraversableOnce.scala:327)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.addString(TraversableOnce.scala:320)
at scala.collection.AbstractIterator.addString(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.mkString(TraversableOnce.scala:286)
at scala.collection.AbstractIterator.mkString(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.mkString(TraversableOnce.scala:288)
at scala.collection.AbstractIterator.mkString(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.trees.TreeNode.argString(TreeNode.scala:392)
at org.apache.spark.sql.catalyst.trees.TreeNode.simpleString(TreeNode.scala:395)
at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:429)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$generateTreeString$1.apply(TreeNode.scala:431)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$generateTreeString$1.apply(TreeNode.scala:431)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:431)
at org.apache.spark.sql.catalyst.trees.TreeNode.treeString(TreeNode.scala:400)
at org.apache.spark.sql.catalyst.trees.TreeNode.toString(TreeNode.scala:397)
at java.lang.String.valueOf(String.java:2982)
at java.lang.StringBuilder.append(StringBuilder.java:131)
at scala.StringContext.standardInterpolator(StringContext.scala:122)
at scala.StringContext.s(StringContext.scala:90)
at org.apache.spark.sql.execution.EnsureRequirements$$anonfun$apply$1$$anonfun$meetsRequirements$1$1$$anonfun$apply$12.apply(Exchange.scala:275)
at org.apache.spark.sql.execution.EnsureRequirements$$anonfun$apply$1$$anonfun$meetsRequirements$1$1$$anonfun$apply$12.apply(Exchange.scala:274)
at org.apache.spark.Logging$class.logDebug(Logging.scala:63)
at org.apache.spark.sql.catalyst.rules.Rule.logDebug(Rule.scala:23)
at org.apache.spark.sql.execution.EnsureRequirements$$anonfun$apply$1$$anonfun$meetsRequirements$1$1.apply(Exchange.scala:273)
at org.apache.spark.sql.execution.EnsureRequirements$$anonfun$apply$1$$anonfun$meetsRequirements$1$1.apply(Exchange.scala:270)
at scala.collection.LinearSeqOptimized$class.forall(LinearSeqOptimized.scala:70)
at scala.collection.immutable.List.forall(List.scala:84)
at org.apache.spark.sql.execution.EnsureRequirements$$anonfun$apply$1.meetsRequirements$1(Exchange.scala:270)
at org.apache.spark.sql.execution.EnsureRequirements$$anonfun$apply$1.applyOrElse(Exchange.scala:334)
at org.apache.spark.sql.execution.EnsureRequirements$$anonfun$apply$1.applyOrElse(Exchange.scala:265)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$2.apply(TreeNode.scala:290)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$2.apply(TreeNode.scala:290)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:289)
at org.apache.spark.sql.execution.EnsureRequirements.apply(Exchange.scala:265)
at org.apache.spark.sql.execution.EnsureRequirements.apply(Exchange.scala:261)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:61)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:59)
at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:34)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:59)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:51)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:51)
at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:936)
at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:936)
at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1255)
at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1189)
at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1248)
at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:176)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:331)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:338)
at Test$delayedInit$body.apply(Test.scala:21)
at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
at scala.App$class.main(App.scala:71)
at Test$.main(Test.scala:8)
at Test.main(Test.scala)

Am I missing something?

Regards,
Anand

yana

Jun 25, 2015, 8:51:49 AM
to spark-conn...@lists.datastax.com
What is your classpath? This looks like the wrong version of jackson being picked up.
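
One quick way to confirm which jar is actually supplying json4s at runtime is to ask the classloader where the class came from. A minimal diagnostic sketch (not from the original thread) that could be dropped into the driver program before the failing call:

// Prints the jar (or directory) the json4s JsonMethods object was loaded from,
// which shows whether a conflicting json4s version is shadowing Spark's.
val jsonMethods = Class.forName("org.json4s.jackson.JsonMethods$")
println(jsonMethods.getProtectionDomain.getCodeSource.getLocation)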

Anand Nalya

Jun 25, 2015, 9:29:05 AM
to spark-conn...@lists.datastax.com

Yana Kadiyska

Jun 25, 2015, 10:59:12 AM
to spark-conn...@lists.datastax.com
Spark has a dependency on a different version of json4s:
<dependency>
  <groupId>org.json4s</groupId>
  <artifactId>json4s-jackson_${scala.binary.version}</artifactId>
  <version>3.2.10</version>
</dependency>

You might want to try the advice given here: it's for Spark 1.3, but 1.4 should not be different...
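
If the project is built with sbt rather than Maven, a minimal sketch of pinning json4s to the version Spark 1.4 bundles (assuming a standard build.sbt; adjust the Scala binary version to your build) would be:

// Force json4s-jackson to 3.2.10 so it matches what Spark 1.4 was compiled against,
// overriding any newer version pulled in by another dependency.
dependencyOverrides += "org.json4s" %% "json4s-jackson" % "3.2.10"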

On Thu, Jun 25, 2015 at 9:29 AM, Anand Nalya <anand...@gmail.com> wrote: