Thanks for the pointers! I was already doing the cast and the class is available on both nodes. I was able to finally figure out the issue, so I'm including a troubleshooting write-up in case someone else runs into a similar problem.
Your comment about Java serialization made me think to disable Kryo serialization. When I used Java serialization it worked, so I then wrote a test for the serializer that uses Kryo and it fails even within the same JVM. This meant it was probably an issue with configuring Kryo.
Unfortunately, when I copy the setup steps they use in the test, I get this when trying to deserialize:
Caused by: java.lang.NoClassDefFoundError: com/esotericsoftware/reflectasm/ConstructorAccess
at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1271)
at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1127)
at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1136)
at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:559)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:535)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:709)
at com.esotericsoftware.kryo.serializers.ClosureSerializer.read(ClosureSerializer.java:73)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
at com.kryptnostic.conductor.rpc.LambdaStreamSerializer.read(LambdaStreamSerializer.java:49)
at com.kryptnostic.conductor.rpc.LambdaStreamSerializer.read(LambdaStreamSerializer.java:1)
at com.kryptnostic.rhizome.hazelcast.serializers.BaseSerializerTest.testSerializeDeserialize(BaseSerializerTest.java:51)
at com.kryptnostic.conductor.rpc.LambdaSerializersTest.testSerializeDeserialize(LambdaSerializersTest.java:28)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.ExpectException.evaluate(ExpectException.java:19)
... 15 more
Caused by: java.lang.ClassNotFoundException: com.esotericsoftware.reflectasm.ConstructorAccess
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 36 more
This looks like a case of missing dependencies and after another class not found error, I was able to identify the transitive dependencies that weren't getting pulled in:
compile 'com.esotericsoftware:minlog:1.3.0'
compile 'com.esotericsoftware:reflectasm:1.11.3'
So in order to use Kryo and get the benefit of more efficient serialization of closure variables when submitting lambdas do the above and you will be good. If you know the classes ahead of time, it's often worth registering them with the Kryo serializer for even more efficient serialization.
The final class looks like this:
public class LambdaStreamSerializer implements SelfRegisteringStreamSerializer<Runnable> {
private static final ThreadLocal<Kryo> kryoThreadLocal = new ThreadLocal<Kryo>() {
@Override
protected Kryo initialValue() {
Kryo kryo = new Kryo();
// Stuff from
// https://github.com/EsotericSoftware/kryo/blob/master/test/com/esotericsoftware/kryo/serializers/Java8ClosureSerializerTest.java
kryo.setInstantiatorStrategy( new Kryo.DefaultInstantiatorStrategy( new StdInstantiatorStrategy() ) );
kryo.register( Object[].class );
kryo.register( java.lang.Class.class );
// Shared Lambdas
kryo.register( Lambdas.class );
kryo.register( SerializedLambda.class );
// always needed for closure serialization, also if registrationRequired=false
kryo.register( ClosureSerializer.Closure.class, new ClosureSerializer() );
kryo.register( Runnable.class, new ClosureSerializer() );
return kryo;
}
};
public LambdaStreamSerializer() {}
@Override
public void write( ObjectDataOutput out, Runnable object ) throws IOException {
Output output = new Output( (OutputStream) out );
kryoThreadLocal.get().writeClassAndObject( output, object );
output.flush();
}
@Override
public Runnable read( ObjectDataInput in ) throws IOException {
Input input = new Input( (InputStream) in );
return (Runnable) kryoThreadLocal.get().readClassAndObject( input );
}
@Override
public int getTypeId() {
return HazelcastSerializerTypeIds.RUNNABLE.ordinal();
}
@Override
public void destroy() {
}
@Override
public Class<Runnable> getClazz() {
return Runnable.class;
}
}
At some point, we'll add in options for compression and unsafe.