Distributed Lambdas


mat...@kryptnostic.com

Aug 18, 2016, 1:33:37 AM
to Hazelcast
This blog post by Fuad Malikov demonstrates an approach for serializing lambdas and executing them on a different node: http://blog.hazelcast.com/i-can-make-lambda-expressions-distributed/

I was able to reproduce this within a single JVM, but when I added another node to the cluster I got deserialization failures on the executor node:

com.esotericsoftware.kryo.KryoException: Unable to find class: com.kryptnostic.conductor.rpc.LambdaFactory$$Lambda$32/1984317333

I'm using Kryo 4 and Hazelcast 3.7 to test this out. 

The lambda I'm submitting is: () -> System.out.println("echo!")
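A likely reading of the exception above: the name after "Unable to find class" is the class the JVM generated at runtime for the lambda, and such classes cannot be looked up by name, not even in the JVM that created them. The sketch below only illustrates that point with plain JDK calls; the class name LambdaClassNameDemo and its helper methods are made up for this example and are not part of Hazelcast or Kryo.

```java
class LambdaClassNameDemo {

    /** Returns the name of the class the JVM generated for the given lambda. */
    static String runtimeName(Runnable r) {
        return r.getClass().getName();
    }

    /** Returns true if the given class name can be resolved through Class.forName. */
    static boolean loadableByName(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Runnable echo = () -> System.out.println("echo!");
        String name = runtimeName(echo);
        // The generated name differs between JVMs and runs, so no number is assumed here.
        System.out.println(name + " loadable by name: " + loadableByName(name));
    }
}
```

If that reading is right, a serializer that writes the lambda's class name and resolves it again on the receiving side cannot work for lambdas, which is why a dedicated closure serializer is needed.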

-mtr 

Noctarius

Aug 18, 2016, 2:16:39 AM
to Christoph Engelbert - Hazelcast
Hey,

Is the class available on the other node's classpath? Apart from that, there's an easier way: by spec, a lambda is serializable when its target type is Serializable, so you just need to cast it to an intersection type that includes Serializable:

ExecutorService es = hazelcastInstance.getExecutorService(…);
es.submit((Runnable & Serializable) () -> System.out.println("echo!"));

Either way, the class that defines the lambda still has to be available on the other node to execute it.
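To try the cast without a cluster, here is a minimal sketch that round-trips a lambda through plain Java serialization inside one JVM. It uses only java.io; the class name SerializableLambdaRoundTrip and its helpers are made up for this example, and it does not involve Hazelcast or Kryo.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializableLambdaRoundTrip {

    /** Serializes an object with standard Java serialization. */
    static byte[] toBytes(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    /** Deserializes an object previously written by toBytes. */
    static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // The intersection cast makes the compiler emit a serializable lambda.
        Runnable task = (Runnable & Serializable) () -> System.out.println("echo!");
        Runnable copy = (Runnable) fromBytes(toBytes(task));
        copy.run();

        // Without the cast, the same lambda is rejected by ObjectOutputStream.
        Runnable plain = () -> System.out.println("echo!");
        try {
            toBytes(plain);
        } catch (NotSerializableException e) {
            System.out.println("plain lambda is not serializable");
        }
    }
}
```

Deserialization succeeds here because the class that defines the lambda is loaded in the same JVM; on a remote node the same class has to be on the classpath.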

Chris



mat...@kryptnostic.com

Aug 18, 2016, 1:25:39 PM
to Hazelcast
Thanks for the pointers! I was already doing the cast and the class is available on both nodes. I was able to finally figure out the issue, so I'm including a troubleshooting write-up in case someone else runs into a similar problem. 

Your comment about Java serialization prompted me to try disabling Kryo serialization. With Java serialization it worked, so I then wrote a test for the serializer that uses Kryo, and it failed even within the same JVM. This meant it was probably an issue with how Kryo was configured.

I was able to find this test class in Kryo that is used to test their lambda and closure serialization: https://github.com/EsotericSoftware/kryo/blob/master/test/com/esotericsoftware/kryo/serializers/Java8ClosureSerializerTest.java

Unfortunately, when I copied the setup steps they use in the test, I got this when trying to deserialize:

Caused by: java.lang.NoClassDefFoundError: com/esotericsoftware/reflectasm/ConstructorAccess
at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1271)
at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1127)
at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1136)
at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:559)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:535)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:709)
at com.esotericsoftware.kryo.serializers.ClosureSerializer.read(ClosureSerializer.java:73)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
at com.kryptnostic.conductor.rpc.LambdaStreamSerializer.read(LambdaStreamSerializer.java:49)
at com.kryptnostic.conductor.rpc.LambdaStreamSerializer.read(LambdaStreamSerializer.java:1)
at com.kryptnostic.rhizome.hazelcast.serializers.BaseSerializerTest.testSerializeDeserialize(BaseSerializerTest.java:51)
at com.kryptnostic.conductor.rpc.LambdaSerializersTest.testSerializeDeserialize(LambdaSerializersTest.java:28)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.ExpectException.evaluate(ExpectException.java:19)
... 15 more
Caused by: java.lang.ClassNotFoundException: com.esotericsoftware.reflectasm.ConstructorAccess
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 36 more

This looked like a case of missing dependencies. After hitting one more class-not-found error, I was able to identify the transitive dependencies that weren't getting pulled in:
compile 'com.esotericsoftware:minlog:1.3.0'
compile 'com.esotericsoftware:reflectasm:1.11.3'

So, to use Kryo and get the benefit of more efficient serialization of closure variables when submitting lambdas, add the dependencies above. If you know the classes ahead of time, it's often worth registering them with the Kryo serializer for even more efficient serialization.
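For anyone wondering what "closure variables" refers to: a serializable lambda is written out as a java.lang.invoke.SerializedLambda, and the values it captured travel with it as captured arguments. The sketch below inspects that proxy with plain JDK reflection. The class name CapturedArgsDemo and its helper methods are made up for this example, and reading the private writeReplace method is an illustration trick, not something Kryo or Hazelcast requires you to do.

```java
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;

class CapturedArgsDemo {

    /** Builds a serializable lambda that captures the given message. */
    static Runnable echo(String message) {
        return (Runnable & Serializable) () -> System.out.println(message);
    }

    /** Returns the serialization proxy the JDK generates for a serializable lambda. */
    static SerializedLambda describe(Serializable lambda) throws ReflectiveOperationException {
        Method writeReplace = lambda.getClass().getDeclaredMethod("writeReplace");
        writeReplace.setAccessible(true);
        return (SerializedLambda) writeReplace.invoke(lambda);
    }

    public static void main(String[] args) throws Exception {
        SerializedLambda proxy = describe((Serializable) echo("echo!"));
        System.out.println("implemented in: " + proxy.getImplClass());
        // Each captured value is part of the serialized form and needs a serializer of its own.
        for (int i = 0; i < proxy.getCapturedArgCount(); i++) {
            System.out.println("captured[" + i + "] = " + proxy.getCapturedArg(i));
        }
    }
}
```

The types of those captured values are the natural candidates for up-front registration with Kryo.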

The final class looks like this:

public class LambdaStreamSerializer implements SelfRegisteringStreamSerializer<Runnable> {

    private static final ThreadLocal<Kryo> kryoThreadLocal = new ThreadLocal<Kryo>() {

        @Override
        protected Kryo initialValue() {
            Kryo kryo = new Kryo();

            // Stuff from
            // https://github.com/EsotericSoftware/kryo/blob/master/test/com/esotericsoftware/kryo/serializers/Java8ClosureSerializerTest.java
            kryo.setInstantiatorStrategy( new Kryo.DefaultInstantiatorStrategy( new StdInstantiatorStrategy() ) );
            kryo.register( Object[].class );
            kryo.register( java.lang.Class.class );

            // Shared Lambdas
            kryo.register( Lambdas.class );
            kryo.register( SerializedLambda.class );

            // always needed for closure serialization, also if registrationRequired=false
            kryo.register( ClosureSerializer.Closure.class, new ClosureSerializer() );

            kryo.register( Runnable.class, new ClosureSerializer() );

            return kryo;
        }
    };

    public LambdaStreamSerializer() {}

    @Override
    public void write( ObjectDataOutput out, Runnable object ) throws IOException {
        Output output = new Output( (OutputStream) out );
        kryoThreadLocal.get().writeClassAndObject( output, object );
        output.flush();
    }

    @Override
    public Runnable read( ObjectDataInput in ) throws IOException {
        Input input = new Input( (InputStream) in );
        return (Runnable) kryoThreadLocal.get().readClassAndObject( input );
    }

    @Override
    public int getTypeId() {
        return HazelcastSerializerTypeIds.RUNNABLE.ordinal();
    }

    @Override
    public void destroy() {
    }

    @Override
    public Class<Runnable> getClazz() {
        return Runnable.class;
    }
}


At some point, we'll add in options for compression and unsafe.

Noctarius

Aug 18, 2016, 1:40:23 PM
to haze...@googlegroups.com
Cool, thanks for sharing Matthew!

mat...@kryptnostic.com

Aug 18, 2016, 1:53:57 PM
to Hazelcast
Bah, one more-- this didn't show up until I built with jar dependencies: 
compile 'org.objenesis:objenesis:2.4'
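Since these missing jars only surfaced one NoClassDefFoundError at a time, a quick classpath probe can report them all up front. This is a rough sketch using only the JDK: the class name KryoClasspathCheck and the missing helper are made up for this example, and the probed class names are one representative class per artifact mentioned in this thread (ConstructorAccess comes from the stack trace above; the minlog and objenesis names are what I'd expect those jars to contain, so adjust them if your versions differ).

```java
import java.util.ArrayList;
import java.util.List;

class KryoClasspathCheck {

    /** Returns the subset of classNames that the given class loader cannot find. */
    static List<String> missing(ClassLoader loader, String... classNames) {
        List<String> absent = new ArrayList<>();
        for (String name : classNames) {
            try {
                // initialize=false: only check presence, do not run static initializers
                Class.forName(name, false, loader);
            } catch (ClassNotFoundException | LinkageError e) {
                absent.add(name);
            }
        }
        return absent;
    }

    public static void main(String[] args) {
        List<String> absent = missing(
                KryoClasspathCheck.class.getClassLoader(),
                "com.esotericsoftware.reflectasm.ConstructorAccess", // reflectasm
                "com.esotericsoftware.minlog.Log",                   // minlog
                "org.objenesis.strategy.StdInstantiatorStrategy");   // objenesis
        System.out.println(absent.isEmpty() ? "all present" : "missing: " + absent);
    }
}
```

Running it with the same classpath as the packaged application (not just the IDE's) is what would catch a case like the objenesis one.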

mat...@kryptnostic.com

Aug 21, 2016, 7:21:38 AM
to Hazelcast, mat...@kryptnostic.com
One more note, so that others don't have to struggle with mysterious lambda serialization errors: we ran into a bug with Eclipse Neon where lambda serialization fails when it shouldn't.


-mtr