In this case, I want to use Spark as an ETL engine to load the data from Cassandra, and save it into HDFS.
Here is the environment specified information:
Spark 1.3.1
Cassandra 2.1
HDFS/Hadoop 2.2
I am using the Cassandra Spark Connector 1.3.x, which I have no problem to query the C* data in the Spark. But I have a problem trying to save the data into HDFS, like below:
val df = sqlContext.load("org.apache.spark.sql.cassandra", options = Map( "c_table" -> "table_name", "keyspace" -> "keyspace_name")
df: org.apache.spark.sql.DataFrame = [account_id: bigint, campaign_id: uuid, business_info_ids: array<uuid>, closed_date: timestamp, compliance_hold: boolean, contacts_list_id: uuid, contacts_list_seq: bigint, currency_type: string, deleted_date: timestamp, discount_info: map<string,string>, end_date: timestamp, insert_by: string, insert_time: timestamp, last_update_by: string, last_update_time: timestamp, name: string, parent_id: uuid, publish_date: timestamp, share_incentive: map<string,string>, start_date: timestamp, version: int]
scala> df.count
res12: Long = 757704
I can also dump the data output suing df.first, without any problem.
But when I try to save it:
scala> df.save("hdfs://location", "parquet")
java.lang.RuntimeException: Unsupported datatype UUIDType
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:372)
at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:316)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(ParquetTypes.scala:315)
at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:395)
at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:394)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromAttributes(ParquetTypes.scala:393)
at org.apache.spark.sql.parquet.ParquetTypesConverter$.writeMetaData(ParquetTypes.scala:440)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.prepareMetadata(newParquet.scala:260)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:276)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:269)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.refresh(newParquet.scala:269)
at org.apache.spark.sql.parquet.ParquetRelation2.<init>(newParquet.scala:391)
at org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:98)
at org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:128)
at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:240)
at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1196)
at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1156)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:28)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:33)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:37)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:43)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:45)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:47)
at $iwC$$iwC$$iwC.<init>(<console>:49)
at $iwC$$iwC.<init>(<console>:51)
at $iwC.<init>(<console>:53)
at <init>(<console>:55)
at .<init>(<console>:59)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:856)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:656)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:664)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:996)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:944)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1058)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
It looks like Spark doesn't know how to handle the UUID type, and as you can see, the UUID type existed in both top level column, and also in the nested level.
My question is, giving the version of all the components I current have, is there any option for me?
Thanks
Yong
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com.
val rdd = sc.cassandraTable("keyspace_name", "table_name")
rdd.saveAsTextFile()
did work.
So I have some further questions related to this.
The RDD coming back from the Cassandra connector is not a schema RDD, so the only immediate way for me is to saveAsTextFile().
If the DataFrame doesn't work for me in this case, but I do want to save the final output either in AVRO or Parquet format, so I can only think about the following ways:
1) Use rdd.saveAsNewAPIHadoopFile() using Avro OutputFormat to save output as AVRO format.
2) Save as Text format, then parse it. But this text format could be changed, depending on Cassandra connector's implementation.
So the real question is that if the RDD coming back without schema, and I cannot use DataFrame, then what is a easy way to understand the structure of the data?
Thanks
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com.
I already have a Java class, generated from the AVRO schema, so I tried to do this:
scala>val rdd = sc.cassandraTable("keyspace_name", "tableName")
rdd: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:15
scala>val outputRdd = rdd.map(row => new Coupon(row.getLong("id1", row.getUUID("id2").toString))
outputRdd: org.apache.spark.rdd.RDD[Coupon] = MapPartitionsRDD[4] at map at <console>:30
scala>import sqlContext.implicits._
scala> val df = outputRdd.toDF
<console>:32: error: value toDF is not a member of org.apache.spark.rdd.RDD[Coupon]
val outputDF = outputRdd.toDF
The above error is gone if I use a "case class". But I wonder if I can reuse my own Java class, as the structure is complex.
Thanks