I am running a Gobblin job every 2 minutes (job.schedule=0 0/2 * * * ?).
I also want all output files collected by this job to land in an S3 directory, with files from jobs run in the same hour going into the same directory (e.g., output from jobs run between 5pm and 6pm today should go into the directory 2016-03-11-17 in S3), so I set the following options:
writer.partition.granularity=hour
writer.partition.pattern=YYYY-MM-dd-HH
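For context, my understanding is that the partition pattern is applied to each record's timestamp to derive the hourly output directory. A minimal sketch of that mapping using java.time (Gobblin itself uses a Joda-Time formatter, so this is only an approximation; note that in java.time the pattern would be yyyy-MM-dd-HH, since YYYY there means week-based year):

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

public class PartitionPathDemo {
    // Derive the hourly partition directory name for a record timestamp,
    // mirroring writer.partition.granularity=hour with the pattern above.
    static String partitionDir(long epochMillis) {
        return DateTimeFormatter.ofPattern("yyyy-MM-dd-HH")
                .format(ZonedDateTime.ofInstant(
                        Instant.ofEpochMilli(epochMillis), ZoneOffset.UTC));
    }

    public static void main(String[] args) {
        // A record written at 2016-03-11 17:30 UTC falls in the
        // 2016-03-11-17 partition.
        long ts = ZonedDateTime.of(2016, 3, 11, 17, 30, 0, 0, ZoneOffset.UTC)
                .toInstant().toEpochMilli();
        System.out.println(partitionDir(ts));
    }
}
```

So every job that runs during the same hour resolves to the same partition directory, which is exactly the behavior I want on S3.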
However, it gives the following exception.
java.lang.IllegalArgumentException: Cannot copy from file:/home/ubuntu/gobblin/working_directory/task-output/query.topic/2016-03-11-17 to s3a://relcy-analytics/kafka/query.topic/2016-03-11-17 because dst exists
at com.google.common.base.Preconditions.checkArgument(Preconditions.java:93)
at gobblin.util.HadoopUtils.copyPath(HadoopUtils.java:209)
at gobblin.util.HadoopUtils.movePath(HadoopUtils.java:182)
at gobblin.util.ParallelRunner$6.call(ParallelRunner.java:293)
at gobblin.util.ParallelRunner$6.call(ParallelRunner.java:286)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
I looked at the definition of gobblin.util.HadoopUtils.copyPath, and it appears to require that the destination not exist, i.e. it replaces the destination directory rather than merging into it, so every run after the first fails. What I want is to simply copy the new files into the existing directory.
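In other words, I'm after merge semantics rather than a whole-directory replace. A rough sketch of what I mean, using plain java.nio on the local filesystem for illustration (the actual publisher would of course need to do this through the Hadoop FileSystem API against s3a):

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class MergeCopy {
    // Copy every entry under src into dst, creating dst if needed and
    // leaving files already present in dst alone -- instead of failing
    // when dst exists, as copyPath's precondition currently does.
    static void mergeCopy(Path src, Path dst) throws IOException {
        Files.createDirectories(dst);
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(src)) {
            for (Path entry : entries) {
                Path target = dst.resolve(entry.getFileName().toString());
                if (Files.isDirectory(entry)) {
                    mergeCopy(entry, target);   // recurse into subdirectories
                } else {
                    Files.copy(entry, target, StandardCopyOption.REPLACE_EXISTING);
                }
            }
        }
    }
}
```

With semantics like this, each 2-minute run would just add its task-output files to the existing hourly partition directory.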
Am I doing something wrong?
Also, the comment on the copyPath function says 'TODO this method does not handle cleaning up any local files leftover by writing to S3.' Does that mean the local files will accumulate, so I need a script to clean them up periodically?