Hadoop18TapUtil.commitTask race condition w/ speculative execution

Ben Podgursky

Jul 6, 2016, 11:25:10 PM
to cascadi...@googlegroups.com
Hi,

We’re running into a transient bug which we believe is caused by a combination of speculative execution, one of our custom Taps, and the behavior of Hadoop18TapUtil.

Our custom tap writes (simplified) two output files per task during execution:

part-XXXXX.partfile
index_files/part-XXXXX.index

During task execution, both files sit in _temporary until the Tap closes at the end of the task. The .index file contains record locations within the .partfile, so it’s critical that the .index file is written by the same task attempt as the .partfile (two .partfiles, even with the same data, won’t be byte-identical because of in-memory set ordering, etc.).

The output file committing done by Hadoop18TapUtil.moveTaskOutputs currently erases output files if they already exist: 

>         if( !fs.delete( finalOutputPath, true ) )

As far as I can tell, nothing ensures that only one task attempt is inside Hadoop18TapUtil.commitTask at a time. Since we run our jobs with YARN speculative execution, we believe the following is happening, based on the task logs we have found:

1) attempt0 enters commitTask
2) attempt0 commits part-XXXXX.partfile to the output directory
3) attempt1 enters commitTask
4) attempt1 deletes part-XXXXX.partfile from the output directory, and commits a new version of part-XXXXX.partfile
5) attempt0 commits index_files/part-XXXXX.index
6) attempt0 tells YARN it has succeeded
7) attempt1 is killed

This leaves part-XXXXX.partfile and part-XXXXX.index inconsistent (the .partfile comes from attempt1, the .index from attempt0), leading to data corruption issues.
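
The interleaving above can be replayed deterministically. This is a minimal sketch, not Cascading code: local files stand in for HDFS, and the hypothetical `commitFile` helper only imitates the delete-then-move behaviour described for moveTaskOutputs. Each file's content is the name of the attempt that wrote it, so the mismatch is easy to see.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical simulation; class and method names are illustrative only.
class CommitRaceSimulation {
    static final String PART = "part-XXXXX.partfile";
    static final String INDEX = "index_files/part-XXXXX.index";

    // Write one attempt's output under _temporary/<attempt>/<relative>.
    static Path writeTemp(Path outputDir, String attempt, String relative) {
        try {
            Path file = outputDir.resolve("_temporary").resolve(attempt).resolve(relative);
            Files.createDirectories(file.getParent());
            Files.write(file, attempt.getBytes(StandardCharsets.UTF_8));
            return file;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Imitates the commit: delete any existing final file, then move the temp file in.
    static void commitFile(Path tempFile, Path outputDir, String relative) {
        try {
            Path target = outputDir.resolve(relative);
            Files.createDirectories(target.getParent());
            Files.deleteIfExists(target);
            Files.move(tempFile, target);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static String read(Path file) {
        try {
            return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Replays steps 1-7 in order and returns {partfile owner, index owner}.
    static String[] replay() {
        try {
            Path out = Files.createTempDirectory("commit-race");
            Path part0 = writeTemp(out, "attempt0", PART);
            Path index0 = writeTemp(out, "attempt0", INDEX);
            Path part1 = writeTemp(out, "attempt1", PART);
            writeTemp(out, "attempt1", INDEX); // never committed: attempt1 is killed

            commitFile(part0, out, PART);   // step 2: attempt0 commits the partfile
            commitFile(part1, out, PART);   // step 4: attempt1 deletes and replaces it
            commitFile(index0, out, INDEX); // step 5: attempt0 commits its index

            return new String[] { read(out.resolve(PART)), read(out.resolve(INDEX)) };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String[] owners = replay();
        System.out.println("partfile written by " + owners[0]); // attempt1
        System.out.println("index written by " + owners[1]);    // attempt0
    }
}
```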

I believe we could prevent this scenario with a change like this: https://github.com/bpodgursky/cascading/commit/0bffbfa89096298797ca842874896252a1b057f3. With this change (and assuming the property is enabled), attempt1 would fail before overwriting attempt0’s output data, and the job would succeed.
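
The general idea of failing instead of deleting can be sketched as follows. This is an illustration only, not the linked commit: it uses local files rather than HDFS, the `commitIfAbsent` helper is hypothetical, and it assumes the underlying move refuses to replace an existing target. On a local file system that existence check is not guaranteed to be atomic, so this shows the intended behaviour rather than a complete fix.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch; names are illustrative and not part of Cascading.
class GuardedCommit {

    // Moves tempFile to target only if target does not exist yet.
    // Returns false when another attempt already committed, so the caller can fail.
    static boolean commitIfAbsent(Path tempFile, Path target) {
        try {
            Files.createDirectories(target.getParent());
            Files.move(tempFile, target); // no REPLACE_EXISTING: existing output is kept
            return true;
        } catch (FileAlreadyExistsException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Two attempts commit the same file; returns {first result, second result, final owner}.
    static String[] demo() {
        try {
            Path out = Files.createTempDirectory("guarded-commit");
            Path temp0 = Files.write(out.resolve("attempt0.tmp"),
                    "attempt0".getBytes(StandardCharsets.UTF_8));
            Path temp1 = Files.write(out.resolve("attempt1.tmp"),
                    "attempt1".getBytes(StandardCharsets.UTF_8));
            Path target = out.resolve("part-XXXXX.partfile");

            boolean first = commitIfAbsent(temp0, target);
            boolean second = commitIfAbsent(temp1, target);
            String owner = new String(Files.readAllBytes(target), StandardCharsets.UTF_8);
            return new String[] { String.valueOf(first), String.valueOf(second), owner };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String[] result = demo();
        System.out.println("attempt0 committed: " + result[0]); // true
        System.out.println("attempt1 committed: " + result[1]); // false
        System.out.println("final file owner: " + result[2]);   // attempt0
    }
}
```

The losing attempt sees `false` and can fail its task, leaving the first attempt's files untouched.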

- does this scenario sound plausible?
- does this fix look appropriate?
- if so, would it be possible to backport this change to the 2.5 branch?  (I know it’s ancient…) 


Thanks,

Ben

Chris K Wensel

Jul 7, 2016, 12:24:57 PM
to cascadi...@googlegroups.com
Happy to take a look at a test and patch against 3.2 wip.

The 2.5 tree is over 2 years old; your best bet is to fork and run an internal build.

ckw

Michael Lowman

Oct 31, 2016, 12:32:33 PM
to cascading-user
I have also encountered this race condition. Unfortunately, I don't think the suggested patch is sufficient to cover the problematic case; a counterexample would be a stage producing multiple output files which may not be interleaved.
I believe the only proper solution here is for the commit of the output to occur on the application master; commits on the map and reduce nodes can't guarantee run-only-once semantics in all cases.

Are there any thoughts on this?

I'm also a little uncertain how to write a successful deterministic test of this behavior. The timing window for the race condition is very tight unless HDFS temporarily stalls the operation. We've written a job that uses wall-clock time to reproduce the bug, and it does so only about once in every fifty executions.
I'd welcome suggestions on testing, though.
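
One possible direction, sketched under assumptions: replace wall-clock timing with explicit synchronization points, so the second attempt is released exactly between the first attempt's two commits. The sketch below uses local files and two threads with `CountDownLatch`; all names are hypothetical. In a real test the pause would have to be injected into the commit path (for example through a wrapping FileSystem), which is an assumption about how such a test could be wired and is not shown here.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: latches force the problematic interleaving on every run.
class ForcedInterleaving {

    static Path write(Path file, String text) {
        try {
            Files.createDirectories(file.getParent());
            return Files.write(file, text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Delete-then-move commit, imitating the behaviour under test.
    static void commit(Path temp, Path target) {
        try {
            Files.createDirectories(target.getParent());
            Files.deleteIfExists(target);
            Files.move(temp, target);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Bounded wait so a failure in one thread cannot hang the other forever.
    static void await(CountDownLatch latch) {
        try {
            if (!latch.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for the other attempt");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Runs both attempts and returns {partfile owner, index owner}.
    static String[] run() {
        try {
            Path out = Files.createTempDirectory("forced-interleaving");
            Path part = out.resolve("part-XXXXX.partfile");
            Path index = out.resolve("index_files/part-XXXXX.index");
            CountDownLatch partCommitted = new CountDownLatch(1);
            CountDownLatch partOverwritten = new CountDownLatch(1);

            Thread attempt0 = new Thread(() -> {
                Path p = write(out.resolve("_temporary/attempt0/part"), "attempt0");
                Path i = write(out.resolve("_temporary/attempt0/index"), "attempt0");
                commit(p, part);
                partCommitted.countDown(); // let attempt1 in between the two commits
                await(partOverwritten);
                commit(i, index);
            });
            Thread attempt1 = new Thread(() -> {
                Path p = write(out.resolve("_temporary/attempt1/part"), "attempt1");
                await(partCommitted);
                commit(p, part);
                partOverwritten.countDown(); // "killed" before committing its index
            });

            attempt0.start();
            attempt1.start();
            attempt0.join();
            attempt1.join();
            return new String[] {
                new String(Files.readAllBytes(part), StandardCharsets.UTF_8),
                new String(Files.readAllBytes(index), StandardCharsets.UTF_8)
            };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String[] owners = run();
        System.out.println("partfile: " + owners[0] + ", index: " + owners[1]);
    }
}
```

Because the latches fix the ordering, every run ends with the partfile from attempt1 and the index from attempt0, so an assertion on the mismatch no longer depends on timing.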

-Michael