Hi,
We’re running into an intermittent bug that we believe is caused by a combination of speculative execution, one of our custom Taps, and the behavior of Hadoop18TapUtil.
Our custom Tap writes (simplified) two output files per task during execution:
part-XXXXX.partfile
index_files/part-XXXXX.index
While a task is running, both files of course live under _temporary, and are only moved out when the Tap is closed at the end of the task. The .index file contains record locations within the .partfile, so it’s critical that the .index file was written by the same task attempt as the .partfile (two .partfiles written from the same data won’t be byte-identical, because of in-memory set ordering and so on).
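For concreteness, mid-attempt the output directory looks something like this (following the usual Hadoop _temporary layout that Hadoop18TapUtil manages; the attempt ID here is illustrative):

    out/
      _temporary/
        _attempt_201701011200_0001_m_000003_0/
          part-XXXXX.partfile
          index_files/
            part-XXXXX.index

and commitTask/moveTaskOutputs promotes both files up into out/ when the attempt finishes.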
The output committing done by Hadoop18TapUtil.moveTaskOutputs currently deletes a final output file if one already exists:
> if( !fs.delete( finalOutputPath, true ) )
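For context, my reading of the surrounding logic is roughly the following rename-then-delete-then-rename pattern (paraphrased from memory, not a verbatim quote; helper names as I recall them):

    if( fs.isFile( taskOutput ) )
      {
      Path finalOutputPath = getFinalPath( jobOutputDir, taskOutput, taskOutputPath );

      // first try a plain rename from _temporary into the final location
      if( !fs.rename( taskOutput, finalOutputPath ) )
        {
        // the rename can fail because a file with this name was already
        // committed -- in that case the existing file is deleted and replaced
        if( !fs.delete( finalOutputPath, true ) )
          throw new IOException( "failed to delete earlier output" );

        if( !fs.rename( taskOutput, finalOutputPath ) )
          throw new IOException( "failed to save task output" );
        }
      }

so nothing stops a second attempt from replacing a file a first attempt has already committed.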
As far as I can tell, nothing ensures that only one task attempt is inside Hadoop18TapUtil.commitTask at a time. Since we run our jobs with YARN speculative execution enabled, we believe the following is happening, based on the task logs we have found:
1) attempt0 enters commitTask
2) attempt0 commits part-XXXXX.partfile to the output directory
3) attempt1 enters commitTask
4) attempt1 deletes part-XXXXX.partfile from the output directory, and commits a new version of part-XXXXX.partfile
5) attempt0 commits index_files/part-XXXXX.index
6) attempt0 tells YARN it has succeeded
7) attempt1 is killed
This leaves part-XXXXX.partfile and index_files/part-XXXXX.index written by different attempts and therefore inconsistent with each other, which surfaces as data corruption when the index is used to read the partfile.
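To make the second question below concrete, the change we have in mind is roughly to refuse to clobber an already committed file instead of deleting it. A simplified sketch, not the actual patch (the method shape and messages are assumed for illustration):

    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // sketch only: fail the late committer rather than deleting committed output
    static void moveTaskOutput( FileSystem fs, Path taskOutput, Path finalOutputPath ) throws IOException
      {
      if( !fs.rename( taskOutput, finalOutputPath ) )
        {
        // if the target already exists, assume another attempt committed it
        // and abort this attempt, leaving the committed file pair intact
        if( fs.exists( finalOutputPath ) )
          throw new IOException( "output already committed by another attempt: " + finalOutputPath );

        throw new IOException( "failed to save task output: " + taskOutput );
        }
      }

This does give up the ability to replace output left behind by an attempt that failed mid-commit, so the real change may need to be smarter about that case.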
- does this scenario sound plausible?
- does this fix look appropriate?
- if so, would it be possible to backport this change to the 2.5 branch? (I know it’s ancient…)
Thanks,
Ben