I've been debugging this problem for the last two evenings and finished analyzing some debug output while I was waiting on some other stuff on my laptop. I've figured out what the problem is, and it's a fairly significant bug in Storm Trident state management. It appears to be a bug that mostly affects only Linux. That, along with some other evidence, tells me it's related to the timing of operating-system process preemption (which would certainly cause this sort of run-to-run variation in behavior). Also, you don't have to send any messages to the topology: just let it sit for a minute or so and Storm will put itself into a state it can't recover from unless you clear the Kafka and ZooKeeper state.
Here's what happens, or at least as best as I can explain it from debugging the code. When you load a topology, empty transactional tuple batches start firing immediately, one after another; I see roughly 5 per second on my Linux box. Each batch is first given a transaction id by the MasterBatchCoordinator, which is the previously issued tx id + 1. The RotatingTransactionalState object is then checked to see whether state exists for that id. If it does, its state data is returned and placed at index 1 of the output tuple (index 0 is the tx id). If it doesn't exist and the id is higher than the previous ids (which it always should be, since the tx ids are incremented by 1), it calls the init state method on the batch coordinator implementation associated with the spout. The returned value (which is simply the number of partitions) is then stored as the state for that tx id and placed at index 1 of the output tuple. The emitBatch method is then called and passed the value at index 1 of the tuple (the number of partitions from the state), which it iterates through before continuing. As far as I can tell, this is how everything is intended to work when nothing goes wrong.
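To make the intended flow above concrete, here is a minimal sketch of the in-order path only. It is a simplified model under my own assumptions, not Storm's code: the class `InOrderBatchFlow` and its methods `nextTxid`, `stateFor`, `buildTuple` and `emitBatch` are hypothetical, and a plain `TreeMap` stands in for RotatingTransactionalState.

```java
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

// Hypothetical model of the in-order flow described above; NOT Storm's API.
final class InOrderBatchFlow {
    // Stand-in for RotatingTransactionalState: txid -> stored state (partition count).
    private final TreeMap<Long, Long> state = new TreeMap<>();
    private final long numPartitions;
    private long lastTxid = 0;

    InOrderBatchFlow(long numPartitions) {
        this.numPartitions = numPartitions;
    }

    // Coordinator step: each new id is the previously issued id + 1.
    long nextTxid() {
        return ++lastTxid;
    }

    // State step: reuse stored state, or run the "init state" step, which here
    // just returns the partition count. Only the in-order path is modeled.
    Long stateFor(long txid) {
        return state.computeIfAbsent(txid, id -> numPartitions);
    }

    // Output tuple: index 0 = txid, index 1 = state (number of partitions).
    List<Object> buildTuple() {
        long txid = nextTxid();
        return Arrays.<Object>asList(txid, stateFor(txid));
    }

    // Stand-in for emitBatch: iterates over the partition count at tuple index 1.
    static int emitBatch(List<Object> tuple) {
        Long numberOfPartitions = (Long) tuple.get(1);
        int emitted = 0;
        for (long p = 0; p < numberOfPartitions; p++) {
            emitted++; // a real spout would emit partition p's data here
        }
        return emitted;
    }

    public static void main(String[] args) {
        InOrderBatchFlow flow = new InOrderBatchFlow(3);
        for (int i = 0; i < 3; i++) {
            List<Object> tuple = flow.buildTuple();
            System.out.println(tuple + " -> emitted " + emitBatch(tuple) + " partitions");
        }
    }
}
```

Because ids only ever increase in this path, every lookup either finds existing state or initializes it, so index 1 is never null.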
After a short period of this, the MasterBatchCoordinator will mysteriously return a tx id that is usually around 12 higher than the previous one issued. That in itself isn't a big deal, except that the tx id following that oddball one resumes the original sequence, so the ids look something like this, for example:
5, 6, 7, 19, 8, 9 ...
Stuff hits the fan when the tx ids resume the original sequence (the 8 in the example above). When Storm processes tx id 8, it is now lower than the previous one (the oddball 19). Storm again sees that the tx id isn't present in the RotatingTransactionalState, but it also notices that this tx id isn't the latest one in its state. In that case, it sets the state for that tx id to null. That null is then placed at index 1 of the outgoing tuple and, just as above, passed to emitBatch. When emitBatch tries to iterate over the partitions, a NullPointerException is thrown in the for loop, because null is not a number. (emitBatch takes a Long as the number of partitions, so it will accept null, but it never checks whether numberOfPartitions is null before the loop unboxes it.)
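The failure can be reproduced in a small self-contained model. This is a sketch under my own assumptions, not Storm's implementation: `OutOfOrderTxDemo`, `getState` and `emitBatch` are hypothetical names, a `TreeMap` stands in for RotatingTransactionalState, and the "is this the latest id?" check is modeled as "no stored txid is greater than this one". It replays the example sequence and shows the null reaching the loop.

```java
import java.util.TreeMap;

// Hypothetical reproduction of the failure described above; NOT Storm's code.
final class OutOfOrderTxDemo {
    static final long NUM_PARTITIONS = 4;
    // Stand-in for RotatingTransactionalState: txid -> partition count (or null).
    static final TreeMap<Long, Long> state = new TreeMap<>();

    // Unknown txid: initialize it if no higher txid is stored yet, else store null.
    static Long getState(long txid) {
        if (!state.containsKey(txid)) {
            boolean newest = state.tailMap(txid, false).isEmpty();
            state.put(txid, newest ? NUM_PARTITIONS : null);
        }
        return state.get(txid);
    }

    // Accepts a boxed Long, so null gets through; the loop condition then unboxes it.
    static void emitBatch(long txid, Long numberOfPartitions) {
        for (long i = 0; i < numberOfPartitions; i++) {
            // a real spout would emit partition i of batch txid here
        }
    }

    public static void main(String[] args) {
        long[] issued = {5, 6, 7, 19, 8, 9}; // the example sequence from the post
        for (long txid : issued) {
            Long partitions = getState(txid);
            System.out.println("tx " + txid + " -> " + partitions);
            try {
                emitBatch(txid, partitions);
            } catch (NullPointerException e) {
                System.out.println("tx " + txid + " -> NullPointerException in emitBatch");
            }
        }
    }
}
```

In this model, 8 and 9 both map to null because a higher id (19) is already stored, and each null throws as soon as the loop condition unboxes it.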
Once that out-of-sequence tx id is issued, all following tx ids are associated with null. The setup of state and the retrieval of state for tx ids are done in two different workers. Once that first null is stored in the state, the worker in charge of emitting outstanding tx ids crashes immediately with an NPE when it tries to iterate from 0 to null in PartitionedTridentSpoutExecutor, and it crashes again the same way every time the supervisor restarts it.
I can see a couple of ways to fix it, but I'm not sure what the intended behavior is when a tx id is assigned that is lower than one already in the state, or whether any of those fixes would cause bad side effects.
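For illustration only, here is one possible stopgap, sketched under my own assumptions: a null check in emitBatch so the emitter worker stops crash-looping. `GuardedEmitter` is a hypothetical class, not Storm's API, and this is not a confirmed fix: it hides the symptom without explaining the out-of-order tx id, and whether skipping a transaction is safe is exactly the open question above.

```java
import java.util.logging.Logger;

// Hypothetical stopgap, NOT a confirmed fix: refuse to iterate over a null
// partition count instead of throwing a NullPointerException.
final class GuardedEmitter {
    private static final Logger LOG = Logger.getLogger(GuardedEmitter.class.getName());

    // Returns the number of partitions emitted; 0 when the stored state is null.
    static long emitBatch(long txid, Long numberOfPartitions) {
        if (numberOfPartitions == null) {
            // Skipping only hides the symptom; the out-of-order txid is still unexplained.
            LOG.warning("No partition count stored for tx " + txid + "; skipping batch");
            return 0;
        }
        long emitted = 0;
        for (long p = 0; p < numberOfPartitions; p++) {
            emitted++; // a real spout would emit partition p's data here
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(emitBatch(7, 4L));
        System.out.println(emitBatch(8, null));
    }
}
```

The other obvious option, initializing state instead of storing null for a lower tx id, changes what the state means for that id, which is why the intended behavior matters before picking either.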
--
You received this message because you are subscribed to the Google Groups "storm-user" group.