Base64 encoded non-parquet streaming ingestion


Sunita Koppar

May 30, 2016, 6:35:04 PM
to ScalaPB
Hi Nadav,

Continuing the discussion from the forum. I do not have a valid log file for the sample project I created, but I am attaching the code. I am probably doing something wrong in the way I am generating the sample log, and I will recheck that. The error I get with the actual project indicates that the file does not have the Parquet magic number, i.e. it is basically not a Parquet file (details below).
If you can spot an issue in the way I am coding, you might be able to help. Except for the file and the schema, it is basically the same code. The sample log is not Parquet encoded, and I just want to know whether Parquet is a mandatory encoding for this library. The reason for the question is that Elephant Bird seems to support non-Parquet files, while https://github.com/saurfang/sparksql-protobuf does not seem to.

FYR,

The error from the actual project is as follows:
java.io.IOException: Could not read footer: java.lang.RuntimeException: file:/Users/skoppar/workspace/pyspark-beacon/stream/allproto.log is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [55, 73, 67, 10]
at org.apache.parquet.hadoop.ParquetFileReader.readAllFootersInParallel(ParquetFileReader.java:248)
at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$24.apply(ParquetRelation.scala:812)
at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$24.apply(ParquetRelation.scala:801)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$22.apply(RDD.scala:756)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$22.apply(RDD.scala:756)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: file:/Users/skoppar/workspace/pyspark-beacon/stream/allproto.log is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [55, 73, 67, 10]

regards
Sunita
TestSparkProto.tgz

Nadav Samet

May 30, 2016, 8:02:21 PM
to Sunita Koppar, ScalaPB
sparksql-scalapb adds conversion to/from Parquet. However, you don't have to use that library with SparkSQL. Your other option is to use JSON with SparkSQL. You can convert protobufs to/from JSON using scalapb-json4s.
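
As a rough sketch of that route (assuming a generated message class MyProto, a SparkContext sc and SQLContext sqlContext already in scope, and one Base64-encoded message per line as the subject suggests; names and paths are illustrative, not from this thread):

import com.trueaccord.scalapb.json.JsonFormat

// Read the raw log, decode each line into a protobuf message,
// and turn each message into a JSON string.
val jsonLines = sc.textFile("file:///path/to/allproto.log")
  .map(line => MyProto.parseFrom(java.util.Base64.getDecoder.decode(line)))
  .map(msg => JsonFormat.toJsonString(msg))

// Let Spark SQL infer a schema from the JSON strings; no Parquet involved.
val df = sqlContext.read.json(jsonLines)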

--
-Nadav

Sunita Koppar

May 31, 2016, 10:00:01 AM
to ScalaPB, sunita...@verizondigitalmedia.com
Can you elaborate on the usage?

val r: String = JsonFormat.toJsonString(myProto) -> do you mean calling this on every message?

Is there a way to use it in library fashion, something like:
spark.read.stream("file:///xxx.log").map(JsonFormat.protoToWriter) // or toJsonString - neither works

regards
Sunita

Sunita Koppar

May 31, 2016, 4:00:23 PM
to ScalaPB, sunita...@verizondigitalmedia.com
Update:

I changed the buildScan definition to parse as JSON:

override def buildScan(): RDD[Row] = {
  sqlContext.sparkContext.textFile(location).map { line =>
    JsonFormat.toJsonString[myproto](myproto.parseFrom(line.getBytes))
  }
  sqlContext.read.load(location).rdd
}

This throws the following error at runtime:
Exception in thread "main" java.lang.VerifyError: class com.fasterxml.jackson.module.scala.ser.ScalaIteratorSerializer overrides final method withResolved.(Lcom/fasterxml/jackson/databind/BeanProperty;Lcom/fasterxml/jackson/databind/jsontype/TypeSerializer;Lcom/fasterxml/jackson/databind/JsonSerializer;)Lcom/fasterxml/jackson/databind/ser/std/AsArraySerializerBase;
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
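
(An aside on the snippet itself, separate from this error: as written, the RDD of JSON strings produced by the map is discarded, and sqlContext.read.load(location) still tries to read the file as Parquet. A sketch of the presumably intended version, which feeds the JSON strings to Spark's JSON reader; it assumes one serialized message per line, and if the log is Base64-encoded as the subject suggests, the line would need to be Base64-decoded first:)

override def buildScan(): RDD[Row] = {
  // Convert each line of the log into a JSON string...
  val jsonLines = sqlContext.sparkContext.textFile(location).map { line =>
    // assumption: one message per line; use Base64.getDecoder.decode(line)
    // instead of line.getBytes if the log is Base64-encoded
    JsonFormat.toJsonString(myproto.parseFrom(line.getBytes))
  }
  // ...and let Spark SQL's JSON reader build the rows.
  sqlContext.read.json(jsonLines).rdd
}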

Going by https://github.com/json4s/json4s/issues/320, I cloned scalapb-json4s and changed the json4s version to
"org.json4s" %% "json4s-jackson" % "3.2.10"

However, scalapb-json4s doesn't compile with it:

[info] Compiling 1 Scala source to /Users/skoppar/workspace/scalapb-json4s/target/scala-2.11/classes...
[error] /Users/skoppar/workspace/scalapb-json4s/src/main/scala/com/trueaccord/scalapb/json/JsonFormat.scala:59: not found: value JLong
[error]       case (JavaType.LONG, JLong(x)) => x.toLong
[error]                            ^
[error] /Users/skoppar/workspace/scalapb-json4s/src/main/scala/com/trueaccord/scalapb/json/JsonFormat.scala:110: not found: value JLong
[error]     case JavaType.LONG => JLong(value.asInstanceOf[Long])
[error]                           ^
[error] two errors found
[error] (compile:compileIncremental) Compilation failed
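
(For what it's worth, this VerifyError on ScalaIteratorSerializer is the classic symptom of incompatible jackson-databind and jackson-module-scala versions on the classpath, typically the Jackson that Spark ships clashing with one pulled in transitively. Rather than downgrading json4s, one possible workaround is to pin the Jackson artifacts to a single version in the application's build.sbt; the sketch below is illustrative only, and the version should match whatever the Spark distribution in use was built against:)

// build.sbt (sketch): force a single Jackson version across the classpath.
// 2.4.4 is illustrative; pick the version bundled with your Spark build.
dependencyOverrides ++= Set(
  "com.fasterxml.jackson.core"   %  "jackson-core"          % "2.4.4",
  "com.fasterxml.jackson.core"   %  "jackson-databind"      % "2.4.4",
  "com.fasterxml.jackson.module" %% "jackson-module-scala"  % "2.4.4"
)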

regards
Sunita