Is there a way to store the UUID type in Spark?


Yong Zhang

Oct 9, 2015, 1:49:25 PM
to DataStax Spark Connector for Apache Cassandra
Hi,

In this case, I want to use Spark as an ETL engine to load the data from Cassandra, and save it into HDFS.

Here is the environment specified information:

Spark 1.3.1
Cassandra 2.1
HDFS/Hadoop 2.2

I am using the Cassandra Spark Connector 1.3.x, and I have no problem querying the C* data in Spark. But I do have a problem trying to save the data into HDFS, as shown below:

val df = sqlContext.load("org.apache.spark.sql.cassandra", options = Map("c_table" -> "table_name", "keyspace" -> "keyspace_name"))
df: org.apache.spark.sql.DataFrame = [account_id: bigint, campaign_id: uuid, business_info_ids: array<uuid>, closed_date: timestamp, compliance_hold: boolean, contacts_list_id: uuid, contacts_list_seq: bigint, currency_type: string, deleted_date: timestamp, discount_info: map<string,string>, end_date: timestamp, insert_by: string, insert_time: timestamp, last_update_by: string, last_update_time: timestamp, name: string, parent_id: uuid, publish_date: timestamp, share_incentive: map<string,string>, start_date: timestamp, version: int]

scala> df.count
res12: Long = 757704

I can also dump the data using df.first, without any problem.

But when I try to save it:

scala> df.save("hdfs://location", "parquet")
java.lang.RuntimeException: Unsupported datatype UUIDType
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:372)
at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:316)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(ParquetTypes.scala:315)
at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:395)
at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:394)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromAttributes(ParquetTypes.scala:393)
at org.apache.spark.sql.parquet.ParquetTypesConverter$.writeMetaData(ParquetTypes.scala:440)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.prepareMetadata(newParquet.scala:260)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:276)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:269)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.refresh(newParquet.scala:269)
at org.apache.spark.sql.parquet.ParquetRelation2.<init>(newParquet.scala:391)
at org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:98)
at org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:128)
at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:240)
at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1196)
at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1156)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:28)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:33)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:37)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:43)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:45)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:47)
at $iwC$$iwC$$iwC.<init>(<console>:49)
at $iwC$$iwC.<init>(<console>:51)
at $iwC.<init>(<console>:53)
at <init>(<console>:55)
at .<init>(<console>:59)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:856)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:656)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:664)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:996)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:944)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1058)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

It looks like Spark doesn't know how to handle the UUID type, and as you can see, UUIDs appear both as top-level columns and inside nested types.

My question is: given the versions of the components I currently have, is there any option for me?

Thanks

Yong

Russell Spitzer

Oct 9, 2015, 1:58:42 PM
to DataStax Spark Connector for Apache Cassandra
Yes, but you won't be able to use DataFrames at the moment, since they don't support the UUID type and we haven't put in a custom toString/fromString converter yet. If you use the Scala API without DataFrames it should work fine.
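E.g. something like this should work (untested; on the RDD side the UUID columns come back as java.util.UUID values, and the keyspace/table names are just your placeholders):

import com.datastax.spark.connector._

val rdd = sc.cassandraTable("keyspace_name", "table_name")
// CassandraRow has typed getters, so UUID columns are directly readable:
rdd.map(row => row.getUUID("campaign_id").toString).take(5)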


Yong Zhang

Oct 9, 2015, 4:01:26 PM
to DataStax Spark Connector for Apache Cassandra
Thanks.

val rdd = sc.cassandraTable("keyspace_name", "table_name")
rdd.saveAsTextFile("hdfs://location")

did work.

So I have some further questions related to this.

The RDD coming back from the Cassandra connector is not a SchemaRDD, so the only immediate option I see is saveAsTextFile().

Since the DataFrame doesn't work for me in this case, but I still want to save the final output in either Avro or Parquet format, I can only think of the following approaches:

1) Use rdd.saveAsNewAPIHadoopFile() with an Avro OutputFormat to save the output in Avro format (rough sketch below).
2) Save as text, then parse it. But that text format could change, depending on the Cassandra connector's implementation.
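For 1), the rough shape I have in mind is something like this (untested sketch; it assumes avro-mapred is on the classpath, uses my Avro-generated class, here called Coupon, and the constructor arguments are only illustrative):

import com.datastax.spark.connector._
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.{AvroJob, AvroKeyOutputFormat}
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.Job

// Register the schema of the Avro-generated class on the Hadoop job.
val job = Job.getInstance(sc.hadoopConfiguration)
AvroJob.setOutputKeySchema(job, Coupon.getClassSchema)

sc.cassandraTable("keyspace_name", "table_name")
  .map(row => (new AvroKey(new Coupon(row.getLong("account_id"), row.getUUID("campaign_id").toString)),
               NullWritable.get))
  .saveAsNewAPIHadoopFile(
    "hdfs://location",
    classOf[AvroKey[Coupon]],
    classOf[NullWritable],
    classOf[AvroKeyOutputFormat[Coupon]],
    job.getConfiguration)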

So the real question is: if the RDD comes back without a schema, and I cannot use a DataFrame, what is an easy way to understand the structure of the data?

Thanks

Russell Spitzer

Oct 9, 2015, 4:21:28 PM
to DataStax Spark Connector for Apache Cassandra
You don't need to save as a text file. If your imports are correct you should be able to call saveToCassandra on *any* RDD regardless of schema state. (Whether it maps to your underlying C* table is a different question.) If you want to save to Avro or Parquet, your best bet is to transform that RDD into a DataFrame using toDF() (you may have to manually convert the UUIDs into something DataFrames can understand).

So perhaps
case class OutputType(usedToBeUUID: String, .....)
rdd.map(row => OutputType(row.getUUID().toString, ....)).toDF

Your other option, if the UUID information is not important, is to simply not select that column when making the DataFrame.
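To make that concrete, something like this should work (untested; I'm just using two of the columns from your schema above, and the HDFS path is a placeholder):

import com.datastax.spark.connector._
import sqlContext.implicits._

// Flatten the UUID column to a String so the DataFrame side can handle it.
case class CampaignOutput(accountId: Long, campaignId: String)

val df = sc.cassandraTable("keyspace_name", "table_name")
  .map(row => CampaignOutput(row.getLong("account_id"), row.getUUID("campaign_id").toString))
  .toDF()

// Same save call as before, but now every column type is Parquet-friendly.
df.save("hdfs://location", "parquet")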


Yong Zhang

Oct 15, 2015, 2:22:08 PM
to DataStax Spark Connector for Apache Cassandra
Then can I use a custom Java class in this case, instead of a case class?

I already have a Java class, generated from the AVRO schema, so I tried to do this:

scala>val rdd = sc.cassandraTable("keyspace_name", "tableName")
rdd: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:15

scala> val outputRdd = rdd.map(row => new Coupon(row.getLong("id1"), row.getUUID("id2").toString))
outputRdd: org.apache.spark.rdd.RDD[Coupon] = MapPartitionsRDD[4] at map at <console>:30
scala>import sqlContext.implicits._
scala> val outputDF = outputRdd.toDF
<console>:32: error: value toDF is not a member of org.apache.spark.rdd.RDD[Coupon]
val outputDF = outputRdd.toDF

The above error goes away if I use a case class. But I wonder whether I can reuse my own Java class, since the structure is complex.
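One thing I might try instead (untested, and assuming my Coupon class follows JavaBean conventions with getters/setters) is sqlContext.createDataFrame with the bean class, since toDF() is only provided for Scala case classes:

val outputRdd = rdd.map(row => new Coupon(row.getLong("id1"), row.getUUID("id2").toString))
// createDataFrame(rdd, beanClass) derives the schema from the bean's getters,
// so no case class is needed; nested Avro types may still not map cleanly.
val outputDF = sqlContext.createDataFrame(outputRdd, classOf[Coupon])
outputDF.save("hdfs://location", "parquet")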

Thanks
