Dealing with NaNs and Infinity

505 views
Skip to first unread message

Hugo Ferreira

unread,
Jan 26, 2015, 9:05:09 AM1/26/15
to spark-conn...@lists.datastax.com
Hello,

I am trying to select rows of data wherein the column value may or may knot exist. I figure I could use a NaN or Infinity and filter on those using
Spark SQL seeing as 1.2.0 seems to support this. So I do:

val cc = new CassandraSQLContext(TestSparkContext)
cc.setKeyspace("test")
// Does not work, no nulls exist
val rdd: SchemaRDD = cc.cassandraSql("SELECT * from key_value WHERE f2 < Infinity")

and get a class not found error (org.apache.spark.sql.catalyst.analysis.Star, see stack trace below). This doesn't seem to come from the Cassandra connector. However I would would just like to check is this is possible. I am using a version of the git master of the Cassandra connector + spark 1.2.0.

I set up Cassandra as follows:

CassandraConnector(TestSparkContext.config).withSessionDo { session =>
session.execute("CREATE KEYSPACE IF NOT EXISTS test WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }")
session.execute("CREATE TABLE IF NOT EXISTS test.key_value (key INT, value VARCHAR, f1 DOUBLE, f2 DOUBLE, f3 DOUBLE, f4 VARCHAR, PRIMARY KEY(key, f1, f2, f3, f4))")
session.execute("TRUNCATE test.key_value")
session.execute("INSERT INTO test.key_value(key, value, f1, f2, f3, f4) VALUES (1, 'first row' , 11.0, 12.0, 13.0, 'f4_1' )")
session.execute("INSERT INTO test.key_value(key, value, f1, f2, f3, f4) VALUES (2, 'second row' , 21.0, 22.0, 23.0, 'f4_2' )")
session.execute("INSERT INTO test.key_value(key, value, f1, f2, f3, f4) VALUES (3, 'third row' , 31.0, NaN, 33.0, 'f4_3' )")
session.execute("INSERT INTO test.key_value(key, value, f1, f2, f3, f4) VALUES (5, 'fith row' , 41.0, Infinity, 43.0, 'f4_4' )")
session.execute("INSERT INTO test.key_value(key, value, f1, f2, f3, f4) VALUES (7, 'seventh row', 51.0, 52.0, 53.0, '' )")
}

Can anyone comment on this?
TIA,
HF


[info] - should be able to select a subset of applicable features *** FAILED ***
[info] org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: *, tree:
[info] 'Project [*]
[info] Subquery key_value
[info] CassandraRelation TableDef(test,key_value,ArrayBuffer(ColumnDef(test,key_value,key,PartitionKeyColumn,IntType,false)),ArrayBuffer(ColumnDef(test,key_value,f1,ClusteringColumn(0),DoubleType,false), ColumnDef(test,key_value,f2,ClusteringColumn(1),DoubleType,false), ColumnDef(test,key_value,f3,ClusteringColumn(2),DoubleType,false), ColumnDef(test,key_value,f4,ClusteringColumn(3),VarCharType,false)),ArrayBuffer(ColumnDef(test,key_value,value,RegularColumn,VarCharType,false))), None
[info] at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$1.applyOrElse(Analyzer.scala:80)
[info] at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$1.applyOrElse(Analyzer.scala:78)
[info] at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
[info] at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135)
[info] at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:78)
[info] at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:76)
[info] at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61)
[info] at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59)
[info] at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
[info] at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
[info] ...
Exception in thread "Thread-5" java.lang.ClassNotFoundException: org.apache.spark.sql.catalyst.analysis.Star
at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:344)
at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:626)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.scalatest.tools.Framework$ScalaTestRunner$Skeleton$1$React.react(Framework.scala:945)
at org.scalatest.tools.Framework$ScalaTestRunner$Skeleton$1.run(Framework.scala:934)
at java.lang.Thread.run(Thread.java:745)

Helena Edelson

unread,
Jan 26, 2015, 9:19:15 AM1/26/15
to spark-conn...@lists.datastax.com
Hi Hugo,

I have seen this before when trying to add data that did not exist and the system added Nans and Infinities. However my error was that the right data was not getting in, so fixing getting the data correct fixed my issue. I don't know that Nan and Infinity values are supported. Will find out, and what you can do moving forward. Stay tuned.

Helena
@helenaedelson

Helena Edelson

unread,
Jan 26, 2015, 9:37:41 AM1/26/15
to spark-conn...@lists.datastax.com
Can you add the spark catalyst jar to your dependencies and retry?

- Helena

On Monday, January 26, 2015 at 9:05:09 AM UTC-5, Hugo Ferreira wrote:

Hugo Ferreira

unread,
Jan 26, 2015, 9:54:16 AM1/26/15
to spark-conn...@lists.datastax.com
Hi Helena,

Appreciate the feedback.

I already have this included.
I am working in the Scala IDE (Eclipse) and checked that
this class was among the dependencies. I even did a:

import org.apache.spark.sql.catalyst.analysis.Star

just for sanity checking.
Just in case this helps, I have added by build.sbt below.

TIA,
HF

name := """gnosis"""

version := "1.0-SNAPSHOT"

scalaVersion := "2.10.4"

EclipseKeys.withSource := true

lazy val root = project.in(file(".")).enablePlugins(PlayScala)

libraryDependencies ++= Seq(
// "com.microsoft.sqlserver" % "sqljdbc4" % "4.0",
"org.eclipse.jetty.orbit" % "javax.servlet" % "3.0.0.v201112011016",
"com.novocode" % "junit-interface" % "0.11" % "test",
"org.scalatest" %% "scalatest" % "2.2.1" % "test",
"org.scalacheck" %% "scalacheck" % "1.12.1" % "test",
"org.scalatestplus" %% "play" % "1.2.0" % "test",
"com.github.fommil.netlib" % "all" % "1.1.2",
"org.apache.hadoop" % "hadoop-client" % "2.4.0",
"org.apache.spark" % "spark-core_2.10" % "1.2.0",
"org.apache.spark" % "spark-streaming_2.10" % "1.2.0",
"org.apache.spark" % "spark-catalyst_2.10" % "1.2.0",
"org.apache.spark" % "spark-sql_2.10" % "1.2.0",
"org.apache.spark" % "spark-hive_2.10" % "1.2.0",
"org.apache.spark" % "spark-mllib_2.10" % "1.2.0",
"com.datastax.spark" %% "spark-cassandra-connector" % "1.1.2-SNAPSHOT",
"com.datastax.spark" %% "spark-cassandra-connector-embedded" % "1.1.2-SNAPSHOT",
cache
)

Helena Edelson

unread,
Jan 26, 2015, 9:57:50 AM1/26/15
to spark-conn...@lists.datastax.com
Hi Greg,

I see one issue to start with: version inconsistencies:
In your build you are using spark version 1.2.0 (latest) yet you are using that with a 1.1.2 connector snapshot.

"org.apache.spark" % "spark-core_2.10" % "1.2.0",
"org.apache.spark" % "spark-streaming_2.10" % "1.2.0",
"org.apache.spark" % "spark-catalyst_2.10" % "1.2.0",
"org.apache.spark" % "spark-sql_2.10" % "1.2.0",
"org.apache.spark" % "spark-hive_2.10" % "1.2.0",
"org.apache.spark" % "spark-mllib_2.10" % "1.2.0",
"com.datastax.spark" %% "spark-cassandra-connector" % "1.1.2-SNAPSHOT",
"com.datastax.spark" %% "spark-cassandra-connector-embedded" % "1.1.2-SNAPSHOT"

So first, use spark 1.2.0 with the connector 1.2.0-alpha1 so they are both using the same version of spark. And secondly, never use snapshots in dependencies, that is always bad news ;)

Let me know if that helps.

Helena
@helenaedelson

Hugo Ferreira

unread,
Jan 26, 2015, 10:55:58 AM1/26/15
to spark-conn...@lists.datastax.com
Hi Helena,

Unfortunately that did not work. I get the same error.
Just to be sure I cleaned my Ivy repository, updated the
build.sbt file, done a clean update, compiled, eclipse
and ran the test again (via SBT).

Please note that when I try with a standard value, this works.
Any suggestions?

Regards,
HF

Hugo Ferreira

unread,
Jan 28, 2015, 3:59:09 AM1/28/15
to spark-conn...@lists.datastax.com

I have alternate solutions. So this is not important.
Reply all
Reply to author
Forward
0 new messages