Hi, I met java.io.NotSerializableException when i want to emit a nested object in storm 0.8.0.
the exception trace belows:
1855 [Thread-9] INFO backtype.storm.daemon.executor - Opened spout word:(7)
1856 [Thread-28] ERROR backtype.storm.daemon.executor -
java.lang.RuntimeException: com.esotericsoftware.kryo.KryoException: java.lang.RuntimeException: java.io.NotSerializableException: com.mytaobao.algo.ExclamationTopology$A
Serialization trace:
aArray (com.mytaobao.algo.ExclamationTopology$B)
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:82)
at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:55)
at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:56)
at backtype.storm.disruptor$consume_loop_STAR_$fn__1596.invoke(disruptor.clj:67)
at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377)
at clojure.lang.AFn.run(AFn.java:24)
at java.lang.Thread.run(Thread.java:636)
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.RuntimeException: java.io.NotSerializableException: com.mytaobao.algo.ExclamationTopology$A
Serialization trace:
aArray (com.mytaobao.algo.ExclamationTopology$B)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:554)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:77)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:472)
at backtype.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:27)
at backtype.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:27)
at backtype.storm.daemon.worker$mk_transfer_fn$fn__4120$fn__4124.invoke(worker.clj:99)
at backtype.storm.util$fast_list_map.invoke(util.clj:770)
at backtype.storm.daemon.worker$mk_transfer_fn$fn__4120.invoke(worker.clj:99)
at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3898.invoke(executor.clj:205)
at backtype.storm.disruptor$clojure_handler$reify__1584.onEvent(disruptor.clj:43)
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:79)
... 6 more
Caused by: java.lang.RuntimeException: java.io.NotSerializableException: com.mytaobao.algo.ExclamationTopology$A
at backtype.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:24)
at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:535)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:480)
... 19 more
Caused by: java.io.NotSerializableException: com.mytaobao.algo.ExclamationTopology$A
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1355)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1163)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:343)
at backtype.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:21)
... 21 more
===============================================================================
and my test code belows:
public class ExclamationTopology {
public static class A {
public String a;
public int b;
}
public static class B {
public A[] aArray;
public String c;
}
public static class ExclamationBolt extends BaseRichBolt {
OutputCollector _collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
@Override
public void execute(Tuple tuple) {
// _collector.emit(tuple, new Values(ad));
// _collector.ack(tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
public static class MySpout extends BaseRichSpout {
boolean _isDistributed;
SpoutOutputCollector _collector;
public MySpout() {
this(true);
}
public MySpout(boolean isDistributed) {
_isDistributed = isDistributed;
}
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
com.esotericsoftware.minlog.Log.set(com.esotericsoftware.minlog.Log.LEVEL_TRACE);
}
public void close() {
}
public void nextTuple() {
B bi = new B();
A[] at = new A[1];
at[0] = new A();
bi.aArray = at;
_collector.emit(new Values(bi));
}
public void ack(Object msgId) {
}
public void fail(Object msgId) {
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word", new MySpout(), 3);
builder.setBolt("exclaim1", new ExclamationBolt(), 2)
.shuffleGrouping("word");
builder.setBolt("exclaim2", new ExclamationBolt(), 1)
.shuffleGrouping("exclaim1");
Config conf = new Config();
conf.setDebug(false);
conf.putAll(Utils.findAndReadConfigFile("test.yaml"));
conf.setNumWorkers(2);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(args[0], conf, builder.createTopology());
}
}
i touch test.yaml in ~/.storm/. it contains:
topology.kryo.register:
- com.mytaobao.algo.ExclamationTopology$A
- com.mytaobao.algo.ExclamationTopology$B
i test the code in local cluster and distributed cluster, both cause java.io.NotSerializableException. i read the source code of storm and kryo,
and read kryo log:
TRACE: [kryo] Register class name: com.mytaobao.algo.ExclamationTopology$A[] (backtype.storm.serialization.SerializableSerializer);
i found that, to A[], kryo will call classResolver.registerImplicit(), which will register backtype.storm.serialization.SerializableSerializer,
since KryoSerializableDefault overrided getDefaultSerializer(), return backtype.storm.serialization.SerializableSerializer.
At last i add