Spark 2.3 issues w/ java.lang.AbstractMethodError: .. parseFrom(Lcom/google/protobuf/CodedInputStrea

bo blanton

Mar 21, 2018, 12:04:41 PM
to ScalaPB
Hi, 

I'm trying to get the Scala protos to work in a Spark environment. The protos are not generated by Scala, but I have access to the raw SequenceFiles for them.

I was interested in using this lib for the "protoToDataFrame" convenience that makes complex messages much easier to deal with. However, I'm pretty much getting a slew of java.lang.AbstractMethodError errors whenever I use the Scala objects (the Java objects work fine, but of course they don't have the nice implicit DataFrame converters).

version info:
  spark: 2.3 (tried 2.2 as well)
  scalapb: 0.7.0 (have tried 0.7.1 and 0.6.7 as well)
  scala: 2.11.12 (have tried 2.12.5 as well)
  protobuf lib is shaded in the assembly jar of generated objects (otherwise nothing works)
  

For example:

```
import com.thing.message.{Message => ScalaMessage}
import com.thing.{Message => JavaMessage}

val byteBlob: Array[Byte] = xxxx

// works just fine
val jdecode = JavaMessage.parseFrom(byteBlob)

// also seems to work
val sdecode = ScalaMessage.fromJavaProto(jdecode)

// fails
ScalaMessage.parseFrom(byteBlob)
```

```
Caused by: java.lang.AbstractMethodError: com.thing.message.Message$.parseFrom(Lcom/google/protobuf/CodedInputStream;)Lscalapb/GeneratedMessage;
  at scalapb.GeneratedMessageCompanion$class.parseFrom(GeneratedMessageCompanion.scala:204)
  at com.asapp.schemas.generic.message.Message$.parseFrom(Message.scala:101)
  at $anonfun$1.apply(<console>:38)
  at $anonfun$1.apply(<console>:38)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
  at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
  at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
  at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
  at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
  at scala.collection.AbstractIterator.to(Iterator.scala:1336)
  at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
  at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
  at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
  at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
  at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1358)
  at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1358)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:109)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
```

```
val byteRDD = .. some rdd of bytes ...

// fails w/ the same message
val objRDD = byteRDD.map(r => AsappMessage.parseFrom(r._1)).take(1)

// does ok
val objRDD = byteRDD.map(r => AsappMessage.fromJavaProto(JavaMessage.parseFrom(r._1))).take(1)

// fails
spark.sqlContext.protoToDataFrame(objRDD)
```

```
java.lang.AbstractMethodError: com.thing.message.Message$.javaDescriptor()Lcom/google/protobuf/Descriptors$Descriptor;
  at scalapb.spark.ProtoSQL$.schemaFor(ProtoSQL.scala:26)
  at scalapb.spark.ProtoSQL$.protoToDataFrame(ProtoSQL.scala:15)
  at scalapb.spark.ProtoSQL$.protoToDataFrame(ProtoSQL.scala:20)
  at scalapb.spark.package$ProtoSQLContext$.protoToDataFrame$extension(package.scala:11)
...
```

I also tried a more manual approach to get the Spark StructType schema, but ran into the "shading" issue:

```
import org.apache.spark.sql.types._
import collection.JavaConverters._
import scalapb.spark.ProtoSQL

StructType(JavaMessage.getDescriptor.getFields.asScala.map(ProtoSQL.structFieldFor))
```

```
<console>:43: error: type mismatch;
 found   : com.google.protobuf.Descriptors.FieldDescriptor => org.apache.spark.sql.types.StructField
 required: shadeproto.Descriptors.FieldDescriptor => ?
       StructType(JavaMessage.getDescriptor.getFields.asScala.map(ProtoSQL.structFieldFor))
```
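
For context, without the shading rename I would expect the equivalent call against the Scala companion to look roughly like the sketch below (javaDescriptor comes from scalapb's GeneratedMessageCompanion; in my shaded build it presumably hits the same AbstractMethodError on javaDescriptor shown above):

```
// sketch: build the Spark schema straight from the ScalaPB companion's descriptor
import org.apache.spark.sql.types.StructType
import scala.collection.JavaConverters._
import scalapb.spark.ProtoSQL

val schema = StructType(
  ScalaMessage.javaDescriptor.getFields.asScala.map(ProtoSQL.structFieldFor)
)
```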

Any help or thoughts on what I may be missing would be most helpful.

Much thanks

bo

Nadav Samet

Mar 21, 2018, 12:31:07 PM
to bo blanton, ScalaPB
Thanks for reporting. Can you provide a minimal project that demonstrates this problem with a test or a main app that I can simply run to get this exception? -Nadav





bo blanton

Mar 23, 2018, 12:23:24 AM
to Nadav Samet, ScalaPB
Hi

The generation was performed with the build below (no other code is there, just the .protos).

```

// build.sbt

scalaVersion := "2.11.12"

addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.15")

libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.7.0"

libraryDependencies ++= Seq(
    "com.google.protobuf" % "protobuf-java" % scalapb.compiler.Version.protobufVersion % "protobuf",
    "io.grpc" % "grpc-netty" % scalapb.compiler.Version.grpcJavaVersion,
    "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapb.compiler.Version.scalapbVersion,
    "com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion
)

PB.targets in Compile := Seq(
    scalapb.gen(javaConversions = true) -> (sourceManaged in Compile).value,
    PB.gens.java -> (sourceManaged in Compile).value
)


// shade out the proto, as for Spark and other things the bundled proto is older than this one by a lot
assemblyShadeRules in assembly := Seq(
    ShadeRule.rename("com.google.protobuf.**" -> "shadeproto.@1").inAll
)

assemblyMergeStrategy in assembly := {
    case PathList("META-INF", xs @ _*) => MergeStrategy.discard
    case x => MergeStrategy.first
}

```

This was all done in a spark-shell, so there's not really a "repo" to speak of; the steps are basically of the form laid out below, with some real data.

```
import spark.implicits._

// need to set this to false before loading data
spark.conf.set("spark.sql.parquet.binaryAsString", "false")

// needed to allow RDD serialization of "interesting" Java objects
// Kryo, while nice, does not like complex set types (aka protobufs)
spark.conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
sc.getConf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")

val theFile="/path/to/hadoopsequence/file"


import com.myprotos.message.{Message => ScalaMessage}
import com.myprotos.{Message => JavaMessage}


import org.apache.hadoop.io.{BytesWritable, LongWritable}

val sdf = spark.sparkContext.sequenceFile("file://" + theFile, classOf[LongWritable], classOf[BytesWritable])

val oneMsg = sdf.take(1)(0)._2.copyBytes
val amsg = ScalaMessage.fromJavaProto(JavaMessage.parseFrom(oneMsg))


import scalapb.spark._

val ddf = sdf.map(r => ScalaMessage.fromJavaProto(JavaMessage.parseFrom(r._2.copyBytes)))

ddf.take(1)

// these fail

val ddf = sdf.map(r => ScalaMessage.parseFrom(r._2.copyBytes))


```
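
The end goal from there is roughly the following (a sketch; this is the protoToDataFrame call that fails with the javaDescriptor error from my first message):

```
// sketch: convert the RDD of Scala messages into a DataFrame
import scalapb.spark._

val df = spark.sqlContext.protoToDataFrame(ddf)
df.printSchema()
```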

Nadav Samet

Mar 23, 2018, 1:36:28 PM
to bo blanton, ScalaPB
Hi Bo, I was not able to reproduce this problem based on what you sent. Resolving issues like this is much easier when you provide a repository and clear instructions on how to reproduce. The code you posted doesn't reference sbt-assembly, seems to have project/plugins.sbt intertwined with build.sbt, and doesn't contain the proto definition or the sequence file you are referring to. Are the Java serialization and the configuration settings relevant? Does the problem still happen when you remove them? It would also be helpful to know how you start spark-shell.

Here is what I tried:

I prepared this repo based on what you sent, https://github.com/thesamet/tmp-scalapb-spark-assembly, and ran "sbt assembly". Then I ran

~/dev/spark-2.2.1-bin-hadoop2.7/bin/spark-shell --jars target/scala-2.11/sss-assembly-0.1-SNAPSHOT.jar

followed by:

sc.parallelize(Seq(Array.empty[Byte])).map(com.myprotos.message.Message.parseFrom).collect

which completed successfully:

res0: Array[com.myprotos.message.Message] = Array(Message())

Based on the repository I shared, can you explain how to trigger this issue? If more code needs to be added to this repo, feel free to send a PR or your own copy of it.

-Nadav


bo blanton

Mar 23, 2018, 1:42:05 PM
to Nadav Samet, ScalaPB
Hi

Sorry about that, yeah, I forgot the assembly plugin line.
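
For reference, the plugins file is roughly the following (a sketch; the sbt-assembly version is from memory, so treat it as illustrative):

```
// project/plugins.sbt (sketch; the sbt-assembly version is an assumption)
addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.15")
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.6")

libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.7.0"
```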

If I remove the assembly and shading, nothing works (Spark uses, as I'm sure you are aware, a very old protobuf lib, and our protos are version 3).

I can try to find some time to see if I can reproduce things in a not-so-sensitive fashion from your base repo (thanks for creating that).

The Spark we run is mostly "out of the box", but with more security and other driver things that don't come standard.

B

Caio Quirino

May 7, 2018, 10:38:26 AM
to ScalaPB
It's because your Spark application uses a different Protobuf version. Try to force the protobuf version to 2.5.0 in SBT:
libraryDependencies += "com.google.protobuf" % "protobuf-java" % "2.5.0" force()
