In Kafka, the data format is JSON. I want to load the data into HDFS in MapReduce mode, writing one file per hour. I am running into some problems — please help me.
I also do not know whether the extract.limit properties work in MapReduce mode.
The configuration is:
job.group=Group
job.description=Move Data Kafka to HDFS
job.lock.enabled=false
mr.job.max.mappers=1
source.class=gobblin.source.extractor.extract.kafka.KafkaSimpleSource
kafka.brokers=**.**.**.**:****
extract.namespace=gobblin.extract.kafka
topic.whitelist=****
bootstrap.with.offset=earliest
writer.destination.type=HDFS
writer.output.format=Avro
writer.fs.uri=hdfs://******:****
writer.staging.dir=/data/staging/
writer.output.dir=/data/output/
# PartitionAwareDataWriterBuilder is abstract and cannot be instantiated (see the
# java.lang.InstantiationException in the log below); use the concrete Avro builder,
# which is partition-aware and works with TimeBasedAvroWriterPartitioner.
writer.builder.class=gobblin.writer.AvroDataWriterBuilder
writer.partitioner.class=gobblin.writer.partitioner.TimeBasedAvroWriterPartitioner
writer.file.path.type=tablename
writer.partition.level=hourly
#writer.partition.granularity=hour
# Joda-Time pattern: use lowercase 'yyyy' (calendar year); uppercase 'YYYY' is
# week-based year and yields wrong directories around New Year.
writer.partition.pattern=yyyy/MM/dd/HH
data.publisher.final.dir=/data/job-output
data.publisher.type=gobblin.publisher.TimePartitionedDataPublisher
#data.publisher.type=gobblin.publisher.BaseDataPublisher
data.publisher.replace.final.dir=false
state.store.fs.uri=hdfs://****:****
state.store.dir=/data/state-store
metrics.reporting.file.enabled=true
metrics.log.dir=/data/metrics
metrics.reporting.file.suffix=txt
#extract.limit.enabled=true
#extract.limit.type=time
#extract.limit.time.limit=1
#extract.limit.time.limit.timeunit=minutes
WARN [DFSClient] DFSInputStream has been closed already
WARN [GobblinMetrics] Metric reporting has already started
WARN [DFSClient] DFSInputStream has been closed already
WARN [MetricContext] MetricContext with specified name already exists, appending UUID to the given name: ea15167a-2f17-479a-94c2-74d07e46d61b
ERROR [Task] Task task_KafkaToHdfs_1456396692463_0 failed
java.lang.IllegalStateException: Fork 0 of task task_KafkaToHdfs_1456396692463_0 has failed and is no longer running
at gobblin.runtime.Fork.putRecord(Fork.java:207)
at gobblin.runtime.Task.processRecord(Task.java:466)
at gobblin.runtime.Task.run(Task.java:173)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
ERROR [Fork-0] Fork 0 of task task_KafkaToHdfs_1456396692463_0 failed to process data records
java.lang.RuntimeException: java.lang.InstantiationException
at gobblin.runtime.TaskContext.getDataWriterBuilder(TaskContext.java:301)
at gobblin.runtime.Fork.buildWriter(Fork.java:357)
at gobblin.runtime.Fork.buildWriterIfNotPresent(Fork.java:371)
at gobblin.runtime.Fork.processRecords(Fork.java:391)
at gobblin.runtime.Fork.run(Fork.java:166)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.InstantiationException
at sun.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:48)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at java.lang.Class.newInstance(Class.java:379)
at gobblin.runtime.TaskContext.getDataWriterBuilder(TaskContext.java:297)
... 11 more
ERROR [MRJobLauncher] Task task_KafkaToHdfs_1456396692463_0 failed due to exception: java.lang.IllegalStateException: Fork 0 of task task_KafkaToHdfs_1456396692463_0 has failed and is no longer running
at gobblin.runtime.Fork.putRecord(Fork.java:207)
at gobblin.runtime.Task.processRecord(Task.java:466)
at gobblin.runtime.Task.run(Task.java:173)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Exception in thread "Thread-45" java.lang.IllegalArgumentException: Missing required property writer.staging.dir
at com.google.common.base.Preconditions.checkArgument(Preconditions.java:93)
at gobblin.util.WriterUtils.getWriterStagingDir(WriterUtils.java:64)
at gobblin.util.JobLauncherUtils.cleanTaskStagingData(JobLauncherUtils.java:169)
at gobblin.runtime.mapreduce.GobblinOutputCommitter.abortJob(GobblinOutputCommitter.java:98)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:549)
WARN [TaskStateCollectorService] Output task state path /data/gobblin/gobblin-work/working/KafkaToHdfs/output/job_KafkaToHdfs_1456396692463 does not exist