I am planning to put Gobblin into production as a way to provide more durable access to Kafka topics. As part of that, I have been testing how Gobblin behaves when it fails.
My only change from a simple setup was creating a custom WriterPartitioner to retain the original Kafka partition number. I did this because, by default, I was getting one file per partition, but there was no way to know which partition the data had come from, and I really want to be able to tell whether Gobblin is dropping data. A simplified version is below; I can post the full thing if anyone is interested.
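This is roughly what it looks like (simplified sketch: it assumes one Kafka partition per work unit, and the "kafka.partition.id" property key is something my setup stashes in the work unit state, not a stock Gobblin key):

    import gobblin.configuration.State;
    import gobblin.writer.partitioner.WriterPartitioner;
    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;

    // Partitions writer output by the originating Kafka partition, so files
    // land under .../<topic>/<partition>/part-*. Assumes the partition id
    // was stashed in the work unit state under "kafka.partition.id".
    public class KafkaPartitionWriterPartitioner implements WriterPartitioner<byte[]> {

      private static final Schema SCHEMA = SchemaBuilder.record("KafkaPartition")
          .namespace("com.stripe.kafkatools.gobblin").fields()
          .name("partition").type().stringType().noDefault()
          .endRecord();

      private final String partitionId;

      // Gobblin instantiates partitioners with this three-arg constructor.
      public KafkaPartitionWriterPartitioner(State state, int numBranches, int branchId) {
        this.partitionId = state.getProp("kafka.partition.id", "unknown");
      }

      @Override
      public Schema partitionSchema() {
        return SCHEMA;
      }

      @Override
      public GenericRecord partitionForRecord(byte[] record) {
        GenericRecord partition = new GenericData.Record(SCHEMA);
        partition.put("partition", this.partitionId);
        return partition;
      }
    }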
I have discovered some pretty strange behavior when I reuse the same base path across runs, though. Say I am writing to /day/hour/topic/partition:
The first time I run in a given hour, everything works fine and the job writes to S3 properly: /day/hour/topic/partition/part-something is created.
The second time I run in the same hour, it doesn't overwrite the file or add another one alongside it: it writes to /day/hour/topic/partition/partition/part-something! So the data is saved, but not where I would have expected.
The third time, it fails with a stack trace like this:
16/05/16 17:16:29 ERROR runtime.AbstractJobLauncher: Failed to commit dataset state for dataset of job <jobname>
java.io.IOException: java.io.IOException: Target /day/hour/topic/partition/partition is a directory
at gobblin.util.ParallelRunner.close(ParallelRunner.java:291)
at com.google.common.io.Closer.close(Closer.java:214)
at gobblin.publisher.BaseDataPublisher.close(BaseDataPublisher.java:122)
at com.google.common.io.Closer.close(Closer.java:214)
at gobblin.runtime.JobContext.commit(JobContext.java:390)
at gobblin.runtime.AbstractJobLauncher.launchJob(AbstractJobLauncher.java:274)
at gobblin.runtime.mapreduce.CliMRJobLauncher.run(CliMRJobLauncher.java:60)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
at com.stripe.kafkatools.gobblin.AirflowMain$.main(AirflowMain.scala:51)
at com.stripe.kafkatools.gobblin.AirflowMain.main(AirflowMain.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Caused by: java.io.IOException: Target /day/hour/topic/partition/partition is a directory
at org.apache.hadoop.fs.FileUtil.checkDest(FileUtil.java:503)
at org.apache.hadoop.fs.FileUtil.checkDest(FileUtil.java:505)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:351)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:341)
at gobblin.util.HadoopUtils.movePath(HadoopUtils.java:155)
at gobblin.util.ParallelRunner$6.call(ParallelRunner.java:265)
at gobblin.util.ParallelRunner$6.call(ParallelRunner.java:258)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
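Reading the trace, my best guess at what is going on (I could be wrong): gobblin.util.HadoopUtils.movePath delegates to Hadoop's FileUtil.copy, and FileUtil's checkDest resolves an existing directory destination the way mv does, by appending the source directory's name. That would explain both the nesting on the second run and the failure on the third. A minimal illustration of those semantics, using plain Hadoop APIs rather than anything Gobblin-specific:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FileUtil;
    import org.apache.hadoop.fs.Path;

    public class MoveSemantics {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path src = new Path("/staging/day/hour/topic/partition");
        Path dst = new Path("/day/hour/topic/partition");
        // Run 1: dst does not exist -> src is moved to dst, as expected.
        // Run 2: dst exists and is a directory -> checkDest appends the
        //        source name, so data lands at dst/partition (the nesting
        //        I observed).
        // Run 3: dst/partition also exists as a directory -> checkDest
        //        throws "Target .../partition/partition is a directory".
        FileUtil.copy(fs, src, fs, dst, true /* deleteSource */, conf);
      }
    }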
And not only does it fail, but the Kafka consumer has moved forward, so the offsets the failed run was ingesting will be lost unless I do manual recovery.
I can get similar errors, where data is ingested and then thrown away, just by configuring a completely invalid S3 output path.
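For reference, the relevant directory settings in my job config look roughly like this (bucket name and paths simplified; the day/hour prefix is filled in per run by our scheduler, and the property names are the stock Gobblin ones as far as I can tell):

    # Writers write into staging, completed task output is collected under
    # the output dir, and the publisher moves it to the final dir on commit.
    writer.staging.dir=/gobblin/task-staging
    writer.output.dir=/gobblin/task-output
    writer.partitioner.class=com.stripe.kafkatools.gobblin.KafkaPartitionWriterPartitioner
    data.publisher.final.dir=s3a://some-bucket/day/hour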
So I wonder:
Is any of this expected behavior? I was very surprised by the behavior of the second run: overwriting the file, adding another file to the same directory, or outright failing were all things I would have expected it to do.
When there is an error in the middle of the commit sequence, is there any way I can recover/retry the work straight from HDFS, or is it all deleted?
Would it make sense to have a setting that makes Gobblin reset the consumer's offsets if there is a failure on commit? We have workflows where dropping data on the floor just isn't OK.
Is there any guide to Gobblin's behavior on errors at different stages? I am going to end up having to write a runbook for when things inevitably go wrong in production, so any operational knowledge out there would be helpful.