When I attempt to access this table with the code below, it throws an exception (see below). It is related to the column "relations" on the "objects" table, which is a set<frozen<relation>>. If I remove this column, it works fine.
Question: will this be supported in a future release and be accessible via SparkSQL?
Thanks for the input.
-Todd
DROP KEYSPACE IF EXISTS udt_test;
CREATE KEYSPACE udt_test
  WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };
USE udt_test;
CREATE TYPE IF NOT EXISTS category_metadata (
  category_id text,
  metric_descriptors set<text>
);
CREATE TYPE IF NOT EXISTS object_metadata (
  name text,
  category_metadata frozen<category_metadata>,
  bucket_size int
);
CREATE TYPE IF NOT EXISTS relation (
  type text,
  object_type text,
  related_to text,
  obj_id text
);
CREATE TABLE IF NOT EXISTS objects (
  obj_id text,
  metadata frozen<object_metadata>,
  relations set<frozen<relation>>,
  ts timestamp,
  PRIMARY KEY(obj_id)
);
INSERT INTO objects (obj_id, ts, metadata) VALUES ('123', '2015-06-16 15:53:23-0400',
  { name: 'foo', category_metadata: {category_id: 'thermostat', metric_descriptors: {}}, bucket_size: 0}
);
I am using spark-cassandra-connector version 1.4.0-M1 or 1.3.0-M1 with the corresponding Spark version (1.4.0, 1.3.1). When I attempt to access this table with the DataFrame API:
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
val df = sqlContext.load("org.apache.spark.sql.cassandra", options = Map( "table" -> "objects", "keyspace" -> "udt_test"))
I get the following exception:
15/06/17 13:49:30 INFO Cluster: New Cassandra host /127.0.0.1:9042 added
15/06/17 13:49:30 INFO CassandraConnector: Connected to Cassandra cluster: Test Cluster
java.util.NoSuchElementException: key not found: UserDefinedType(relation,Vector(UDTFieldDef(type,VarCharType), UDTFieldDef(object_type,VarCharType), UDTFieldDef(related_to,VarCharType), UDTFieldDef(obj_id,VarCharType)))
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:58)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:58)
at org.apache.spark.sql.cassandra.DataTypeConverter$.catalystDataType(DataTypeConverter.scala:44)
at org.apache.spark.sql.cassandra.DataTypeConverter$.toStructField(DataTypeConverter.scala:57)
at org.apache.spark.sql.cassandra.CassandraSourceRelation$$anonfun$schema$1$$anonfun$apply$1.apply(CassandraSourceRelation.scala:57)
at org.apache.spark.sql.cassandra.CassandraSourceRelation$$anonfun$schema$1$$anonfun$apply$1.apply(CassandraSourceRelation.scala:57)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.cassandra.CassandraSourceRelation$$anonfun$schema$1.apply(CassandraSourceRelation.scala:57)
at org.apache.spark.sql.cassandra.CassandraSourceRelation$$anonfun$schema$1.apply(CassandraSourceRelation.scala:57)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.cassandra.CassandraSourceRelation.schema(CassandraSourceRelation.scala:57)
at org.apache.spark.sql.sources.LogicalRelation.<init>(LogicalRelation.scala:30)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:120)
at org.apache.spark.sql.SQLContext.load(SQLContext.scala:1242)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:26)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:31)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:33)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:37)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:43)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:45)
at $iwC$$iwC$$iwC.<init>(<console>:47)
at $iwC$$iwC.<init>(<console>:49)
at $iwC.<init>(<console>:51)
at <init>(<console>:53)
at .<init>(<console>:57)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
java.util.NoSuchElementException: key not found: UserDefinedType(mytuple,Vector(UDTFieldDef(key,VarCharType), UDTFieldDef(value,VarCharType)))
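For readers following the trace: the top frames show the exception coming out of a plain Map.apply call made from DataTypeConverter.catalystDataType, with the UDT itself as the missing key. The sketch below reproduces that general failure mode in isolation. It is not the connector's code: the type names, the `primitives` table, and both conversion functions are hypothetical stand-ins, and the result strings are only illustrative labels.

```scala
import scala.util.Try

// Hypothetical stand-ins for column types; NOT the connector's real classes.
sealed trait ColumnType
case object VarCharType extends ColumnType
case object IntType extends ColumnType
final case class SetType(elem: ColumnType) extends ColumnType
final case class UserDefinedType(name: String, fields: Vector[(String, ColumnType)]) extends ColumnType

object UdtLookupSketch {
  // A fixed lookup table that only knows primitive types (assumed for illustration).
  val primitives: Map[ColumnType, String] = Map(
    VarCharType -> "StringType",
    IntType -> "IntegerType"
  )

  // Lookup-only conversion: Map.apply throws NoSuchElementException
  // ("key not found: ...") for any element type missing from the table.
  def lookupOnly(t: ColumnType): String = t match {
    case SetType(elem) => s"ArrayType(${primitives(elem)})"
    case other         => primitives(other)
  }

  // Recursive conversion: a UDT is matched explicitly and turned into a
  // struct-like description, so a UDT nested in a collection is handled.
  def recursive(t: ColumnType): String = t match {
    case SetType(elem) => s"ArrayType(${recursive(elem)})"
    case UserDefinedType(_, fields) =>
      fields.map { case (n, ft) => s"$n: ${recursive(ft)}" }
        .mkString("StructType(", ", ", ")")
    case other => primitives(other)
  }

  // Mirrors the "relation" type from the schema in this thread.
  val relation: ColumnType = UserDefinedType("relation", Vector(
    "type" -> VarCharType,
    "object_type" -> VarCharType,
    "related_to" -> VarCharType,
    "obj_id" -> VarCharType
  ))

  def main(args: Array[String]): Unit = {
    // Fails: the UDT is used as a key that the table does not contain.
    println(Try(lookupOnly(SetType(relation))))
    // Succeeds: the UDT is converted field by field.
    println(recursive(SetType(relation)))
  }
}
```

In this sketch the lookup-only version fails only when the UDT appears as a collection element, which matches the symptom reported above (the frozen UDT column works, the set of frozen UDTs does not). Whether the connector's converter has the same shape is an assumption, not something the trace confirms.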
It looks like a bug. Which version of the Spark Cassandra connector are you testing with? Please open a ticket for this bug.
Alex
Thanks