NPE in DisruptorQueue w/ Trident

Brian O'Neill

Jan 22, 2013, 1:07:28 PM
to storm...@googlegroups.com
We recently converted all of our transactional topologies to Trident.

Since migrating, we have intermittently been seeing an issue similar to:
https://github.com/nathanmarz/storm/issues/325

The stack is as follows:
java.lang.RuntimeException: java.lang.NullPointerException
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87)
at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:58)
at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
at backtype.storm.daemon.executor$fn__4050$fn__4059$fn__4106.invoke(executor.clj:658)
at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377)
at clojure.lang.AFn.run(AFn.java:24)
at java.lang.Thread.run(Thread.java:662)
Caused by: java.lang.NullPointerException
at storm.trident.spout.PartitionedTridentSpoutExecutor$Emitter.emitBatch(PartitionedTridentSpoutExecutor.java:69)
at storm.trident.spout.PartitionedTridentSpoutExecutor$Emitter.emitBatch(PartitionedTridentSpoutExecutor.java:52)
at storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:64)
at storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:352)
at backtype.storm.daemon.executor$fn__4050$tuple_action_fn__4052.invoke(executor.clj:566)
at backtype.storm.daemon.executor$mk_task_receiver$fn__3976.invoke(executor.clj:345)
at backtype.storm.disruptor$clojure_handler$reify__1606.onEvent(disruptor.clj:43)
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:84)
... 6 more

Note that it is coming out of the TridentSpout. Do we have something
misconfigured in our topology?

Is there any intent to tag storm-kafka so we know which version we should use with 0.8.2?
(We may be running an old version of storm-kafka, but I don't know
which commit/version to upgrade to.)

-brian

--
Brian ONeill
Lead Architect, Health Market Science (http://healthmarketscience.com)
mobile:215.588.6024
blog: http://brianoneill.blogspot.com/
twitter: @boneill42

Brian O'Neill

Jan 22, 2013, 4:52:25 PM
to storm...@googlegroups.com
Update: we went back to 0.8.1 and this issue disappeared.

Maybe there is something with the threading in 0.8.2?

-brian

Enno Shioji

Jan 23, 2013, 10:54:58 AM
to storm...@googlegroups.com, bo...@alumni.brown.edu

Ryan Moquin

Jan 25, 2013, 9:14:10 AM
to storm...@googlegroups.com, bo...@alumni.brown.edu

I have been debugging this problem for the last two evenings and finished analyzing some debug info while I was waiting on some other stuff on my laptop.  I figured out what the problem is, and it's a fairly significant bug in Storm Trident state management.  It appears to be a bug that mostly affects Linux. That fact, along with some other evidence, tells me it's related to the timing of operating system process pre-emption (which will certainly cause this sort of runtime variation in behavior).  Also, you don't have to send any messages to the topology; just let it sit for a minute or so and Storm will put itself into a state that it can't recover from unless you clear the Kafka and ZooKeeper state.

Here's what happens, or at least as best as I can explain it from debugging the code.  When you load a topology, empty transactional tuple batches start firing immediately, one after another.  I see around 5 per second on my Linux box.  Each batch is first given a transaction id by the MasterBatchCoordinator, which is the previously issued tx id + 1.  The RotatingTransactionalState object is then checked to see whether state exists for that id.  If it does, its state data is returned and added at index 1 of the output tuple (the first value is the tx id).  If it doesn't exist and the id is higher than the previous ids (which it always should be, since the tx ids are incremented by 1), the init-state method on the batch coordinator implementation associated with the spout is called.  The returned value (which is simply the number of partitions) is then stored as the value of the state and placed at index 1 of the output tuple.  The emitBatch method is then called and passed the value at index 1 of the tuple (the number of partitions from the state), which it iterates through, and processing continues.  This seems to be the way everything is intended to work in the happy path.
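
Roughly, the lookup flow looks like this hypothetical sketch; the names (txState, initState, tupleFor) are mine and simplified, not Storm's actual classes or method signatures:

import java.util.TreeMap;

// Hypothetical, simplified sketch of the flow described above; illustrative only,
// not Storm's real internals.
public class CoordinatorFlowSketch {

    // Stands in for RotatingTransactionalState: tx id -> state (the partition count).
    private final TreeMap<Long, Object> txState = new TreeMap<Long, Object>();

    // Stand-in for the spout coordinator's init-state call; returns the partition count.
    private Object initState(long txId) {
        return Long.valueOf(4);
    }

    // Builds the [txId, state] pair for a tx id issued by the coordinator.
    public Object[] tupleFor(long txId) {
        Object meta = txState.get(txId);
        if (meta == null && (txState.isEmpty() || txId > txState.lastKey())) {
            meta = initState(txId);            // init only when the id is newer than anything seen
            txState.put(txId, meta);
        }
        // If txId is lower than an id already in the state, meta stays null here,
        // which is exactly the case analyzed below.
        return new Object[] { Long.valueOf(txId), meta };  // index 0 = tx id, index 1 = state
    }
}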

After a short period of this, the MasterBatchCoordinator will mysteriously return a tx id that is usually around 12 higher than the previous one issued.  That in itself isn't a big deal, except that the tx id following that oddball one resumes the original sequence, so the ids would look something like this, for example:

5, 6, 7, 19, 8, 9 ...

Stuff hits the fan when the tx ids resume the original sequence (the 8 in the above example).  When Storm processes tx id 8 from the example, it's now lower than the previous one (the oddball 19).  Storm again sees that the tx id isn't present in the RotatingTransactionalState, and it also notices that this tx id isn't the latest value in its state.  In that case, it assigns the state for that tx id to null.  That null is then placed at index 1 of the outgoing tuple.  Then, just like above, the value at index 1 of the tuple is passed to emitBatch.  When emitBatch tries to iterate over the partitions, a NullPointerException is thrown in the for loop, because null is obviously not a number (emitBatch takes a Long as the number of partitions, which means it will accept null, but it doesn't check whether numberOfPartitions is null).
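
Purely as an illustration of that failure mode (a hypothetical reduction, not the actual PartitionedTridentSpoutExecutor source):

// Hypothetical reduction of the failure described above, not the real Storm code.
public class NullPartitionCountNpe {

    // Mirrors the shape of an emitBatch that receives the partition count as a boxed Long.
    static void emitBatch(long txId, Long numberOfPartitions) {
        // Unboxing null in the loop condition is what throws the NullPointerException.
        for (int i = 0; i < numberOfPartitions; i++) {
            System.out.println("emitting partition " + i + " for tx " + txId);
        }
    }

    public static void main(String[] args) {
        emitBatch(8L, null);  // the state resolved to null for the out-of-sequence tx id
    }
}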

Once that out-of-sequence tx id is issued, all following tx ids will be associated with null.  The setup of state and the retrieval of state for tx ids are done in two different workers.  Once that first null is stored in the state, the worker in charge of emitting outstanding tx ids will crash immediately with an NPE every time the supervisor restarts it, when it tries to iterate from 0 to null in PartitionedTridentSpoutExecutor.

I can see a couple of ways to fix it, but I'm not sure what the intended behavior is when a tx id is assigned that is lower than one already in the state, or whether any of those fixes would cause bad side effects.
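
One purely defensive option, for example, would be to guard against the null before iterating. This is only a sketch against the hypothetical shape above, not a patch against the real code, and whether skipping the batch is acceptable is exactly that open question:

// Defensive sketch only, reusing the hypothetical shape from the reduction above.
public class GuardedEmitSketch {

    static void emitBatch(long txId, Long numberOfPartitions) {
        if (numberOfPartitions == null) {
            // No state was initialized for this (out-of-sequence) tx id:
            // skip and log instead of crashing the worker with an NPE.
            System.err.println("No partition state for tx " + txId + "; skipping batch");
            return;
        }
        for (int i = 0; i < numberOfPartitions; i++) {
            System.out.println("emitting partition " + i + " for tx " + txId);
        }
    }
}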

Ryan

Ryan Ebanks

Feb 4, 2013, 10:54:50 PM
to storm...@googlegroups.com, bo...@alumni.brown.edu
I am seeing the same error using Storm 0.8.1:

java.lang.RuntimeException: java.lang.NullPointerException
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:84)
at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:55)
at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:56)
at backtype.storm.disruptor$consume_loop_STAR_$fn__1597.invoke(disruptor.clj:67)
at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377)
at clojure.lang.AFn.run(AFn.java:24)
at java.lang.Thread.run(Thread.java:679)
Caused by: java.lang.NullPointerException
at storm.trident.spout.PartitionedTridentSpoutExecutor$Emitter.emitBatch(PartitionedTridentSpoutExecutor.java:69)
at storm.trident.spout.PartitionedTridentSpoutExecutor$Emitter.emitBatch(PartitionedTridentSpoutExecutor.java:52)
at storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:64)
at storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:325)
at backtype.storm.daemon.executor$fn__4036$tuple_action_fn__4037.invoke(executor.clj:504)
at backtype.storm.daemon.executor$mk_task_receiver$fn__3967.invoke(executor.clj:314)
at backtype.storm.disruptor$clojure_handler$reify__1585.onEvent(disruptor.clj:43)
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:81)
... 6 more

Are there any known workarounds for this error?  We do have storm-kafka spouts that are functioning correctly on our cluster, but this PartitionedTridentSpout always fails on the cluster.  The spout does function correctly when running on a local cluster.  Any help would be greatly appreciated.

Thanks.

David Ross

Mar 18, 2013, 11:11:43 PM
to storm...@googlegroups.com, bo...@alumni.brown.edu
Has this issue been resolved? We started seeing it as well, using Storm 0.8.2.

Viral Bajaria

Mar 19, 2013, 12:45:42 AM
to storm...@googlegroups.com, bo...@alumni.brown.edu
I thought the resolution here was that you need to throttle your max spout pending since you are emitting more lines than your pipeline is able to handle.
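
For reference, that throttle is the topology.max.spout.pending setting; a minimal sketch, assuming the standard backtype.storm.Config API (the value here is arbitrary and needs tuning):

import backtype.storm.Config;

// Minimal sketch: capping the number of in-flight batches via max spout pending.
// The value 10 is arbitrary; tune it for your own pipeline.
public class ThrottleSketch {
    public static Config throttledConfig() {
        Config conf = new Config();
        conf.setMaxSpoutPending(10);  // same setting as topology.max.spout.pending
        return conf;
    }
}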


David Ross

Mar 19, 2013, 1:12:44 AM
to storm...@googlegroups.com, bo...@alumni.brown.edu
I changed maxSpoutPending from 10 to 1 and got the same exception.

David Ross

Mar 19, 2013, 6:40:17 PM
to storm...@googlegroups.com, bo...@alumni.brown.edu
As an update, I have removed this setting altogether and wiped out the ZooKeeper state. It now works OK. I will report back if another issue occurs.

Jim Baugh

Mar 26, 2013, 9:17:49 AM
to storm...@googlegroups.com, bo...@alumni.brown.edu
We removed the maxSpoutPending setting as well to fix this issue.

Rizwan Sharif

Apr 9, 2013, 5:58:03 PM
to storm...@googlegroups.com, bo...@alumni.brown.edu
Removing maxSpoutPending also fixed it for us, but performance dropped considerably.

Viral Bajaria

Apr 9, 2013, 6:02:47 PM
to storm...@googlegroups.com, bo...@alumni.brown.edu
I am guessing you are using Trident. Have you tried reducing the batch emit interval? If not, try reducing it.
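
A minimal sketch of that, assuming the topology.trident.batch.emit.interval.millis key exposed by backtype.storm.Config in 0.8.x (the 500 ms value is only an example):

import backtype.storm.Config;

// Sketch of lowering the Trident batch emit interval; the value is illustrative
// and the constant name is assumed to be the 0.8.x key.
public class BatchIntervalSketch {
    public static Config withShorterBatchInterval() {
        Config conf = new Config();
        conf.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 500);
        return conf;
    }
}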



Fei Gao

Aug 27, 2013, 2:16:18 AM
to storm...@googlegroups.com, bo...@alumni.brown.edu
I recently met the same issue: a NullPointerException in DisruptorQueue when using Trident.
I am using Storm 0.8.2, and the NPE happens when I have set topology.max.spout.pending. The workaround is to remove topology.max.spout.pending from the config, but that impacts performance.
Does anyone have any updates on this issue, or a solution for it?

Thank You
Gao, Fei


churly lin

Dec 12, 2013, 4:36:11 AM
to storm...@googlegroups.com, bo...@alumni.brown.edu
I am hitting the same issue now: an NPE when using the TransactionalTridentKafkaSpout.
Has anyone solved this issue?

churly lin

Dec 13, 2013, 3:10:07 AM
to storm...@googlegroups.com, bo...@alumni.brown.edu
Hi Viral Bajaria:
I reduced the batch emit interval to 100, but it doesn't work.