Kryo ConcurrentModificationException 0.8.1


Will Koffel

Oct 9, 2012, 8:13:17 PM
to storm...@googlegroups.com
When I raise the parallelism of a particular component beyond about 32 executors, Storm throws the following exception. The stack trace appears to be entirely inside Storm internals. I'm not spawning any threads of my own, so I don't think this is a thread-safety issue on my side.

java.lang.RuntimeException: java.util.ConcurrentModificationException
	at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:84)
	at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:55)
	at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:56)
	at backtype.storm.disruptor$consume_loop_STAR_$fn__1597.invoke(disruptor.clj:67)
	at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377)
	at clojure.lang.AFn.run(AFn.java:24)
	at java.lang.Thread.run(Thread.java:679)
Caused by: java.util.ConcurrentModificationException
	at java.util.LinkedHashMap$LinkedHashIterator.nextEntry(LinkedHashMap.java:390)
	at java.util.LinkedHashMap$EntryIterator.next(LinkedHashMap.java:409)
	at java.util.LinkedHashMap$EntryIterator.next(LinkedHashMap.java:408)
	at java.util.HashMap.writeObject(HashMap.java:1016)
	at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:616)
	at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:959)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1480)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:346)
	at backtype.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:21)
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:554)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:77)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:472)
	at backtype.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:27)
	at backtype.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:27)
	at backtype.storm.daemon.worker$mk_transfer_fn$fn__4128$fn__4132.invoke(worker.clj:99)
	at backtype.storm.util$fast_list_map.invoke(util.clj:771)
	at backtype.storm.daemon.worker$mk_transfer_fn$fn__4128.invoke(worker.clj:99)
	at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3905.invoke(executor.clj:208)
	at backtype.storm.disruptor$clojure_handler$reify__1585.onEvent(disruptor.clj:43)
	at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:81)

Will Koffel

Oct 9, 2012, 8:22:14 PM
to storm...@googlegroups.com
Note: this exception is thrown in the component just upstream of the highly parallel one in my topology. That component fans out via a shuffle grouping to many executors of the next one. With 32 of them it seems okay; with 64, I consistently get this exception.

-Will

JWellington

Oct 10, 2012, 11:34:03 AM
to storm...@googlegroups.com
This error is caused by a Map being written to while it is being read (iterated over) at the same time.

For example:

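for (Map.Entry<String, Integer> entry : map.entrySet())
{
   if (entry.getValue() < 0)  // some condition
       map.remove(entry.getKey());  // modifies the map in the middle of the iteration
}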
Here the map is modified while it is being iterated over, which is not allowed: the map's fail-fast iterator detects the change and throws ConcurrentModificationException. Could be a bug.
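A minimal, single-threaded sketch of the pattern described above, next to the usual safe alternative. The class and method names are illustrative. It uses a LinkedHashMap (the map type whose iterator appears in the first stack trace) so the iteration order, and therefore the failure, is deterministic; in general, fail-fast iterators only detect concurrent modification on a best-effort basis.

```java
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

class CmeDemo {
    // Builds a small map; LinkedHashMap keeps insertion order, so iteration order is predictable.
    static Map<String, Integer> sample() {
        Map<String, Integer> map = new LinkedHashMap<>();
        map.put("a", -1);
        map.put("b", 2);
        map.put("c", 3);
        return map;
    }

    // The pattern from the post: removing through the map while a for-each loop iterates it.
    // Returns false if the fail-fast iterator threw ConcurrentModificationException.
    static boolean removeNegativesUnsafely(Map<String, Integer> map) {
        try {
            for (Map.Entry<String, Integer> entry : map.entrySet()) {
                if (entry.getValue() < 0) {
                    map.remove(entry.getKey());
                }
            }
            return true;
        } catch (ConcurrentModificationException e) {
            return false;
        }
    }

    // The safe version: remove through the iterator, which keeps its bookkeeping in sync.
    static void removeNegativesSafely(Map<String, Integer> map) {
        Iterator<Map.Entry<String, Integer>> it = map.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue() < 0) {
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(removeNegativesUnsafely(sample())); // false
        Map<String, Integer> map = sample();
        removeNegativesSafely(map);
        System.out.println(map); // {b=2, c=3}
    }
}
```

In the Storm case the two sides ran on different threads (a bolt mutating the map while a Storm thread serialized it), which makes the failure intermittent. Iterator.remove() does not help there, since the iterator belongs to the serializer, not to the code doing the mutation.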

On Tuesday, October 9, 2012 8:13:17 PM UTC-4, Will Koffel wrote:

Nathan Marz

Oct 10, 2012, 11:35:08 AM
to storm...@googlegroups.com
Hi Will,

Please provide a reproducible test case for this. I've never seen anything like this, and we run plenty of components with parallelism >> 32. Also, what version of storm are you running?

-Nathan
--
Twitter: @nathanmarz
http://nathanmarz.com

Will Koffel

Oct 10, 2012, 1:19:49 PM
to storm...@googlegroups.com
Still trying to debug this. I confirmed that it seems to work fine if I emit null for all the tuple values. So perhaps it has something to do with passing through an existing tuple value?

Just now, I got the following exception instead, changing no other code.  Symptom of the same issue?  Something related to too much parallelism?

I'm running 128 executors at the moment. Are there any known issues with too high an executors-to-workers ratio? I'm currently testing with 3 machines, 60 workers, and 168 executors.

java.lang.RuntimeException: java.lang.RuntimeException: java.io.OptionalDataException
	at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:84)
	at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:55)
	at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:56)
	at backtype.storm.disruptor$consume_loop_STAR_$fn__1597.invoke(disruptor.clj:67)
	at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377)
	at clojure.lang.AFn.run(AFn.java:24)
	at java.lang.Thread.run(Thread.java:679)
Caused by: java.lang.RuntimeException: java.io.OptionalDataException
	at backtype.storm.serialization.SerializableSerializer.read(SerializableSerializer.java:41)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:718)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:111)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:18)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:615)
	at backtype.storm.serialization.KryoValuesDeserializer.deserializeFrom(KryoValuesDeserializer.java:20)
	at backtype.storm.serialization.KryoTupleDeserializer.deserialize(KryoTupleDeserializer.java:36)
	at backtype.storm.daemon.executor$mk_task_receiver$fn__3967.invoke(executor.clj:311)
	at backtype.storm.disruptor$clojure_handler$reify__1585.onEvent(disruptor.clj:43)
	at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:81)
	... 6 more
Caused by: java.io.OptionalDataException
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1366)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:368)
	at java.util.HashMap.readObject(HashMap.java:1046)
	at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:616)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:988)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1865)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1770)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:368)
	at backtype.storm.serialization.SerializableSerializer.read(SerializableSerializer.java:39)
	... 15 more
-Will

Will Koffel

Oct 10, 2012, 1:23:05 PM
to storm...@googlegroups.com
I'm running the latest release, 0.8.1, with ZeroMQ 2.1.7. I'll keep at it. I just posted another exception to this thread. Does that one ring any bells, or am I still in uncharted waters?

I suspect it's something I can work around in my code; I just need to narrow it down. If I get a reproducible case that doesn't involve my code, I'll absolutely send it along.

-Will

Nathan Marz

Oct 10, 2012, 1:25:43 PM
to storm...@googlegroups.com
I've never seen any of these, so I suspect something is wrong with your code. That's why I need to see a reproducible test case.

Nathan Marz

Oct 16, 2012, 12:37:02 PM
to storm...@googlegroups.com
Thanks, you're right. I updated the Troubleshooting page: https://github.com/nathanmarz/storm/wiki/Troubleshooting

On Tue, Oct 16, 2012 at 8:47 AM, Christian Nardi <christi...@gmail.com> wrote:
It has happened to me. My problem was that I was emitting a collection that I kept in my bolt's memory. So while I was modifying that collection in my bolt, Storm was trying to serialize it. That's why the ConcurrentModificationException shows up in serialization code. Hope it helps.
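Christian's diagnosis suggests a simple fix: never emit a collection the bolt will keep mutating; emit a copy instead. The sketch below illustrates this without Storm on the classpath. SnapshotEmitDemo, update, snapshotForEmit and run are hypothetical names; in a real bolt the snapshot would be the value passed to the collector's emit call. The background executor is an assumed stand-in for the worker's transfer thread, and it Java-serializes each value much as the SerializableSerializer frames in the first stack trace do.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for a bolt that keeps a map in memory and emits it.
class SnapshotEmitDemo {
    private final Map<String, Integer> counts = new HashMap<>();

    // Mutates the bolt's own state (only ever called from the bolt's thread).
    void update(String key) {
        counts.merge(key, 1, Integer::sum);
    }

    // The value that would be passed to emit(): a fresh copy taken on the bolt's thread.
    // Emitting `counts` itself is the race described in this thread.
    HashMap<String, Integer> snapshotForEmit() {
        return new HashMap<>(counts);
    }

    // Java-serializes a value with ObjectOutputStream, like the fallback path in the stack traces.
    static int serializedSize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.size();
    }

    // Keeps mutating state on this thread while a separate "transfer" thread serializes
    // every emitted snapshot. Returns how many snapshots were serialized successfully.
    static int run(int iterations) {
        SnapshotEmitDemo bolt = new SnapshotEmitDemo();
        ExecutorService transfer = Executors.newSingleThreadExecutor();
        List<Future<Integer>> pending = new ArrayList<>();
        try {
            for (int i = 0; i < iterations; i++) {
                bolt.update("key-" + (i % 50));
                HashMap<String, Integer> snapshot = bolt.snapshotForEmit();
                Future<Integer> result = transfer.submit(() -> serializedSize(snapshot));
                pending.add(result);
            }
            for (Future<Integer> result : pending) {
                result.get(); // rethrows any serializer failure wrapped in ExecutionException
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("serialization failed", e);
        } finally {
            transfer.shutdown();
        }
        return pending.size();
    }

    public static void main(String[] args) {
        System.out.println(run(10_000) + " snapshots serialized");
    }
}
```

The copy is taken on the bolt's own thread before the hand-off. Copying on the serializer side would not help, because the copy constructor itself iterates the source map and could hit the same exception.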

Will Koffel

Oct 16, 2012, 1:00:42 PM
to storm...@googlegroups.com
Yup, I confirmed on my end that it was a similar issue. I had a HashMap I was using to keep a trace of the actions my various bolts were taking. With high parallelism, a race condition could cause the map to be updated by a bolt while Storm was serializing the tuple containing it for emission.

Makes sense, but a tricky "gotcha".  Thanks for updating the docs, Nathan!

-Will
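The same precaution applies to a trace map that travels from bolt to bolt, as in Will's case: treat a map received from upstream as read-only and record each new step in a copy, since the incoming instance may still be referenced by another thread. A minimal sketch, assuming the trace maps a bolt name to a count of actions; TraceDemo, withStep and the bolt names are illustrative, not Storm APIs.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for a per-tuple "trace" map (bolt name -> number of actions).
final class TraceDemo {
    private TraceDemo() {}

    // Copy-on-write: build a new map from the incoming trace and record this bolt's step in it.
    // The incoming map is never modified, so another thread still reading or serializing
    // the upstream tuple's map cannot observe a change in the middle of its iteration.
    static LinkedHashMap<String, Integer> withStep(Map<String, Integer> incoming, String boltName) {
        LinkedHashMap<String, Integer> next = new LinkedHashMap<>(incoming);
        next.merge(boltName, 1, Integer::sum);
        return next;
    }

    public static void main(String[] args) {
        Map<String, Integer> fromUpstream = new LinkedHashMap<>();
        fromUpstream.put("spout", 1);
        LinkedHashMap<String, Integer> emitted = withStep(fromUpstream, "parse-bolt");
        System.out.println(fromUpstream); // {spout=1}
        System.out.println(emitted);      // {spout=1, parse-bolt=1}
    }
}
```

Once a bolt has emitted its copy, it should stop touching it as well; the next bolt downstream makes its own copy in turn.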