3.2.1-SNAPSHOT changes to SparkGraphComputer serialization


Dylan Bethune-Waddell

Jun 15, 2016, 1:04:02 PM6/15/16
to Gremlin-users
I run into this error when attempting to use the new spark.kryo.registrator approach to serialization now present in master, whereas with GryoSerializer I do not:

 java.lang.IllegalArgumentException: Class is not registered: org.apache.spark.util.collection.CompactBuffer[]
Note: To register this class use: kryo.register(org.apache.spark.util.collection.CompactBuffer[].class);
        at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:442)

The relevant (I think) config looks like this:
# spark.serializer=org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer
# spark.serializer=org.apache.tinkerpop.gremlin.spark.structure.io.gryo.IoRegistryAwareKryoSerializer
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator=org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoRegistrator
spark.kryo.registrationRequired=true

I had thought that "gremlin.io.registry" might be the way to go about registering CompactBuffer, as it is in GryoMapper, but I'm unsure of how to proceed after reading over the PR and checking the source, so I thought I would ask. Is this intended behaviour, and is there some way to get CompactBuffer registered with the new serialization framework?

Thanks,
Dylan

Marko Rodriguez

Jun 15, 2016, 1:55:40 PM6/15/16
to gremli...@googlegroups.com, Daniel LaRocque
Hello,

Hm. Dan LaRocque (cc:d) wrote most of that code and I believe that because this is not going to be made public until 3.3.0, the KryoShimService has low “priority” in the ClassLoader. To force it to load:

System.setProperty("gremlin.io.kryoShimService", UnshadedKryoShimService.class.getCanonicalName());
KryoShimServiceLoader.load(true);
At least, looking at the test suite, that is how it's done:


Tell me how it goes,
Marko.

Dylan Bethune-Waddell

Jun 16, 2016, 11:55:30 PM6/16/16
to Gremlin-users, dan.la...@datastax.com
Hi Marko,

I changed UnshadedKryoShimService.getPriority() to return 0 which ties it with HadoopPoolShimService, and this warning from Spark confirms that UnshadedKryoShimService was picked up for the Spark job:

16:44:57 WARN  org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader  - KryoShimService implementations org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedKryoShimService@3ff49aca and org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPoolShimService@5bf7bad5 are tied with priority value 0.  Preferring org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedKryoShimService to the other because it has a lexicographically greater classname.  Consider setting the system property "gremlin.io.kryoShimService" instead of relying on priority tie-breaking.
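
As an aside, the tie-breaking behavior the warning describes can be sketched in plain Java. This is illustrative only: the Service class and names below are stand-ins, not TinkerPop's actual KryoShimServiceLoader code.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Sketch of priority-based service selection with a lexicographic
// tie-break, mirroring the behavior described in the warning above.
public class ServicePicker {

    static final class Service {
        final String className;
        final int priority;
        Service(String className, int priority) {
            this.className = className;
            this.priority = priority;
        }
    }

    // Highest priority wins; on a tie, the lexicographically greater
    // class name is preferred, so the choice is at least deterministic.
    static Service pick(List<Service> services) {
        return services.stream()
                .max(Comparator.<Service>comparingInt(s -> s.priority)
                        .thenComparing((Service s) -> s.className))
                .orElseThrow(IllegalStateException::new);
    }

    public static void main(String[] args) {
        Service a = new Service("org.example.HadoopPoolShimService", 0);
        Service b = new Service("org.example.UnshadedKryoShimService", 0);
        // Both have priority 0, so the tie-break picks the greater name.
        System.out.println(pick(Arrays.asList(a, b)).className);
    }
}
```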

However, the behavior was the same, and at DEBUG the worker logs still did not show GryoRegistrator registering CompactBuffer at all. I think it needs to be added manually, as VertexWritable and others are in GryoRegistrator#L188 - probably along with some of the other Spark internal classes you wrote serializers for / registered in GryoSerializer#L89 that did not make it into GryoRegistrator. Another issue comes up when trying to add it, though: the serializer lives in the groovy part of the spark-gremlin package (maybe why it was missed?), and when I try to implement SerializerShim<CompactBuffer<T>> and @Override that interface's methods in the groovy serializer, the build fails:

[ERROR] Failed to execute goal org.codehaus.gmavenplus:gmavenplus-plugin:1.2:compile (default) on project spark-gremlin: Error occurred while calling a method on a Groovy class from classpath. InvocationTargetException: startup failed:
[ERROR] /fs1/home/dbethune/pancure/tp321/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/CompactBufferSerializer.groovy: 34: Can't have an abstract method in a non-abstract class. The class 'org.apache.tinkerpop.gremlin.spark.structure.io.gryo.CompactBufferSerializer' must be declared abstract or the method 'void write(org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShim, org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.OutputShim, java.lang.Object)' must be implemented.
[ERROR] @ line 34, column 1.
[ERROR] public final class CompactBufferSerializer<T> implements SerializerShim<CompactBuffer<T>> {
[ERROR] ^
[ERROR]
[ERROR] 1 error
[ERROR] -> [Help 1]

I started down a few paths and haven't gotten a build through on any so far:
1) In light of GROOVY-6617 I tried putting fully qualified type declarations in a few places in CompactBufferSerializer.groovy and compiling against a fresh groovy build from master.
2) Updated the gmavenplus plugin version to 1.5 after noting all the issues Google was listing with joint compilation of Groovy/Java code that seemed similar. That resulted in new complaints about both GryoDeserializationStream.groovy and CompactBufferSerializer.groovy --

[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project spark-gremlin: Compilation failure: Compilation failure:
[ERROR] /fs1/home/dbethune/pancure/tp321/spark-gremlin/target/generated-sources/groovy-stubs/main/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoDeserializationStream.java:[10,14] org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoDeserializationStream is not abstract and does not override abstract method <T>readObject(scala.reflect.ClassTag<T>) in org.apache.spark.serializer.DeserializationStream
[ERROR] /fs1/home/dbethune/pancure/tp321/spark-gremlin/target/generated-sources/groovy-stubs/main/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoDeserializationStream.java:[21,29] name clash: <T>readObject(scala.reflect.ClassTag) in org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoDeserializationStream and <T>readObject(scala.reflect.ClassTag<T>) in org.apache.spark.serializer.DeserializationStream have the same erasure, yet neither overrides the other
[ERROR] /fs1/home/dbethune/pancure/tp321/spark-gremlin/target/generated-sources/groovy-stubs/main/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/CompactBufferSerializer.java:[10,14] org.apache.tinkerpop.gremlin.spark.structure.io.gryo.CompactBufferSerializer is not abstract and does not override abstract method read(org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShim,org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.InputShim,java.lang.Class) in org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.SerializerShim
[ERROR] /fs1/home/dbethune/pancure/tp321/spark-gremlin/target/generated-sources/groovy-stubs/main/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/CompactBufferSerializer.java:[20,133] name clash: <I>read(org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShim,org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.InputShim,java.lang.Class) in org.apache.tinkerpop.gremlin.spark.structure.io.gryo.CompactBufferSerializer and <I>read(org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShim<I,?>,I,java.lang.Class<T>) in org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.SerializerShim have the same erasure, yet neither overrides the other

3) More or less copy/pasted the serializer into the Java part of the package, which didn't work immediately either - I just don't understand what kind of groovy magic you were employing in the first place well enough to write the Java equivalent.
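
The "same erasure, yet neither overrides the other" errors in path 2 are what javac reports when an implementing method repeats the erased rather than the generic signature of an interface method. A minimal pure-Java illustration, with SimpleShim as a hypothetical stand-in for SerializerShim (using java.util.function.Function in place of KryoShim):

```java
// SimpleShim stands in for SerializerShim's generic read method:
// <I> T read(KryoShim<I,?> kryo, I input, Class<T> clazz)
interface SimpleShim<T> {
    <I> T read(java.util.function.Function<I, ?> shim, I input, Class<T> clazz);
}

class GoodImpl implements SimpleShim<String> {
    // Repeating the interface's exact generic signature overrides cleanly.
    @Override
    public <I> String read(java.util.function.Function<I, ?> shim, I input, Class<String> clazz) {
        return String.valueOf(shim.apply(input));
    }
}

// By contrast, declaring the method with raw/erased parameter types, as a
// generated Groovy stub can do, has the same erasure as the interface
// method without being override-equivalent, which produces the name clash:
//
// class BadImpl implements SimpleShim<String> {
//     public <I> String read(java.util.function.Function shim, Object input, Class clazz) { ... }
// }

public class ErasureDemo {
    public static void main(String[] args) {
        SimpleShim<String> s = new GoodImpl();
        System.out.println(s.read(Object::toString, 42, String.class));
    }
}
```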

Hopefully there's something in there you guys can use - if I make any real progress on this I will post a JIRA and submit a PR. Since you and Dan are so much more familiar with this stuff (and much better devs in general), this might be a snap for you to correct, whereas it would take me much longer with fewer assurances of getting there. I am willing, though.

Hope this helps,
Dylan

Marko Rodriguez

Jun 17, 2016, 9:39:26 AM6/17/16
to gremli...@googlegroups.com, dan.la...@datastax.com
Hi Dylan,

I ping’d Dan directly to see if he has any insight. Apologies, I simply don’t understand enough about this body of code to provide an answer.

Marko.

Jason Plurad

Jun 17, 2016, 4:07:20 PM6/17/16
to Gremlin-users, dan.la...@datastax.com
"I believe that because this is not going to be made public until 3.3.0"

Should we have a tp32 branch without this feature?

It seems that if this body of work is experimental and not thoroughly documented, it should be separated.

-- Jason

Dan LaRocque

Jun 18, 2016, 2:23:29 AM6/18/16
to gremli...@googlegroups.com
Hi Dylan,
 
Thanks for trying this out.  I'm breaking this message into some points.  Here's the summary:
 
1. GryoRegistrator needs to copy over several registrations from GryoSerializer's constructor that I thought were safe to drop.  Clearly, some of these are still required.   This is a bug -- when it is fixed, you should not have to register CompactBuffer[] in your IoRegistry.
2. If you have custom types (besides CompactBuffer[]), consider spark.serializer=IoRegistryAwareKryoSerializer
3. I can't tell what's going on with the compile errors and would like to see the source you used before commenting on those
 
Here's the longer version:
 
1. GryoSerializer registered a slew of scala runtime and Spark types.  I deliberately dropped all of these in GryoRegistrator, thinking that KryoSerializer registered all the scala runtime or Spark types that TP jobs would need, so that GryoRegistrator could just carry over every TinkerPop registration and function equivalently.  I was wrong.  For instance, GryoSerializer registers both CompactBuffer and CompactBuffer[].  CompactBuffer is registered in KryoSerializer, but CompactBuffer[] (the array type) is not.  I think this means that TinkerPop is passing around CompactBuffer[] in the course of running jobs (not sure where/why), whereas Spark does not necessarily need CompactBuffer[] serialization for its own internals.
 
Going through each line of GryoSerializer's constructor that registers a scala runtime type or Spark type and comparing against https://github.com/apache/spark/blob/v1.6.1/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala, I found the following:
 
Not registered anywhere:
 
CompactBuffer[]:  NOT REGISTERED (should be registered in GryoRegistrator)
BoxedUnit: (probably) NOT REGISTERED (should be registered in GryoRegistrator)
 
Registered somewhere:
 
Tuple2, Tuple3: registered in KS (via KS calling chill's AllScalaRegistrar which calls chill's ScalaTupleSerialization which does the actual registration)
Tuple2[], Tuple3[]: registered in KS
CompactBuffer: registered in KS
CompressedMapStatus: registered in KS
BlockManagerId: registered in KS
HighlyCompressedMapStatus: registered in KS
HttpBroadcast: registered in KS
PythonBroadcast: registered in KS
scala.reflect.ClassTag$$anon$1: registered in GryoRegistrator (only known to come up in testing, though)
scala.reflect.ManifestFactory$$anon$1: same as last line
WrappedArray.ofRef: registered in GryoRegistrator
 
I'm going to make a PR that adds CompactBuffer[] and probably BoxedUnit to GryoRegistrator.  I may try to see whether BoxedUnit just eluded my source reading by starting up a test Spark environment and finding a way to do something like <spark's KryoSerializer>.newKryo().getClassResolver().getRegistration(BoxedUnit.class).
 
2.  IoRegistryAwareKryoSerializer is a subclass of KryoSerializer that looks for a gremlin.io.registry in the SparkConf and applies it if found.  The reason this could not be done solely through spark.kryo.registrator is that spark.kryo.registrator does not have access to the job configuration, whereas the spark.serializer does, and in the presence of custom types, the config must be available at or before the point when serialization first occurs (because of gremlin.io.registry).  This is probably the spark.serializer that you'll want to use under the new serialization setup if you have an IoRegistry.
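 
Dan's point 2 translates into a configuration along these lines (a sketch; com.example.MyIoRegistry is a hypothetical placeholder for whatever registry holds your custom types):
 
```properties
# IoRegistryAwareKryoSerializer can see the job configuration, so the
# gremlin.io.registry is applied before serialization first occurs.
spark.serializer=org.apache.tinkerpop.gremlin.spark.structure.io.gryo.IoRegistryAwareKryoSerializer
spark.kryo.registrator=org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoRegistrator
gremlin.io.registry=com.example.MyIoRegistry
```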
 
3. Maybe I just overlooked one, but I didn't see a link to your source.  GROOVY-6617 is an attention grabber, especially since TP uses 2.4.5, but I don't know whether that's actually causing the pasted error.  I don't want to get into speculation about what's causing those compile errors without seeing exactly what went into the compiler, in other words.
 
thanks,
Dan

Dylan Bethune-Waddell

Jun 27, 2016, 6:13:58 PM6/27/16
to Gremlin-users
Hi Dan,

>> Thanks for trying this out.

No problem! I already see this as being a good change architecturally, based on the PR recently submitted to the Titan repo that upgrades Lucene/ES, adds GeoShape support, and adds a spark.kryo.registrator=com.thinkaurelius.titan.hadoop.serialize.TitanKryoRegistrator class to get those objects working with SparkGraphComputer.

>> ... consider spark.serializer=IoRegistryAwareKryoSerializer... I can't tell what's going on with the compile errors and would like to see the source you used before commenting on those.

I posted a repo to my github with the code changes I tried on CompactBufferSerializer to get it to implement your SerializerShim interface, along with what I thought an IoRegistry that would be picked up by IoRegistryAwareKryoSerializer should look like. The first commit message has the compiler error in it, and from there on I detail the steps I went through to try to get the groovy-eclipse-compiler plugin to build the project. Somewhere in my attempt to upgrade groovy-eclipse-compiler to groovy version 2.4.6 (the latest in their dev branch is 2.4.3) things didn't work out, and I think if I just understood kryo well enough to port Marko's groovy code over to Java properly, the whole thing would be solved.

>>  I think this means that TinkerPop is passing around CompactBuffer[] in the course of running jobs (not sure where/why), whereas Spark does not necessarily need CompactBuffer[] serialization for its own internals.

In fact, from the Spark issues I looked at, it seems many people have trouble with Spark not registering all the internal classes it requires for job execution under certain conditions, and they have to do it manually, either through spark.kryo.classesToRegister=... or in application code if it's a private class or "classesToRegister" otherwise fails to pick it up. Hence I tried out the SparkInternalsIoRegistry idea, but without being able to get the groovy serializer to compile, or to port it to Java (since I don't really know what's going on in Marko's serializer), it's a non-starter.
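
For reference, the manual-registration route through Spark's own config knob might look like the following. This is a hedged sketch using class names already discussed in this thread; private classes, and possibly array types like CompactBuffer[], are exactly the cases where this property can fail and registration has to happen in application code instead.

```properties
# Manual Kryo registration via Spark configuration. Works only for
# classes Spark can load by name from this comma-separated list.
spark.kryo.classesToRegister=org.apache.spark.util.collection.CompactBuffer,scala.runtime.BoxedUnit
spark.kryo.registrationRequired=true
```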

Thanks,
Dylan