Partitioner in MapReduce

aug li

Feb 25, 2016, 5:48:58 AM
to gobblin-users
The data is in Kafka in JSON format, and I want to load it to HDFS in MapReduce mode, writing one file per hour. I am running into some problems; please help me.
I also don't know whether extract.limit works in MapReduce mode.
The config is:

job.name=KafkaToHdfs
job.group=Group
job.description=Move Data Kafka to HDFS
job.lock.enabled=false

mr.job.max.mappers=1

source.class=gobblin.source.extractor.extract.kafka.KafkaSimpleSource
kafka.brokers=**.**.**.**:****
extract.namespace=gobblin.extract.kafka
topic.whitelist=****
bootstrap.with.offset=earliest

writer.destination.type=HDFS
writer.output.format=Avro
writer.fs.uri=hdfs://******:****
writer.staging.dir=/data/staging/
writer.output.dir=/data/output/
writer.builder.class=gobblin.writer.PartitionAwareDataWriterBuilder
writer.partitioner.class=gobblin.writer.partitioner.TimeBasedAvroWriterPartitioner
writer.file.path.type=tablename
writer.partition.level=hourly
#writer.partition.granularity=hour
writer.partition.pattern=YYYY/MM/dd/HH

data.publisher.final.dir=/data/job-output
data.publisher.type=gobblin.publisher.TimePartitionedDataPublisher
#data.publisher.type=gobblin.publisher.BaseDataPublisher
data.publisher.replace.final.dir=false

state.store.fs.uri=hdfs://****:****
state.store.dir=/data/state-store

metrics.reporting.file.enabled=true
metrics.log.dir=/data/metrics
metrics.reporting.file.suffix=txt

#extract.limit.enabled=true
#extract.limit.type=time
#extract.limit.time.limit=1
#extract.limit.time.limit.timeunit=minutes

WARN [DFSClient] DFSInputStream has been closed already
WARN [GobblinMetrics] Metric reporting has already started
WARN [DFSClient] DFSInputStream has been closed already
WARN [MetricContext] MetricContext with specified name already exists, appending UUID to the given name: ea15167a-2f17-479a-94c2-74d07e46d61b
ERROR [Task] Task task_KafkaToHdfs_1456396692463_0 failed
java.lang.IllegalStateException: Fork 0 of task task_KafkaToHdfs_1456396692463_0 has failed and is no longer running
at gobblin.runtime.Fork.putRecord(Fork.java:207)
at gobblin.runtime.Task.processRecord(Task.java:466)
at gobblin.runtime.Task.run(Task.java:173)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
ERROR [Fork-0] Fork 0 of task task_KafkaToHdfs_1456396692463_0 failed to process data records
java.lang.RuntimeException: java.lang.InstantiationException
at gobblin.runtime.TaskContext.getDataWriterBuilder(TaskContext.java:301)
at gobblin.runtime.Fork.buildWriter(Fork.java:357)
at gobblin.runtime.Fork.buildWriterIfNotPresent(Fork.java:371)
at gobblin.runtime.Fork.processRecords(Fork.java:391)
at gobblin.runtime.Fork.run(Fork.java:166)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.InstantiationException
at sun.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:48)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at java.lang.Class.newInstance(Class.java:379)
at gobblin.runtime.TaskContext.getDataWriterBuilder(TaskContext.java:297)
... 11 more
ERROR [MRJobLauncher] Task task_KafkaToHdfs_1456396692463_0 failed due to exception: java.lang.IllegalStateException: Fork 0 of task task_KafkaToHdfs_1456396692463_0 has failed and is no longer running
at gobblin.runtime.Fork.putRecord(Fork.java:207)
at gobblin.runtime.Task.processRecord(Task.java:466)
at gobblin.runtime.Task.run(Task.java:173)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Exception in thread "Thread-45" java.lang.IllegalArgumentException: Missing required property writer.staging.dir
at com.google.common.base.Preconditions.checkArgument(Preconditions.java:93)
at gobblin.util.WriterUtils.getWriterStagingDir(WriterUtils.java:64)
at gobblin.util.JobLauncherUtils.cleanTaskStagingData(JobLauncherUtils.java:169)
at gobblin.runtime.mapreduce.GobblinOutputCommitter.abortJob(GobblinOutputCommitter.java:98)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:549)
WARN [TaskStateCollectorService] Output task state path /data/gobblin/gobblin-work/working/KafkaToHdfs/output/job_KafkaToHdfs_1456396692463 does not exist

Issac Buenrostro

Feb 25, 2016, 11:56:27 AM
to aug li, gobblin-users
Hello,
The following line in the configuration is wrong:
writer.builder.class=gobblin.writer.PartitionAwareDataWriterBuilder

PartitionAwareDataWriterBuilder is an abstract class and cannot be used as the writer.builder.class. Please look at the documentation here: https://github.com/linkedin/gobblin/wiki/Partitioned%20Writers. Currently, the only implemented partition-aware data writer is gobblin.writer.AvroDataWriterBuilder; to write JSON, you will have to write your own partition-aware writer. You can use AvroDataWriterBuilder as a reference, along with the documentation mentioned above.
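
For the Avro case, the fix is to point writer.builder.class at the concrete builder, for example:

writer.builder.class=gobblin.writer.AvroDataWriterBuilder
writer.partitioner.class=gobblin.writer.partitioner.TimeBasedAvroWriterPartitioner

For JSON, a minimal sketch of a custom partitioner (this covers only the partitioner half; a JSON-capable partition-aware writer builder is still needed, per the above) could extend the abstract gobblin.writer.partitioner.TimeBasedWriterPartitioner. The class name TimeBasedJsonWriterPartitioner, the Gson record type, and the "timestamp" field are assumptions for illustration, and the constructor signature mirrors the built-in TimeBasedAvroWriterPartitioner, so verify both against your Gobblin version:

package com.example.gobblin; // hypothetical package

import com.google.gson.JsonObject;

import gobblin.configuration.State;
import gobblin.writer.partitioner.TimeBasedWriterPartitioner;

// Sketch only: partitions JSON records by a long epoch-millis field.
public class TimeBasedJsonWriterPartitioner extends TimeBasedWriterPartitioner<JsonObject> {

  public TimeBasedJsonWriterPartitioner(State state, int numBranches, int branchId) {
    super(state, numBranches, branchId);
  }

  @Override
  public long getRecordTimestamp(JsonObject record) {
    // Use the record's own timestamp when present; otherwise fall back
    // to the current time so the record still lands in some partition.
    return record.has("timestamp")
        ? record.get("timestamp").getAsLong()
        : System.currentTimeMillis();
  }
}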
