Still trying to debug this. I confirmed that it seems to work fine if output "null" for all the emitted tuple values. So it appears to have something to do with trying to pass-through an existing tuple value, perhaps?
Just now, I got the following exception instead, changing no other code. Symptom of the same issue? Something related to too much parallelism?
I'm running at 128 executors at the moment. Any known issues having too high an executors/workers ratio? I'm currently testing at 3 machines, 60 workers, 168 executors.
java.lang.RuntimeException: java.lang.RuntimeException: java.io.OptionalDataException
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:84)
at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:55)
at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:56)
at backtype.storm.disruptor$consume_loop_STAR_$fn__1597.invoke(disruptor.clj:67)
at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377)
at clojure.lang.AFn.run(AFn.java:24)
at java.lang.Thread.run(Thread.java:679)
Caused by: java.lang.RuntimeException: java.io.OptionalDataException
at backtype.storm.serialization.SerializableSerializer.read(SerializableSerializer.java:41)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:718)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:111)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:18)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:615)
at backtype.storm.serialization.KryoValuesDeserializer.deserializeFrom(KryoValuesDeserializer.java:20)
at backtype.storm.serialization.KryoTupleDeserializer.deserialize(KryoTupleDeserializer.java:36)
at backtype.storm.daemon.executor$mk_task_receiver$fn__3967.invoke(executor.clj:311)
at backtype.storm.disruptor$clojure_handler$reify__1585.onEvent(disruptor.clj:43)
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:81)
... 6 more
Caused by: java.io.OptionalDataException
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1366)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:368)
at java.util.HashMap.readObject(HashMap.java:1046)
at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:616)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:988)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1865)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1770)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:368)
at backtype.storm.serialization.SerializableSerializer.read(SerializableSerializer.java:39)
... 15 more
-Will