Java DataStax Spark read with set<UDT> and map<UDT, UDT> example


Joe Payne

Aug 1, 2016, 12:14:12 PM
to DataStax Spark Connector for Apache Cassandra
I'm very new to Spark and trying to learn.

I'm trying to read a table that has a set and a map of user-defined types. I had to use a CassandraTableScanJavaRDD to intercept the call and use map() to do the row mapping myself. I haven't been able to figure out how to convert the map or set. I see you can pass in a TypeConverter, but I am not sure how to implement it, or if that is the way it should be done.

row.getMap("column_name", TypeConverter, TypeConverter);

If I should use a type converter, does anyone have an example for a UDT in Java? Or can you tell me the best way to convert the entire table to my POJO?

I'm using DataStax Spark Connector 1.5.1.
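
Roughly what my attempt looks like, with the table and class names simplified to a hypothetical ks.documents / Document:

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;

import com.datastax.spark.connector.japi.CassandraRow;
import org.apache.spark.api.java.JavaRDD;

// sc is an existing JavaSparkContext. With no row mapper,
// cassandraTable() returns a CassandraTableScanJavaRDD<CassandraRow>.
JavaRDD<Document> docs = javaFunctions(sc)
        .cassandraTable("ks", "documents")
        .map(row -> {
            Document doc = new Document();
            doc.setTitle(row.getString("title"));
            // the set<UDT> and map<UDT, UDT> columns still need
            // converting here -- this is the part I'm stuck on
            return doc;
        });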

Thank you,
Joe

Russell Spitzer

Aug 3, 2016, 12:45:32 PM
to DataStax Spark Connector for Apache Cassandra
The easiest thing is to just use Scala (jk).

Make a bean for your row; it should have appropriate fields and sub-classes which match all the fields you have in your UDT. Then use the mapRowTo helper.

Like in this example:

JavaRDD<Person> rdd = javaFunctions(sc).cassandraTable("ks", "people", mapRowTo(Person.class));
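
Something like this on the bean side (a sketch; the names and the address UDT here are made up, just to show the nesting):

import java.io.Serializable;
import java.util.Set;

// Property names must line up with the column / UDT field names
// (underscored C* names map to camelCase here).
public class Person implements Serializable {
    private Integer id;
    private String name;
    private Set<Address> addresses;   // e.g. a set<frozen<address>> column

    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public Set<Address> getAddresses() { return addresses; }
    public void setAddresses(Set<Address> addresses) { this.addresses = addresses; }
}

// in its own file: a bean whose properties mirror the UDT's fields
class Address implements Serializable {
    private String street;
    private String city;
    public String getStreet() { return street; }
    public void setStreet(String street) { this.street = street; }
    public String getCity() { return city; }
    public void setCity(String city) { this.city = city; }
}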


Joe Payne

Aug 3, 2016, 3:30:55 PM
to DataStax Spark Connector for Apache Cassandra
I was told I needed to change my Spark connector to match DSE 4.8, so I switched to 1.2.1. I also went through and renamed my bean to match the naming of the table fields exactly, except in camel case. Now when I do rdd.first() I get an exception:
java.lang.AbstractMethodError: com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.close()V
...

I'm not sure what would cause that. My SparkConf does .setMaster("local"), and I set spark.cassandra.connection.host to the Cassandra node.
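
i.e. roughly this (the host is a placeholder):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf conf = new SparkConf()
        .setAppName("udt-read")                               // app name is arbitrary
        .setMaster("local")
        .set("spark.cassandra.connection.host", "10.0.0.1");  // placeholder host
JavaSparkContext sc = new JavaSparkContext(conf);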


Russell Spitzer

Aug 3, 2016, 5:40:17 PM
to DataStax Spark Connector for Apache Cassandra
The AbstractMethodError comes from a jar mismatch: the method is missing because of incompatible versions.

DSE 4.8 uses Spark 1.4 (a Scala 2.10 build), so you should be using Spark Cassandra Connector 1.4.x.
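
e.g. in your pom (1.4.1 below is just an example of the 1.4.x line):

<dependency>
  <groupId>com.datastax.spark</groupId>
  <artifactId>spark-cassandra-connector-java_2.10</artifactId>
  <version>1.4.1</version>
</dependency>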

Joe Payne

Aug 4, 2016, 9:07:00 AM
to DataStax Spark Connector for Apache Cassandra
I actually meant to say we are on 4.7. I'm using Spark connector 1.2.1.
Does it matter which other dependencies I'm forced to add, such as the Jetty ones?

Joe Payne

Aug 4, 2016, 9:33:04 AM
to DataStax Spark Connector for Apache Cassandra
Here are my current pom dependencies:

<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
    <optional>true</optional>
  </dependency>
  <dependency>
    <groupId>com.beust</groupId>
    <artifactId>jcommander</artifactId>
    <version>LATEST</version>
  </dependency>
  <dependency>
    <groupId>com.datastax.cassandra</groupId>
    <artifactId>cassandra-driver-core</artifactId>
    <version>3.0.0</version>
  </dependency>
  <dependency>
    <groupId>com.datastax.cassandra</groupId>
    <artifactId>cassandra-driver-mapping</artifactId>
    <version>3.0.0</version>
  </dependency>
  <dependency>
    <groupId>com.datastax.cassandra</groupId>
    <artifactId>cassandra-driver-extras</artifactId>
    <version>3.0.0</version>
  </dependency>
  <dependency>
    <groupId>net.jpountz.lz4</groupId>
    <artifactId>lz4</artifactId>
    <version>1.2.0</version>
  </dependency>
  <!-- Spark Java API -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.2.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.2.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.2.1</version>
  </dependency>
  <dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector-java_2.10</artifactId>
    <version>1.2.1</version>
  </dependency>
  <dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector_2.10</artifactId>
    <version>1.2.1</version>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.6.6</version>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.module</groupId>
    <artifactId>jackson-module-scala_2.10</artifactId>
    <version>2.6.6</version>
  </dependency>
  <dependency>
    <groupId>org.eclipse.jetty</groupId>
    <artifactId>jetty-server</artifactId>
    <version>7.6.0.RC5</version>
  </dependency>
  <dependency>
    <groupId>org.eclipse.jetty</groupId>
    <artifactId>jetty-servlet</artifactId>
    <version>7.6.0.RC5</version>
  </dependency>
  <dependency>
    <groupId>org.eclipse.jetty</groupId>
    <artifactId>jetty-util</artifactId>
    <version>7.6.0.RC5</version>
  </dependency>
  <dependency>
    <groupId>org.eclipse.jetty</groupId>
    <artifactId>jetty-http</artifactId>
    <version>7.6.0.RC5</version>
  </dependency>
  <dependency>
    <groupId>org.eclipse.jetty</groupId>
    <artifactId>jetty-io</artifactId>
    <version>7.6.0.RC5</version>
  </dependency>
</dependencies>

Joe Payne

Aug 4, 2016, 1:25:42 PM
to DataStax Spark Connector for Apache Cassandra
I changed my drivers to match the version compatibility table for 1.2:
https://github.com/datastax/spark-cassandra-connector
Now I get this exception when using rdd.first(). I'm using mapRowTo(Document.class), and it's complaining when it gets to the UDT inside the model class I'm using in the mapRowTo.

Caused by: java.lang.IllegalArgumentException: Unsupported type: com.test.cassandra.model.Format
at com.datastax.spark.connector.types.TypeConverter$.forCollectionType(TypeConverter.scala:728) ~[spark-cassandra-connector_2.10-1.2.1.jar:1.2.1]
at com.datastax.spark.connector.types.TypeConverter$.forType(TypeConverter.scala:740) ~[spark-cassandra-connector_2.10-1.2.1.jar:1.2.1]
at com.datastax.spark.connector.types.TypeConverter$.forCollectionType(TypeConverter.scala:713) ~[spark-cassandra-connector_2.10-1.2.1.jar:1.2.1]
at com.datastax.spark.connector.types.TypeConverter$.forType(TypeConverter.scala:740) ~[spark-cassandra-connector_2.10-1.2.1.jar:1.2.1]
at com.datastax.spark.connector.rdd.reader.ClassBasedRowReader$$anonfun$3.apply(ClassBasedRowReader.scala:45) ~[spark-cassandra-connector_2.10-1.2.1.jar:1.2.1]
at com.datastax.spark.connector.rdd.reader.ClassBasedRowReader$$anonfun$3.apply(ClassBasedRowReader.scala:45) ~[spark-cassandra-connector_2.10-1.2.1.jar:1.2.1]
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) ~[scala-library-2.10.4.jar:na]
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) ~[scala-library-2.10.4.jar:na]
at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224) ~[scala-library-2.10.4.jar:na]
at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403) ~[scala-library-2.10.4.jar:na]
at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403) ~[scala-library-2.10.4.jar:na]
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) ~[scala-library-2.10.4.jar:na]
at scala.collection.AbstractTraversable.map(Traversable.scala:105) ~[scala-library-2.10.4.jar:na]
at com.datastax.spark.connector.rdd.reader.ClassBasedRowReader.<init>(ClassBasedRowReader.scala:45) ~[spark-cassandra-connector_2.10-1.2.1.jar:1.2.1]
at com.datastax.spark.connector.rdd.reader.ClassBasedRowReaderFactory.rowReader(ClassBasedRowReader.scala:147) ~[spark-cassandra-connector_2.10-1.2.1.jar:1.2.1]
at com.datastax.spark.connector.rdd.reader.ClassBasedRowReaderFactory.rowReader(ClassBasedRowReader.scala:145) ~[spark-cassandra-connector_2.10-1.2.1.jar:1.2.1]
at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.rowReader(CassandraTableRowReaderProvider.scala:46) ~[spark-cassandra-connector_2.10-1.2.1.jar:1.2.1]
at com.datastax.spark.connector.rdd.CassandraTableScanRDD.rowReader$lzycompute(CassandraTableScanRDD.scala:58) ~[spark-cassandra-connector_2.10-1.2.1.jar:1.2.1]
at com.datastax.spark.connector.rdd.CassandraTableScanRDD.rowReader(CassandraTableScanRDD.scala:58) ~[spark-cassandra-connector_2.10-1.2.1.jar:1.2.1]
at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.verify(CassandraTableRowReaderProvider.scala:163) ~[spark-cassandra-connector_2.10-1.2.1.jar:1.2.1]
at com.datastax.spark.connector.rdd.CassandraTableScanRDD.verify(CassandraTableScanRDD.scala:58) ~[spark-cassandra-connector_2.10-1.2.1.jar:1.2.1]
at com.datastax.spark.connector.rdd.CassandraTableScanRDD.getPartitions(CassandraTableScanRDD.scala:117) ~[spark-cassandra-connector_2.10-1.2.1.jar:1.2.1]
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222) ~[spark-core_2.10-1.2.1.jar:1.2.1]
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220) ~[spark-core_2.10-1.2.1.jar:1.2.1]
at scala.Option.getOrElse(Option.scala:120) ~[scala-library-2.10.4.jar:na]
at org.apache.spark.rdd.RDD.partitions(RDD.scala:220) ~[spark-core_2.10-1.2.1.jar:1.2.1]
at org.apache.spark.rdd.RDD.take(RDD.scala:1077) ~[spark-core_2.10-1.2.1.jar:1.2.1]
at com.datastax.spark.connector.rdd.CassandraRDD.take(CassandraRDD.scala:118) ~[spark-cassandra-connector_2.10-1.2.1.jar:1.2.1]
at com.datastax.spark.connector.rdd.CassandraRDD.take(CassandraRDD.scala:119) ~[spark-cassandra-connector_2.10-1.2.1.jar:1.2.1]
at org.apache.spark.rdd.RDD.first(RDD.scala:1110) ~[spark-core_2.10-1.2.1.jar:1.2.1]
at org.apache.spark.api.java.JavaRDDLike$class.first(JavaRDDLike.scala:437) ~[spark-core_2.10-1.2.1.jar:1.2.1]
at org.apache.spark.api.java.JavaRDD.first(JavaRDD.scala:32) ~[spark-core_2.10-1.2.1.jar:1.2.1]

Russell Spitzer

Aug 4, 2016, 2:45:43 PM
to DataStax Spark Connector for Apache Cassandra
That is not supported in SCC 1.2; see https://datastax-oss.atlassian.net/browse/SPARKC-4. You will have to use CassandraRow objects if you are on SCC 1.2.
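
i.e. pull the UDT fields out of the row by hand, something like this sketch (the column and field names are made up; double-check the exact japi getters for your version):

import com.datastax.spark.connector.japi.CassandraRow;
import com.datastax.spark.connector.japi.UDTValue;

// Each element of a set<frozen<format>> column comes back as a
// UDTValue, whose fields you read by name.
Document mapRow(CassandraRow row) {
    Document doc = new Document();
    doc.setTitle(row.getString("title"));
    for (Object element : row.getSet("formats")) {
        UDTValue udt = (UDTValue) element;
        doc.addFormat(udt.getString("name"));  // hypothetical helper on Document
    }
    return doc;
}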



Joe Payne

Aug 4, 2016, 2:51:53 PM
to DataStax Spark Connector for Apache Cassandra
Are UDTs not supported at all in 1.2? Or do I need to write my own RowReader with a TypeConverter? I haven't had any luck finding info about how to write my own TypeConverter.

Russell Spitzer

Aug 4, 2016, 3:03:08 PM
to DataStax Spark Connector for Apache Cassandra
UDTs are supported, but type conversion to classes other than CassandraRow is not supported out of the box. Writing your own type converter is not easy; you are most likely going to have an easier time just manipulating the CassandraRow directly.

If you really want your own type converter, I would suggest just writing a RowReader for your class specifically.

See https://github.com/datastax/spark-cassandra-connector/blob/v1.2.6/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/reader/RowReader.scala

You would write something like:

class RowReaderToMyClass extends RowReader[MyClass] {
  def read(row: Row, columnNames: Array[String]): MyClass =
    new MyClass(row.get(col), ...)  // build MyClass from the driver Row's columns
}
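
You would then plug it in through your own RowReaderFactory, the same hook that ClassBasedRowReaderFactory in your stack trace implements, when you call cassandraTable.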



Joe Payne

Aug 4, 2016, 3:16:03 PM
to DataStax Spark Connector for Apache Cassandra
So if we upgraded to 4.8 or 5.0, then I could just use mapRowTo without a problem?

Russell Spitzer

Aug 4, 2016, 3:20:23 PM
to DataStax Spark Connector for Apache Cassandra
That should work.