Re: Custom serializer problem

Shimi Kiviti

Dec 13, 2012, 3:42:34 PM
to storm...@googlegroups.com
Storm is falling back to its default serializer, which uses Java serialization, and I assume these classes are not Serializable.
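
To illustrate the fallback failure, here is a minimal sketch using only the JDK (no Storm or Kryo involved). PlainWrapper and firstNonSerializableClass are hypothetical names standing in for a class that does not implement java.io.Serializable; the sketch only shows that plain Java serialization rejects such an object even when it is nested inside a serializable collection, which is the same shape as the trace quoted below.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

public class JavaSerializationFallback {

    // Hypothetical stand-in for a wrapper class that does NOT implement Serializable.
    static final class PlainWrapper {
        final List<String> items = new ArrayList<>();
    }

    // Attempts plain Java serialization of the value.
    // Returns null on success, or the class name reported by
    // NotSerializableException when some object in the graph is rejected.
    static String firstNonSerializableClass(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // ArrayList itself is Serializable, but one of its elements is not.
        List<Object> payload = new ArrayList<>();
        payload.add("ok");
        payload.add(new PlainWrapper());
        System.out.println("Rejected class: " + firstNonSerializableClass(payload));
    }
}
```

A class that has its own Kryo serializer registered never goes down this path, which is why the registration has to actually take effect inside the worker.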

I am not sure what is wrong with the configuration.
You can try registering these classes with an IKryoDecorator instead: put the registration code in its decorate() method, and call registerDecorator() on your topology config.
It works for me.

Shimi


On Thursday, December 13, 2012 4:56:32 AM UTC+2, Ben wrote:
Hi,
I am trying to get Scala's Java conversion wrapper classes serialized within Storm/Kryo.

Outside of Storm, I had no problem serializing instances of those classes after registering the appropriate Kryo serializers. 
However, I get NotSerializableExceptions when a Storm Bolt emits SeqWrapper instances. (I use StormSubmitter.submitTopology(...) to submit a topology with a custom config and Kryo serializer registrations.)

The supervisor shows the correct configuration in the logs:

 worker - Worker has topology config { ...
 "topology.kryo.register" {"scala.collection.JavaConversions$SeqWrapper" "backtype.storm.serialization.types.ArrayListSerializer", "
scala.collection.JavaConversions$SetWrapper" "backtype.storm.serialization.types.HashSetSerializer"} }

But this exception is thrown later:
java.lang.RuntimeException: java.lang.RuntimeException: java.io.NotSerializableException: scala.collection.JavaConversions$SetWrapper
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:84)
at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:55)
at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:56)
at backtype.storm.disruptor$consume_loop_STAR_$fn__1597.invoke(disruptor.clj:67)
at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377)
at clojure.lang.AFn.run(AFn.java:24)
at java.lang.Thread.run(Thread.java:619)
Caused by: java.lang.RuntimeException: java.io.NotSerializableException: scala.collection.JavaConversions$SetWrapper
at backtype.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:24)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:554)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:77)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:472)
at backtype.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:27)
at backtype.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:27)
at backtype.storm.daemon.worker$mk_transfer_fn$fn__4128$fn__4132.invoke(worker.clj:99)
at backtype.storm.util$fast_list_map.invoke(util.clj:771)
at backtype.storm.daemon.worker$mk_transfer_fn$fn__4128.invoke(worker.clj:99)
at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3905.invoke(executor.clj:208)
at backtype.storm.disruptor$clojure_handler$reify__1585.onEvent(disruptor.clj:43)
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:81)
... 6 more

Does anyone have an idea what might be wrong?

Thanks,
Ben