storm-0.8.0 Serialization failed for nested object

陈竞

Aug 30, 2012, 5:47:53 AM
to storm...@googlegroups.com
Hi, I got a java.io.NotSerializableException when trying to emit a nested object in Storm 0.8.0.

The exception trace is below:

1855 [Thread-9] INFO  backtype.storm.daemon.executor  - Opened spout word:(7)
1856 [Thread-28] ERROR backtype.storm.daemon.executor  -
java.lang.RuntimeException: com.esotericsoftware.kryo.KryoException: java.lang.RuntimeException: java.io.NotSerializableException: com.mytaobao.algo.ExclamationTopology$A
Serialization trace:
aArray (com.mytaobao.algo.ExclamationTopology$B)
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:82)
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:55)
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:56)
    at backtype.storm.disruptor$consume_loop_STAR_$fn__1596.invoke(disruptor.clj:67)
    at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377)
    at clojure.lang.AFn.run(AFn.java:24)
    at java.lang.Thread.run(Thread.java:636)
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.RuntimeException: java.io.NotSerializableException: com.mytaobao.algo.ExclamationTopology$A
Serialization trace:
aArray (com.mytaobao.algo.ExclamationTopology$B)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:495)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:554)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:77)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:472)
    at backtype.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:27)
    at backtype.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:27)
    at backtype.storm.daemon.worker$mk_transfer_fn$fn__4120$fn__4124.invoke(worker.clj:99)
    at backtype.storm.util$fast_list_map.invoke(util.clj:770)
    at backtype.storm.daemon.worker$mk_transfer_fn$fn__4120.invoke(worker.clj:99)
    at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3898.invoke(executor.clj:205)
    at backtype.storm.disruptor$clojure_handler$reify__1584.onEvent(disruptor.clj:43)
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:79)
    ... 6 more
Caused by: java.lang.RuntimeException: java.io.NotSerializableException: com.mytaobao.algo.ExclamationTopology$A
    at backtype.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:24)
    at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:535)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:480)
    ... 19 more
Caused by: java.io.NotSerializableException: com.mytaobao.algo.ExclamationTopology$A
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1355)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1163)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:343)
    at backtype.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:21)
    ... 21 more

===============================================================================
My test code is below:

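package com.mytaobao.algo;

import java.util.Map;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class ExclamationTopology {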
    public static class A {
        public String a;
        public int b;
    }
    public static class B {
        public A[] aArray;
        public String c;
    }
    public static class ExclamationBolt extends BaseRichBolt {
        OutputCollector _collector;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
        }

        @Override
        public void execute(Tuple tuple) {
        //    _collector.emit(tuple, new Values(ad));
        //    _collector.ack(tuple);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }


    }

    public static class MySpout extends BaseRichSpout {

        boolean _isDistributed;
        SpoutOutputCollector _collector;

        public MySpout() {
            this(true);
        }

        public MySpout(boolean isDistributed) {
            _isDistributed = isDistributed;
        }

        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            _collector = collector;
            com.esotericsoftware.minlog.Log.set(com.esotericsoftware.minlog.Log.LEVEL_TRACE);
        }

        public void close() {

        }
        public void nextTuple() {
            B bi = new B();
            A[] at = new A[1];
            at[0] = new A();
            bi.aArray = at;
            _collector.emit(new Values(bi));
        }

        public void ack(Object msgId) {

        }

        public void fail(Object msgId) {

        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }

       
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word", new MySpout(), 3);
        builder.setBolt("exclaim1", new ExclamationBolt(), 2)
                .shuffleGrouping("word");
        builder.setBolt("exclaim2", new ExclamationBolt(), 1)
                .shuffleGrouping("exclaim1");

        Config conf = new Config();
        conf.setDebug(false);
        conf.putAll(Utils.findAndReadConfigFile("test.yaml"));
        conf.setNumWorkers(2);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(args[0], conf, builder.createTopology());
    }
}

I created test.yaml in ~/.storm/ with the following contents:
topology.kryo.register:
    - com.mytaobao.algo.ExclamationTopology$A
    - com.mytaobao.algo.ExclamationTopology$B

I tested the code on both a local cluster and a distributed cluster, and both throw java.io.NotSerializableException. I then read the Storm and Kryo source code and the Kryo log:
TRACE: [kryo] Register class name: com.mytaobao.algo.ExclamationTopology$A[] (backtype.storm.serialization.SerializableSerializer);

I found that for A[], Kryo calls classResolver.registerImplicit(), which registers backtype.storm.serialization.SerializableSerializer, because KryoSerializableDefault overrides getDefaultSerializer() to return backtype.storm.serialization.SerializableSerializer.
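
To illustrate why that fallback fails, here is a minimal sketch that reproduces the innermost exception without Storm or Kryo. It assumes, as the bottom frames of the stack trace suggest, that SerializableSerializer hands the object to plain Java serialization through ObjectOutputStream. The names JavaSerializationFallbackDemo and SerializableA are hypothetical; A mirrors the class from the test code.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class JavaSerializationFallbackDemo {
    // Mirrors class A from the post: it does not implement Serializable.
    static class A {
        public String a;
        public int b;
    }

    // Hypothetical variant that opts in to Java serialization.
    static class SerializableA implements Serializable {
        private static final long serialVersionUID = 1L;
        public String a;
        public int b;
    }

    // Returns true if plain Java serialization accepts the value,
    // false if it rejects it with NotSerializableException.
    static boolean javaSerializable(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // The array type itself is Serializable, but its element is not.
        System.out.println(javaSerializable(new A[] { new A() }));                         // false
        System.out.println(javaSerializable(new SerializableA[] { new SerializableA() })); // true
    }
}
```

Under that assumption, any class that ends up on the Java serialization path must implement Serializable all the way down, including array elements.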

Finally, I added

--
陈竞, High Performance Computer Center, Institute of Computing Technology, Chinese Academy of Sciences
Jing Chen HPCC.ICT.AC China

陈竞

Aug 30, 2012, 5:53:29 AM
to storm...@googlegroups.com
Continuing my previous email:

Finally, I added
    - "[Lcom.mytaobao.algo.ExclamationTopology$A;"
to test.yaml, and the code works.
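
For reference, the "[L...;" form is the name the JVM itself reports for an array-of-objects class, so it does not have to be written by hand. The sketch below uses only the standard library; ArrayClassNameDemo and arrayClassName are hypothetical names, and A again stands in for the class from the test code.

```java
import java.lang.reflect.Array;

class ArrayClassNameDemo {
    // Stand-in for class A from the post.
    static class A {
        public String a;
        public int b;
    }

    // Returns the JVM name of the one-dimensional array type whose
    // elements are of the given class, e.g. "[Ljava.lang.String;".
    static String arrayClassName(Class<?> elementType) {
        return Array.newInstance(elementType, 0).getClass().getName();
    }

    public static void main(String[] args) throws ClassNotFoundException {
        String name = arrayClassName(A.class);
        // Prints [LArrayClassNameDemo$A; when compiled in the default package.
        System.out.println(name);
        // Class.forName accepts the same form and resolves it to A[].
        System.out.println(Class.forName(name) == A[].class); // true
    }
}
```

Calling arrayClassName on the real ExclamationTopology.A class should give exactly the string used in the test.yaml entry.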

So, is this a bug? Or, in 0.8.0, do we have to handle serialization of such classes ourselves?

