Job Fails when writing to the next hourly window bucket


laksh...@tokbox.com

Mar 24, 2016, 11:15:43 PM3/24/16
to gobblin-users
Hello Folks,

I am using Gobblin to write from Kafka to S3 (MapReduce mode). The data is JSON.

I have implemented a partitioner by subclassing TimeBasedWriterPartitioner, as described in the following post.

My getRecordTimestamp() simply returns the current system timestamp:
public class LogJsonWriterPartitioner extends TimeBasedWriterPartitioner<byte[]> {

    @Override
    public long getRecordTimestamp(byte[] record) {
        return System.currentTimeMillis();
    }
}
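For reference, here is how a record timestamp maps to the hourly partition path. This is a standalone sketch using plain SimpleDateFormat; Gobblin itself builds the path from writer.partition.pattern (via Joda-Time), so the class name and the sample timestamp below are only illustrative, and I use yyyy here rather than YYYY (which means week-year in both Java and Joda patterns):

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class PartitionPathDemo {
    public static void main(String[] args) {
        // Epoch millis taken from the job id in the path above (illustrative only)
        long recordTimestamp = 1458873940824L;

        // Gobblin derives the partition path from writer.partition.pattern;
        // this sketch reproduces that mapping with plain SimpleDateFormat
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy/MM/dd/HH");
        fmt.setTimeZone(TimeZone.getTimeZone("UTC"));

        System.out.println(fmt.format(new Date(recordTimestamp)));  // 2016/03/25/02
    }
}
```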


When I run the first hour, writing to S3 works fine.


eg: s3a://gobblintestbucket.com/landingzone/productA/2016/03/25/02/part.task_GobblinKafkaQuickStart_1458873940824_0_0.json



Now when I run the job for the next hour, the data should flow into the next hourly bucket window: s3a://gobblintestbucket.com/landingzone/productA/2016/03/25/03.


But the job fails.


2016-03-24 20:01:47 PDT INFO  [main] org.apache.hadoop.fs.s3a.S3AFileSystem  684 - Getting path status for /gobblinlandingzone/productA (gobblinlandingzone/productA)



2016-03-24 20:01:48 PDT INFO  [main] gobblin.publisher.BaseDataPublisher  310 - Moving hdfs://10.0.1.227:8020/gobblin/task-output/productA/2016 to /gobblinlandingzone/productA/2016

2016-03-24 20:01:48 PDT INFO  [ParallelRunner] org.apache.hadoop.fs.s3a.S3AFileSystem  684 - Getting path status for /gobblinlandingzone/productA/2016 (gobblinlandingzone/productA/2016)

2016-03-24 20:01:48 PDT WARN  [main] gobblin.util.ParallelRunner  348 - Task failed: Move hdfs://10.0.1.227:8020/gobblin/task-output/productA/2016 to /gobblinlandingzone/productA/2016

java.lang.IllegalArgumentException: Cannot copy from hdfs://10.0.1.227:8020/gobblin/task-output/productA/2016 to /gobblinlandingzone/productA/2016 because dst exists
 at com.google.common.base.Preconditions.checkArgument(Preconditions.java:93)
 at gobblin.util.HadoopUtils.copyPath(HadoopUtils.java:272)
 at gobblin.util.HadoopUtils.movePath(HadoopUtils.java:215)
 at gobblin.util.ParallelRunner$6.call(ParallelRunner.java:312)
 at gobblin.util.ParallelRunner$6.call(ParallelRunner.java:305)
 at java.util.concurrent.FutureTask.run(FutureTask.java:262)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)


I think it fails as soon as it sees that the directory already exists at the year level (2016); it is not descending all the way down to the hourly directory before checking for existence. Maybe a bug in how Gobblin resolves the destination directory?
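The failure mode matches ordinary directory-move semantics: a directory cannot be moved onto a path that already exists. A minimal sketch with java.nio on the local filesystem (only an analogy for what HadoopUtils.movePath guards against on HDFS/S3; the paths here are stand-ins):

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

public class MoveDemo {
    public static void main(String[] args) throws IOException {
        // Stand-ins for .../task-output/productA/2016 and /gobblinlandingzone/productA/2016
        Path src = Files.createTempDirectory("task-output");
        Path dst = Files.createTempDirectory("landingzone").resolve("2016");
        Files.createDirectory(dst); // destination already exists from the previous run

        try {
            // Without REPLACE_EXISTING, moving onto an existing path fails,
            // analogous to the "because dst exists" precondition in HadoopUtils.copyPath
            Files.move(src, dst);
            System.out.println("moved");
        } catch (FileAlreadyExistsException e) {
            System.out.println("dst exists");
        }
    }
}
```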


Any thoughts?

Thanks,


Lakshmanan



laksh...@tokbox.com

Mar 25, 2016, 12:03:38 AM3/25/16
to gobblin-users
This also fails when I use Gobblin to write from Kafka to HDFS (MapReduce mode).

Here is my configuration:

writer.partitioner.class=gobblin.writer.partitioner.LogJsonWriterPartitioner
writer.partition.granularity=hour
writer.partition.pattern=YYYY/MM/dd/HH
writer.partition.timezone=UTC

Sahil Takiar

Mar 25, 2016, 1:46:25 AM3/25/16
to Lakshmanan Muthuraman, gobblin-users
Be sure to set data.publisher.type=gobblin.publisher.TimePartitionedDataPublisher
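Combined with the partitioner settings earlier in the thread, the relevant block would look like this (a sketch; every value is copied from the messages above):

```properties
# Publisher that merges output into existing time-partitioned directories
data.publisher.type=gobblin.publisher.TimePartitionedDataPublisher

# Partitioner settings from the original post
writer.partitioner.class=gobblin.writer.partitioner.LogJsonWriterPartitioner
writer.partition.granularity=hour
writer.partition.pattern=YYYY/MM/dd/HH
writer.partition.timezone=UTC
```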

