Hugo Ferreira
Jan 26, 2015, 9:05:09 AM
to spark-conn...@lists.datastax.com
Hello,
I am trying to select rows of data in which a column value may or may not exist. I figured I could use NaN or Infinity as a marker and filter on those using Spark SQL, seeing as 1.2.0 seems to support this. So I do:
val cc = new CassandraSQLContext(TestSparkContext)
cc.setKeyspace("test")
// Does not work, no nulls exist
val rdd: SchemaRDD = cc.cassandraSql("SELECT * from key_value WHERE f2 < Infinity")
and get a class not found error (org.apache.spark.sql.catalyst.analysis.Star, see stack trace below). This doesn't seem to come from the Cassandra connector. However, I would just like to check whether this is possible. I am using a build from the git master of the Cassandra connector with Spark 1.2.0.
I set up Cassandra as follows:
CassandraConnector(TestSparkContext.config).withSessionDo { session =>
session.execute("CREATE KEYSPACE IF NOT EXISTS test WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }")
session.execute("CREATE TABLE IF NOT EXISTS test.key_value (key INT, value VARCHAR, f1 DOUBLE, f2 DOUBLE, f3 DOUBLE, f4 VARCHAR, PRIMARY KEY(key, f1, f2, f3, f4))")
session.execute("TRUNCATE test.key_value")
session.execute("INSERT INTO test.key_value(key, value, f1, f2, f3, f4) VALUES (1, 'first row' , 11.0, 12.0, 13.0, 'f4_1' )")
session.execute("INSERT INTO test.key_value(key, value, f1, f2, f3, f4) VALUES (2, 'second row' , 21.0, 22.0, 23.0, 'f4_2' )")
session.execute("INSERT INTO test.key_value(key, value, f1, f2, f3, f4) VALUES (3, 'third row' , 31.0, NaN, 33.0, 'f4_3' )")
session.execute("INSERT INTO test.key_value(key, value, f1, f2, f3, f4) VALUES (5, 'fith row' , 41.0, Infinity, 43.0, 'f4_4' )")
session.execute("INSERT INTO test.key_value(key, value, f1, f2, f3, f4) VALUES (7, 'seventh row', 51.0, 52.0, 53.0, '' )")
}
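To make the intent of the filter concrete, here is a minimal sketch in plain Scala over the same f2 values, with no Spark or Cassandra involved. The Row case class and the InfinityFilterSketch object are hypothetical names used only for illustration. It relies on standard IEEE 754 comparison rules for Double; whether Spark SQL 1.2.0 accepts Infinity as a literal and applies the same rules is the part I am unsure about.

```scala
// Plain-Scala sketch; nothing here touches Spark SQL or the Cassandra connector.
// Row is a hypothetical stand-in for the (key, f2) columns of test.key_value.
final case class Row(key: Int, f2: Double)

object InfinityFilterSketch {
  // Same keys and f2 values as the INSERT statements above.
  val rows: Seq[Row] = Seq(
    Row(1, 12.0),
    Row(2, 22.0),
    Row(3, Double.NaN),
    Row(5, Double.PositiveInfinity),
    Row(7, 52.0)
  )

  // Intended meaning of: WHERE f2 < Infinity
  // Infinity is not less than itself, and NaN compares false against
  // every value, so both marker rows are dropped.
  def finiteKeys(input: Seq[Row]): Seq[Int] =
    input.filter(_.f2 < Double.PositiveInfinity).map(_.key)

  def main(args: Array[String]): Unit =
    println(finiteKeys(rows)) // prints List(1, 2, 7)
}
```

So the result I am hoping for from the query is the rows with keys 1, 2 and 7.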
Can anyone comment on this?
TIA,
HF
[info] - should be able to select a subset of applicable features *** FAILED ***
[info] org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: *, tree:
[info] 'Project [*]
[info] Subquery key_value
[info] CassandraRelation TableDef(test,key_value,ArrayBuffer(ColumnDef(test,key_value,key,PartitionKeyColumn,IntType,false)),ArrayBuffer(ColumnDef(test,key_value,f1,ClusteringColumn(0),DoubleType,false), ColumnDef(test,key_value,f2,ClusteringColumn(1),DoubleType,false), ColumnDef(test,key_value,f3,ClusteringColumn(2),DoubleType,false), ColumnDef(test,key_value,f4,ClusteringColumn(3),VarCharType,false)),ArrayBuffer(ColumnDef(test,key_value,value,RegularColumn,VarCharType,false))), None
[info] at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$1.applyOrElse(Analyzer.scala:80)
[info] at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$1.applyOrElse(Analyzer.scala:78)
[info] at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
[info] at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135)
[info] at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:78)
[info] at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:76)
[info] at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61)
[info] at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59)
[info] at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
[info] at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
[info] ...
Exception in thread "Thread-5" java.lang.ClassNotFoundException: org.apache.spark.sql.catalyst.analysis.Star
at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:344)
at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:626)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.scalatest.tools.Framework$ScalaTestRunner$Skeleton$1$React.react(Framework.scala:945)
at org.scalatest.tools.Framework$ScalaTestRunner$Skeleton$1.run(Framework.scala:934)
at java.lang.Thread.run(Thread.java:745)