TimeBasedWriterPartitioner


kalyan.p...@gmail.com

Nov 30, 2015, 1:09:34 AM
to gobblin-users
Hi,
I'm trying to write time-based buckets in HDFS, but the job fails after I added the TimeBasedWriterPartitioner properties. Please check what is wrong in my properties file; it works fine with SimpleDataWriterBuilder.
>>>>>>>>>>>>>>>>>>>
job.name=SHP_KafkaToHdfs
job.group=ShipmentGroup
job.description=Move Shipment Data from Kafka to HDFS
job.lock.enabled=false

job.schedule=0 0/2 * * * ?

jobconf.dir=/Users/Kalyan/gobblin-dist/job_config/kafka_hdfs/shipment/
jobconf.extensions=shipment1.pull
mr.job.max.mappers=8

source.class=gobblin.source.extractor.extract.kafka.KafkaSimpleSource
kafka.brokers=localhost:9091,localhost:9092,localhost:9093  
extract.namespace=gobblin.extract.kafka
topic.whitelist=shp.*
bootstrap.with.offset=earliest

writer.destination.type=HDFS
writer.output.format=txt
writer.fs.uri=hdfs://localhost:9000 
writer.staging.dir=/data/shp/staging/
writer.output.dir=/data/shp/output/
writer.builder.class=gobblin.writer.PartitionAwareDataWriterBuilder
writer.partitioner.class=gobblin.writer.partitioner.TimeBasedWriterPartitioner
writer.file.path.type=tablename
writer.partition.level=hourly
writer.partition.pattern=YYYY/MM/dd/HH


data.publisher.final.dir=/data/shp/job-output
data.publisher.type=gobblin.publisher.TimePartitionedDataPublisher
data.publisher.replace.final.dir=false

state.store.fs.uri=hdfs://localhost:9000
state.store.dir=/data/shp/state-store

metrics.reporting.file.enabled=true
metrics.log.dir=/data/shp/metrics
metrics.reporting.file.suffix=txt

extract.limit.enabled=true
extract.limit.type=time
extract.limit.time.limit=15
extract.limit.time.limit.timeunit=minutes
<<<<<<<<<<<<<<
Exception

2015-11-29 21:57:21 PST INFO  [main] org.quartz.impl.StdSchedulerFactory  1184 - Using default implementation for ThreadExecutor
2015-11-29 21:57:21 PST INFO  [main] org.quartz.core.SchedulerSignalerImpl  61 - Initialized Scheduler Signaller of type: class org.quartz.core.SchedulerSignalerImpl
2015-11-29 21:57:21 PST INFO  [main] org.quartz.core.QuartzScheduler  240 - Quartz Scheduler v.2.2.1 created.
2015-11-29 21:57:21 PST INFO  [main] org.quartz.simpl.RAMJobStore  155 - RAMJobStore initialized.
2015-11-29 21:57:21 PST INFO  [main] org.quartz.core.QuartzScheduler  305 - Scheduler meta-data: Quartz Scheduler (v2.2.1) 'LocalJobScheduler' with instanceId 'NON_CLUSTERED'
  Scheduler class: 'org.quartz.core.QuartzScheduler' - running locally.
  NOT STARTED.
  Currently in standby mode.
  Number of jobs executed: 0
  Using thread pool 'org.quartz.simpl.SimpleThreadPool' - with 3 threads.
  Using job-store 'org.quartz.simpl.RAMJobStore' - which does not support persistence. and is not clustered.

2015-11-29 21:57:21 PST INFO  [main] org.quartz.impl.StdSchedulerFactory  1339 - Quartz scheduler 'LocalJobScheduler' initialized from specified file: '/Users/Kalyan/gobblin-dist/conf/quartz.properties'
2015-11-29 21:57:21 PST INFO  [main] org.quartz.impl.StdSchedulerFactory  1343 - Quartz scheduler version: 2.2.1
2015-11-29 21:57:21 PST INFO  [main] gobblin.scheduler.SchedulerDaemon  107 - Starting the scheduler daemon
2015-11-29 21:57:21 PST INFO  [JobScheduler STARTING] gobblin.scheduler.JobScheduler  137 - Starting the job scheduler
2015-11-29 21:57:21 PST INFO  [JobScheduler STARTING] org.quartz.core.QuartzScheduler  575 - Scheduler LocalJobScheduler_$_NON_CLUSTERED started.
2015-11-29 21:57:21 PST INFO  [JobScheduler STARTING] gobblin.scheduler.JobScheduler  356 - Scheduling locally configured jobs
2015-11-29 21:57:21 PST INFO  [JobScheduler STARTING] gobblin.scheduler.JobScheduler  369 - Loaded 2 job configurations
2015-11-29 21:57:21 PST WARN  [JobScheduler STARTING] gobblin.scheduler.JobScheduler  203 - Job SHP_KafkaToHdfs has already been scheduled
2015-11-29 21:58:00 PST WARN  [LocalJobScheduler_Worker-1] org.apache.hadoop.util.NativeCodeLoader  62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2015-11-29 21:58:00 PST WARN  [LocalJobScheduler_Worker-1] gobblin.runtime.JobContext  197 - Property task.data.root.dir is missing.
2015-11-29 21:58:00 PST INFO  [TaskExecutor STARTING] gobblin.runtime.TaskExecutor  119 - Starting the task executor
2015-11-29 21:58:00 PST INFO  [LocalTaskStateTracker STARTING] gobblin.runtime.AbstractTaskStateTracker  64 - Starting the task state tracker
2015-11-29 21:58:00 PST INFO  [LocalJobScheduler_Worker-1] gobblin.metrics.GobblinMetrics  429 - Not reporting metrics to JMX
2015-11-29 21:58:00 PST INFO  [LocalJobScheduler_Worker-1] gobblin.metrics.GobblinMetrics  379 - Not reporting metrics to log files
2015-11-29 21:58:00 PST INFO  [LocalJobScheduler_Worker-1] gobblin.metrics.GobblinMetrics  440 - Not reporting metrics to Kafka
2015-11-29 21:58:00 PST INFO  [LocalJobScheduler_Worker-1] gobblin.source.extractor.extract.kafka.KafkaWrapper$KafkaOldAPI  233 - Fetching topic metadata from broker localhost:9091
2015-11-29 21:58:01 PST WARN  [LocalJobScheduler_Worker-1] gobblin.source.extractor.extract.kafka.KafkaSource  288 - Previous offset for partition shp.shipment:0 does not exist. This partition will start from the earliest offset: 0
2015-11-29 21:58:01 PST INFO  [LocalJobScheduler_Worker-1] gobblin.source.extractor.extract.kafka.KafkaSource  407 - Created workunit for partition shp.shipment:0: lowWatermark=0, highWatermark=16, range=16
2015-11-29 21:58:01 PST WARN  [LocalJobScheduler_Worker-1] gobblin.source.extractor.extract.kafka.KafkaSource  288 - Previous offset for partition shp.shipment:1 does not exist. This partition will start from the earliest offset: 0
2015-11-29 21:58:01 PST INFO  [LocalJobScheduler_Worker-1] gobblin.source.extractor.extract.kafka.KafkaSource  407 - Created workunit for partition shp.shipment:1: lowWatermark=0, highWatermark=58, range=58
2015-11-29 21:58:01 PST WARN  [LocalJobScheduler_Worker-1] gobblin.source.extractor.extract.kafka.KafkaSource  288 - Previous offset for partition shp.cgomvmtspec:0 does not exist. This partition will start from the earliest offset: 0
2015-11-29 21:58:01 PST INFO  [LocalJobScheduler_Worker-1] gobblin.source.extractor.extract.kafka.KafkaSource  407 - Created workunit for partition shp.cgomvmtspec:0: lowWatermark=0, highWatermark=29, range=29
2015-11-29 21:58:01 PST WARN  [LocalJobScheduler_Worker-1] gobblin.source.extractor.extract.kafka.KafkaSource  288 - Previous offset for partition shp.cgomvmtspec:1 does not exist. This partition will start from the earliest offset: 0
2015-11-29 21:58:01 PST INFO  [LocalJobScheduler_Worker-1] gobblin.source.extractor.extract.kafka.KafkaSource  407 - Created workunit for partition shp.cgomvmtspec:1: lowWatermark=0, highWatermark=0, range=0
2015-11-29 21:58:01 PST INFO  [LocalJobScheduler_Worker-1] gobblin.source.extractor.extract.kafka.workunit.packer.KafkaAvgRecordTimeBasedWorkUnitSizeEstimator  145 - For all topics not pulled in the previous run, estimated avg time to pull a record is 1.0 milliseconds
2015-11-29 21:58:01 PST INFO  [LocalJobScheduler_Worker-1] gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker  219 - Created MultiWorkUnit for partitions [shp.cgomvmtspec:1]
2015-11-29 21:58:01 PST INFO  [LocalJobScheduler_Worker-1] gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker  293 - MultiWorkUnit 0: estimated load=0.003010, partitions=[]
2015-11-29 21:58:01 PST INFO  [LocalJobScheduler_Worker-1] gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker  293 - MultiWorkUnit 1: estimated load=17.459740, partitions=[[shp.shipment:1]]
2015-11-29 21:58:01 PST INFO  [LocalJobScheduler_Worker-1] gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker  293 - MultiWorkUnit 2: estimated load=0.003010, partitions=[]
2015-11-29 21:58:01 PST INFO  [LocalJobScheduler_Worker-1] gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker  293 - MultiWorkUnit 3: estimated load=4.816480, partitions=[[shp.shipment:0]]
2015-11-29 21:58:01 PST INFO  [LocalJobScheduler_Worker-1] gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker  293 - MultiWorkUnit 4: estimated load=0.003010, partitions=[[shp.cgomvmtspec:1]]
2015-11-29 21:58:01 PST INFO  [LocalJobScheduler_Worker-1] gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker  293 - MultiWorkUnit 5: estimated load=0.003010, partitions=[]
2015-11-29 21:58:01 PST INFO  [LocalJobScheduler_Worker-1] gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker  293 - MultiWorkUnit 6: estimated load=0.003010, partitions=[]
2015-11-29 21:58:01 PST INFO  [LocalJobScheduler_Worker-1] gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker  293 - MultiWorkUnit 7: estimated load=8.729870, partitions=[[shp.cgomvmtspec:0]]
2015-11-29 21:58:01 PST INFO  [LocalJobScheduler_Worker-1] gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker  279 - Min load of multiWorkUnit = 0.003010; Max load of multiWorkUnit = 17.459740; Diff = 99.982759%
2015-11-29 21:58:01 PST INFO  [LocalJobScheduler_Worker-1] gobblin.util.ExecutorsUtils  108 - Attempting to shutdown ExecutorSerivce: java.util.concurrent.ThreadPoolExecutor@74b83dac[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
2015-11-29 21:58:01 PST INFO  [LocalJobScheduler_Worker-1] gobblin.util.ExecutorsUtils  127 - Successfully shutdown ExecutorService: java.util.concurrent.ThreadPoolExecutor@74b83dac[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
2015-11-29 21:58:01 PST INFO  [LocalJobScheduler_Worker-1] gobblin.runtime.AbstractJobLauncher  234 - Starting job job_SHP_KafkaToHdfs_1448863080018
2015-11-29 21:58:01 PST INFO  [LocalJobScheduler_Worker-1] gobblin.runtime.TaskExecutor  152 - Executing task task_SHP_KafkaToHdfs_1448863080018_0
2015-11-29 21:58:01 PST INFO  [LocalJobScheduler_Worker-1] gobblin.runtime.TaskExecutor  152 - Executing task task_SHP_KafkaToHdfs_1448863080018_1
2015-11-29 21:58:01 PST INFO  [LocalJobScheduler_Worker-1] gobblin.runtime.TaskExecutor  152 - Executing task task_SHP_KafkaToHdfs_1448863080018_2
2015-11-29 21:58:01 PST INFO  [LocalJobScheduler_Worker-1] gobblin.runtime.TaskExecutor  152 - Executing task task_SHP_KafkaToHdfs_1448863080018_3
2015-11-29 21:58:01 PST INFO  [LocalJobScheduler_Worker-1] gobblin.runtime.local.LocalJobLauncher  119 - Waiting for submitted tasks of job job_SHP_KafkaToHdfs_1448863080018 to complete...
2015-11-29 21:58:01 PST INFO  [LocalJobScheduler_Worker-1] gobblin.runtime.local.LocalJobLauncher  121 - 4 out of 4 tasks of job job_SHP_KafkaToHdfs_1448863080018 are running
2015-11-29 21:58:01 PST INFO  [TaskExecutor-1] gobblin.source.extractor.extract.kafka.KafkaExtractor  190 - Pulling topic shp.shipment
2015-11-29 21:58:01 PST INFO  [TaskExecutor-0] gobblin.source.extractor.extract.kafka.KafkaExtractor  190 - Pulling topic shp.shipment
2015-11-29 21:58:01 PST INFO  [TaskExecutor-1] gobblin.source.extractor.extract.kafka.KafkaExtractor  210 - Pulling partition shp.shipment:0 from offset 0 to 16, range=16
2015-11-29 21:58:01 PST INFO  [TaskExecutor-0] gobblin.source.extractor.extract.kafka.KafkaExtractor  210 - Pulling partition shp.shipment:1 from offset 0 to 58, range=58
2015-11-29 21:58:01 PST ERROR [TaskExecutor-0] gobblin.runtime.Task  254 - Task task_SHP_KafkaToHdfs_1448863080018_0 failed
java.lang.IllegalStateException: Fork 0 of task task_SHP_KafkaToHdfs_1448863080018_0 has failed and is no longer running
at gobblin.runtime.Fork.putRecord(Fork.java:218)
at gobblin.runtime.Task.processRecord(Task.java:464)
at gobblin.runtime.Task.run(Task.java:173)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
2015-11-29 21:58:01 PST ERROR [ForkExecutor-0] gobblin.runtime.Fork  180 - Fork 0 of task task_SHP_KafkaToHdfs_1448863080018_1 failed to process data records
java.lang.RuntimeException: java.lang.InstantiationException
at gobblin.runtime.TaskContext.getDataWriterBuilder(TaskContext.java:302)
at gobblin.runtime.Fork.buildWriter(Fork.java:357)
at gobblin.runtime.Fork.buildWriterIfNotPresent(Fork.java:371)
at gobblin.runtime.Fork.processRecords(Fork.java:391)
at gobblin.runtime.Fork.run(Fork.java:166)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.InstantiationException
at sun.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:48)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at java.lang.Class.newInstance(Class.java:442)
at gobblin.runtime.TaskContext.getDataWriterBuilder(TaskContext.java:298)
... 11 more
2015-11-29 21:58:01 PST ERROR [TaskExecutor-1] gobblin.runtime.Task  254 - Task task_SHP_KafkaToHdfs_1448863080018_1 failed
java.lang.IllegalStateException: Fork 0 of task task_SHP_KafkaToHdfs_1448863080018_1 has failed and is no longer running
at gobblin.runtime.Fork.putRecord(Fork.java:218)
at gobblin.runtime.Task.processRecord(Task.java:464)
at gobblin.runtime.Task.run(Task.java:173)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
2015-11-29 21:58:01 PST ERROR [ForkExecutor-1] gobblin.runtime.Fork  180 - Fork 0 of task task_SHP_KafkaToHdfs_1448863080018_0 failed to process data records
java.lang.RuntimeException: java.lang.InstantiationException
at gobblin.runtime.TaskContext.getDataWriterBuilder(TaskContext.java:302)
at gobblin.runtime.Fork.buildWriter(Fork.java:357)
at gobblin.runtime.Fork.buildWriterIfNotPresent(Fork.java:371)
at gobblin.runtime.Fork.processRecords(Fork.java:391)
at gobblin.runtime.Fork.run(Fork.java:166)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.InstantiationException
at sun.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:48)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at java.lang.Class.newInstance(Class.java:442)
at gobblin.runtime.TaskContext.getDataWriterBuilder(TaskContext.java:298)
... 11 more
2015-11-29 21:58:01 PST INFO  [TaskExecutor-1] gobblin.source.extractor.extract.kafka.KafkaExtractor  266 - Actual high watermark for partition shp.shipment:0=7, expected=16
2015-11-29 21:58:01 PST INFO  [TaskExecutor-0] gobblin.source.extractor.extract.kafka.KafkaExtractor  266 - Actual high watermark for partition shp.shipment:1=5, expected=58
2015-11-29 21:58:01 PST INFO  [TaskExecutor-1] gobblin.source.extractor.extract.kafka.KafkaExtractor  278 - Avg time to pull a record for partition shp.shipment:0 not recorded
2015-11-29 21:58:01 PST INFO  [TaskExecutor-0] gobblin.source.extractor.extract.kafka.KafkaExtractor  278 - Avg time to pull a record for partition shp.shipment:1 not recorded
2015-11-29 21:58:01 PST INFO  [TaskExecutor-0] gobblin.runtime.Task  277 - publish.data.at.job.level is true. Will publish data at the job level.
2015-11-29 21:58:01 PST INFO  [TaskExecutor-1] gobblin.runtime.Task  277 - publish.data.at.job.level is true. Will publish data at the job level.
2015-11-29 21:58:01 PST INFO  [TaskExecutor-1] gobblin.runtime.TaskExecutor  205 - Scheduled retry of failed task task_SHP_KafkaToHdfs_1448863080018_1 to run in 0 seconds
2015-11-29 21:58:01 PST INFO  [TaskExecutor-0] gobblin.runtime.TaskExecutor  205 - Scheduled retry of failed task task_SHP_KafkaToHdfs_1448863080018_0 to run in 0 seconds
2015-11-29 21:58:01 PST INFO  [TaskRetryExecutor-0] gobblin.source.extractor.extract.kafka.KafkaExtractor  190 - Pulling topic shp.shipment
2015-11-29 21:58:01 PST INFO  [TaskExecutor-1] gobblin.source.extractor.extract.kafka.KafkaExtractor  190 - Pulling topic shp.cgomvmtspec
2015-11-29 21:58:01 PST INFO  [TaskRetryExecutor-0] gobblin.source.extractor.extract.kafka.KafkaExtractor  210 - Pulling partition shp.shipment:1 from offset 0 to 58, range=58
2015-11-29 21:58:01 PST INFO  [TaskExecutor-1] gobblin.source.extractor.extract.kafka.KafkaExtractor  210 - Pulling partition shp.cgomvmtspec:1 from offset 0 to 0, range=0
2015-11-29 21:58:01 PST INFO  [TaskExecutor-1] gobblin.source.extractor.extract.kafka.KafkaExtractor  177 - Finished pulling partition shp.cgomvmtspec:1
2015-11-29 21:58:01 PST INFO  [TaskExecutor-1] gobblin.source.extractor.extract.kafka.KafkaExtractor  165 - Finished pulling topic shp.cgomvmtspec
2015-11-29 21:58:01 PST INFO  [TaskExecutor-0] gobblin.source.extractor.extract.kafka.KafkaExtractor  190 - Pulling topic shp.cgomvmtspec
2015-11-29 21:58:01 PST INFO  [TaskExecutor-1] gobblin.runtime.Task  177 - Extracted 0 data records
2015-11-29 21:58:01 PST INFO  [TaskExecutor-0] gobblin.source.extractor.extract.kafka.KafkaExtractor  210 - Pulling partition shp.cgomvmtspec:0 from offset 0 to 29, range=29
2015-11-29 21:58:01 PST INFO  [TaskExecutor-1] gobblin.runtime.Task  178 - Row quality checker finished with results: 
2015-11-29 21:58:01 PST ERROR [ForkExecutor-1] gobblin.runtime.Fork  180 - Fork 0 of task task_SHP_KafkaToHdfs_1448863080018_0 failed to process data records
java.lang.RuntimeException: java.lang.InstantiationException
at gobblin.runtime.TaskContext.getDataWriterBuilder(TaskContext.java:302)


Thanks 
Kalyan




Ziyang Liu

Nov 30, 2015, 12:53:01 PM
to gobblin-users
Hi Kalyan, TimeBasedWriterPartitioner is an abstract class. Try TimeBasedAvroWriterPartitioner, or implement your own getRecordTimestamp() if that is not what you need.
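For reference, a bare-bones subclass might look like the sketch below (names are placeholders; this assumes byte[] records, which is what KafkaSimpleSource hands the writer):

import gobblin.configuration.State;
import gobblin.writer.partitioner.TimeBasedWriterPartitioner;

// Hypothetical minimal subclass: partitions byte[] records by arrival time.
public class MyWriterPartitioner extends TimeBasedWriterPartitioner<byte[]> {

    public MyWriterPartitioner(State state, int numBranches, int branchId) {
        super(state, numBranches, branchId);
    }

    @Override
    public long getRecordTimestamp(byte[] record) {
        // Replace this with logic that extracts a timestamp
        // (in milliseconds) from your own record format.
        return System.currentTimeMillis();
    }
}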

-Ziyang

kalyan.p...@gmail.com

Nov 30, 2015, 4:52:13 PM
to gobblin-users
Hi Ziyang,
I am looking for this format (writer.partition.pattern=YYYY/MM/dd/HH). Does TimeBasedAvroWriterPartitioner support it, or do I need to write a separate class?

Thanks
Kalyan

Ziyang Liu

Nov 30, 2015, 5:00:56 PM
to gobblin-users
Yes, setting writer.partition.pattern=YYYY/MM/dd/HH should work. Alternatively you can do writer.partition.granularity=hour.
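In other words, either of these should do (a sketch; as I understand the config, the granularity setting derives a matching pattern when no explicit pattern is given):

# either name the layout explicitly:
writer.partition.pattern=YYYY/MM/dd/HH
# or give just a granularity and let the partitioner derive the pattern:
writer.partition.granularity=hour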

-Ziyang

kalyan.p...@gmail.com

Nov 30, 2015, 5:24:52 PM
to gobblin-users
Hi Ziyang,
It is throwing an exception. Is anything wrong in my configuration? Please check.

>>>>>>
writer.destination.type=HDFS
writer.output.format=txt
writer.fs.uri=hdfs://localhost:9000 
writer.staging.dir=/data/shp/staging/
writer.output.dir=/data/shp/output/
#writer.builder.class=gobblin.writer.SimpleDataWriterBuilder
writer.builder.class=gobblin.writer.PartitionAwareDataWriterBuilder
writer.partitioner.class=gobblin.writer.partitioner.TimeBasedAvroWriterPartitioner
writer.file.path.type=tablename
#writer.partition.level=hourly
writer.partition.granularity=hour
writer.partition.pattern=YYYY/MM/dd/HH


data.publisher.final.dir=/data/shp/job-output
data.publisher.type=gobblin.publisher.BaseDataPublisher
#data.publisher.type=gobblin.publisher.TimePartitionedDataPublisher
data.publisher.replace.final.dir=false

>>>>>>>>>>>>>>>>>>
Here is the exception:
<<<<<<

2015-11-30 14:16:01 PST INFO  [TaskExecutor-0] gobblin.source.extractor.extract.kafka.KafkaExtractor  210 - Pulling partition shp.shipment:1 from offset 0 to 73, range=73
2015-11-30 14:16:01 PST ERROR [ForkExecutor-0] gobblin.runtime.Fork  180 - Fork 0 of task task_SHP_KafkaToHdfs_1448921760016_0 failed to process data records
java.lang.RuntimeException: java.lang.InstantiationException
at gobblin.runtime.TaskContext.getDataWriterBuilder(TaskContext.java:302)
at gobblin.runtime.Fork.buildWriter(Fork.java:357)
at gobblin.runtime.Fork.buildWriterIfNotPresent(Fork.java:371)
at gobblin.runtime.Fork.processRecords(Fork.java:391)
at gobblin.runtime.Fork.run(Fork.java:166)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.InstantiationException
at 


Thanks 
Kalyan

Ziyang Liu

Nov 30, 2015, 5:29:43 PM
to gobblin-users
writer.builder.class should be gobblin.writer.AvroDataWriterBuilder if you want to use AvroHdfsDataWriter, or gobblin.writer.SimpleDataWriterBuilder if you want to use SimpleDataWriter. The one you specified is abstract.
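For example, a consistent writer block for plain-text records might look like this sketch (the partitioner class here is a hypothetical concrete subclass, not a Gobblin built-in):

writer.builder.class=gobblin.writer.SimpleDataWriterBuilder
# must be a concrete subclass of TimeBasedWriterPartitioner:
writer.partitioner.class=com.example.MyWriterPartitioner
writer.partition.granularity=hour
writer.partition.pattern=YYYY/MM/dd/HH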

jenny

Dec 17, 2015, 3:28:52 AM
to gobblin-users
Is it working? Can you share your config? I have the same exception, shown below.



2015-12-17 17:16:03 KST INFO  [TaskRetryExecutor-0] gobblin.runtime.Task  178 - Row quality checker finished with results:
2015-12-17 17:16:03 KST ERROR [ForkExecutor-0] gobblin.runtime.Fork  170 - Fork 0 of task task_Job_1450339140087_0 failed to process data records
java.io.IOException: java.lang.InstantiationException
        at gobblin.writer.PartitionedDataWriter.<init>(PartitionedDataWriter.java:90)
        at gobblin.runtime.Fork.buildWriter(Fork.java:365)
        at gobblin.runtime.Fork.buildWriterIfNotPresent(Fork.java:371)
        at gobblin.runtime.Fork.processRecords(Fork.java:391)
        at gobblin.runtime.Fork.run(Fork.java:166)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.InstantiationException
        at sun.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:48)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
        at org.apache.commons.lang3.reflect.ConstructorUtils.invokeConstructor(ConstructorUtils.java:118)
        at org.apache.commons.lang3.reflect.ConstructorUtils.invokeConstructor(ConstructorUtils.java:85)
        at gobblin.writer.PartitionedDataWriter.<init>(PartitionedDataWriter.java:83)
        ... 11 more


My job config is below. My test GitHub commit is 34bb1c96944fa8df1a06c785b375acf33ae814d4.

job.group=Group
job.description=Gobblin time partition job for Kafka
job.lock.enabled=false

kafka.brokers=${env:GOBBLIN_KAFKA_BROKERS}

source.class=gobblin.source.extractor.extract.kafka.KafkaSimpleSource
extract.namespace=gobblin.extract.kafka

topic.whitelist=${env:GOBBLIN_TOPIC_WHITELIST}

writer.destination.type=HDFS
writer.output.format=txt


writer.staging.dir=${env:GOBBLIN_WORK_DIR}/task-staging
writer.output.dir=${env:GOBBLIN_WORK_DIR}/task-output

data.publisher.type=gobblin.publisher.BaseDataPublisher
data.publisher.final.dir=${env:GOBBLIN_PUBLISHER_DIR}
bootstrap.with.offset=${env:KAFKA_OFFSET}

#job.schedule=* 0/5 * * * ?
job.schedule=${env:GOBBLIN_JOB_SCHEDULE}

mr.job.max.mappers=1

#### I don't want job retries
workunit.retry.enabled=false




writer.partitioner.class=gobblin.writer.partitioner.TimeBasedWriterPartitioner
# changed to upper case because writer.file.path.type is an enum
writer.file.path.type=TABLENAME
#writer.partition.level=hourly
writer.partition.pattern=YYYY/MM/dd/HH


 


On Tuesday, December 1, 2015 at 7:29:43 AM UTC+9, Ziyang Liu wrote:

Issac Buenrostro

Dec 17, 2015, 11:54:53 AM
to jenny, gobblin-users
Hi Jenny,
The class gobblin.writer.partitioner.TimeBasedWriterPartitioner is abstract and cannot be instantiated. There is an implementation of this class, gobblin.writer.partitioner.TimeBasedAvroWriterPartitioner, that extracts the timestamp from a configurable field in Avro records; however, it doesn't look like you are trying to write Avro records. You will probably have to extend TimeBasedWriterPartitioner into a concrete class that can handle your record type, i.e., you have to tell the partitioner how to extract the timestamp from your records.
Best,
Issac


Zongjun Qi

Mar 10, 2016, 6:40:39 PM
to gobblin-users
Hi Ziyang,

Can you have a customized writer.partitioner.class, using writer.builder.class=gobblin.writer.SimpleDataWriterBuilder, to write to HDFS?

When I don't have a writer.partitioner.class, I get Kafka data written to HDFS as expected. But when I have a writer.partitioner.class, all jobs fail and nothing gets written to HDFS.
Thanks,
Zongjun

Ziyang Liu

Mar 10, 2016, 7:30:35 PM
to gobblin-users
Hi Zongjun, you don't have to use writer.partitioner.class. If you don't specify it, the writer will simply not partition and will write all records into the same folder.
You only need it if you want to write different records into different folders, for example based on timestamp.

Zongjun Qi

Mar 11, 2016, 12:24:19 AM
to gobblin-users
Thanks Ziyang. My Kafka payload is in JSON format, and I do need to write the records to HDFS folders day by day.

I have solved this. To help folks struggling with a similar problem, here is my Gobblin job file:

job.name=kafka2hdfs
job.group=GobblinKafka
job.description=Convert kafka topic to hdfs
job.lock.enabled=false

kafka.brokers=272.23.31.614:9092

topic.whitelist=mytopic
topic.blacklist=
mr.job.max.mappers=2
topics.move.to.latest.offset=false
bootstrap.with.offset=earliest
reset.on.offset.out.of.range=earliest

source.class=gobblin.source.extractor.extract.kafka.KafkaSimpleSource
extract.namespace=gobblin.extract.kafka

writer.partitioner.class=com.pm.data.logging.gobblin.LogJsonWriterPartitioner
writer.partition.granularity=day
writer.partition.pattern=YYYY-MM-dd
writer.partition.timezone=UTC

writer.builder.class=gobblin.writer.SimpleDataWriterBuilder
writer.file.path.type=tablename
writer.destination.type=HDFS
writer.output.format=txt
simple.writer.delimiter=\n
simple.writer.prepend.size=false

data.publisher.type=gobblin.publisher.BaseDataPublisher

metrics.reporting.file.enabled=true
metrics.log.dir=/user/ubuntu/gobblin/metrics
metrics.reporting.file.suffix=txt

mr.job.root.dir=/tmp/gobblin/gobblin-kafka/working
state.store.dir=/user/ubuntu/gobblin/gobblin-kafka/state-store
task.data.root.dir=/user/ubuntu/gobblin/gobblin-kafka/task-data
data.publisher.final.dir=/user/ubuntu/gobblin/gobblin-kafka/job-output



Cheers!
Zongjun

Zongjun Qi

Mar 11, 2016, 12:26:37 AM
to gobblin-users
Also, here is the source code for the customized WriterPartitioner:
package com.pm.data.logging.gobblin;

import java.io.UnsupportedEncodingException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.gson.Gson;
import com.google.gson.JsonObject;

import gobblin.configuration.State;
import gobblin.writer.partitioner.TimeBasedWriterPartitioner;

public class LogJsonWriterPartitioner extends TimeBasedWriterPartitioner<byte[]> {
    private static final Logger log = LoggerFactory.getLogger(LogJsonWriterPartitioner.class);
    private static final String TIMESTAMP_FIELD_ID = "trigger_time";
    private final Gson gson = new Gson();

    public LogJsonWriterPartitioner(State state, int numBranches, int branchId) {
        super(state, numBranches, branchId);
    }

    @Override
    public long getRecordTimestamp(byte[] payload) {
        // Decode the raw Kafka payload as UTF-8 text.
        String payloadString;
        try {
            payloadString = new String(payload, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            log.error("Unable to load UTF-8 encoding, falling back to system default", e);
            payloadString = new String(payload);
        }

        // Parse the payload as a JSON object.
        JsonObject jsonObject;
        try {
            jsonObject = gson.fromJson(payloadString, JsonObject.class);
        } catch (RuntimeException e) {
            log.error("Caught exception while parsing JSON string '" + payloadString + "'.");
            throw new RuntimeException(e);
        }

        // Use the record's trigger_time field (epoch seconds) if present;
        // otherwise fall back to the current time.
        long ret = System.currentTimeMillis();
        if (jsonObject != null && jsonObject.has(TIMESTAMP_FIELD_ID)) {
            ret = jsonObject.get(TIMESTAMP_FIELD_ID).getAsLong() * 1000L;
        }

        return ret;
    }
}
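To illustrate what the partitioner does with the value returned above: with writer.partition.pattern=YYYY-MM-dd and writer.partition.timezone=UTC from the config, a record with a hypothetical trigger_time of 1458864000 (epoch seconds) should land in the 2016-03-25 folder. A rough self-contained sketch of that mapping, not Gobblin's internal code, using SimpleDateFormat as a stand-in for the configured pattern:

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class PartitionPathDemo {
    public static void main(String[] args) {
        // trigger_time in seconds, converted to milliseconds as in getRecordTimestamp().
        long timestampMillis = 1458864000L * 1000L;
        // Mirrors writer.partition.pattern=YYYY-MM-dd with writer.partition.timezone=UTC.
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
        format.setTimeZone(TimeZone.getTimeZone("UTC"));
        System.out.println(format.format(new Date(timestampMillis))); // prints 2016-03-25
    }
}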

Zongjun

Ziyang Liu

Mar 11, 2016, 1:26:05 PM
to gobblin-users
Hi Zongjun,

Glad it works. Thanks for posting.

er...@relcy.com

Mar 11, 2016, 6:34:29 PM
to gobblin-users
Hi,

How did you make your custom class run?
I wrote my own version of LogJsonWriterPartitioner (an even simpler one) in the gobblin/gobblin-core/src/main/java/gobblin/writer/partitioner directory, then ran ./gradlew clean build and ran Gobblin, but it throws:

java.io.IOException: java.lang.ClassNotFoundException: gobblin.writer.partitioner.LogJsonWriterPartitioner
        at gobblin.writer.PartitionedDataWriter.<init>(PartitionedDataWriter.java:90)
        at gobblin.runtime.Fork.buildWriter(Fork.java:379)
        at gobblin.runtime.Fork.buildWriterIfNotPresent(Fork.java:385)
        at gobblin.runtime.Fork.processRecords(Fork.java:405)
        at gobblin.runtime.Fork.run(Fork.java:169)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: gobblin.writer.partitioner.LogJsonWriterPartitioner
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:264)
        at gobblin.writer.PartitionedDataWriter.<init>(PartitionedDataWriter.java:83)
        ... 9 more
2016-03-11 23:32:00 UTC ERROR [TaskRetryExecutor-0] gobblin.runtime.Task  265 - Task task_GobblinKafkaS3_1457739000435_0 failed
java.lang.IllegalStateException: Fork 0 of task task_GobblinKafkaS3_1457739000435_0 has failed and is no longer running
        at gobblin.runtime.Fork.putRecord(Fork.java:216)
        at gobblin.runtime.Task.processRecord(Task.java:488)
        at gobblin.runtime.Task.run(Task.java:171)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

er...@relcy.com

Mar 11, 2016, 6:40:25 PM
to gobblin-users
My class definition is

package gobblin.writer.partitioner;

public class LogJsonWriterPartitioner extends TimeBasedWriterPartitioner<byte[]> {
  public LogJsonWriterPartitioner(gobblin.configuration.State state, int numBranches, int branchId) {
    super(state, numBranches, branchId);
  }

  @Override
  public long getRecordTimestamp(byte[] payload) {
    return System.currentTimeMillis();
  }
}

in the gobblin/gobblin-core/src/main/java/gobblin/writer/partitioner directory.

Zongjun Qi

Mar 11, 2016, 6:48:52 PM
to gobblin-users
Hi Eric,

I did not add my class to the Gobblin source code. I started a new project with dependencies on the Gobblin packages.

You just need to depend on the gobblin-api and gobblin-core jars. Here is my pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.pm.data.gobblin.kafka</groupId>
  <artifactId>LogJsonWriterPartitioner</artifactId>
  <version>1.0-SNAPSHOT</version>

  <dependencies>
    <dependency>
      <groupId>com.linkedin.gobblin</groupId>
      <artifactId>gobblin-api</artifactId>
      <version>0.6.2</version>
    </dependency>
    <dependency>
      <groupId>com.linkedin.gobblin</groupId>
      <artifactId>gobblin-core</artifactId>
      <version>0.6.2</version>
    </dependency>
    <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>2.3</version>
    </dependency>
    <dependency>
      <groupId>org.testng</groupId>
      <artifactId>testng</artifactId>
      <version>6.9.10</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
</project>

Hope it helps,
Zongjun

er...@relcy.com

Mar 11, 2016, 7:15:42 PM
to gobblin-users
Is this the only way to do this? To me, just adding the Java code to the original Gobblin package looks simpler. Is there a way I can do it that way?
If you wrote your class in a separate project, where do you run Gobblin?

Zongjun

Mar 11, 2016, 7:30:37 PM
to er...@relcy.com, gobblin-users
Eric, I put the generated external jar under the Gobblin/lib directory. I also added it to LIBJARS in the gobblin-mapred.sh script.

Theoretically, your way should work. After all, you are just creating another concrete implementation, similar to the originally provided TimeBasedAvroWriterPartitioner.

Would like to see you make it work...
Zongjun


er...@relcy.com

Mar 11, 2016, 8:29:34 PM
to gobblin-users, er...@relcy.com
Followed your way and made it work. Thanks!

Zongjun Qi

Mar 11, 2016, 9:21:21 PM
to gobblin-users, er...@relcy.com
Glad it helped! Cheers!
Zongjun

laksh...@tokbox.com

Mar 23, 2016, 8:23:00 PM
to gobblin-users, er...@relcy.com
This was the exact use case I had. Thanks Zongjun. It helped me a lot.

laksh...@tokbox.com

Mar 25, 2016, 12:09:49 AM
to gobblin-users, er...@relcy.com
Hello Zongjun,

I implemented the TimeBasedPartitioner. 

Let's say I run it on an hourly time window. It creates the files under hdfs://gobblin/productA/2016/03/25/03. When I run it again after an hour, it should write to the next hourly window, hdfs://gobblin/productA/2016/03/25/04.

But it fails with this exception:


Failed to move hdfs://10.0.1.227:8020/gobblin/task-output/productA/2016 to /gobblinlandingzone/productA/2016: dst already exists

org.apache.hadoop.fs.FileAlreadyExistsException: Failed to rename hdfs://10.0.1.227:8020/gobblin/task-output/productA/2016 to /gobblinlandingzone/productA/2016: dst already exists
        at gobblin.util.HadoopUtils.renamePath(HadoopUtils.java:169)
        at gobblin.util.HadoopUtils.renamePath(HadoopUtils.java:152)
        at gobblin.util.HadoopUtils.movePath(HadoopUtils.java:213)
        at gobblin.util.ParallelRunner$6.call(ParallelRunner.java:312)
        at gobblin.util.ParallelRunner$6.call(ParallelRunner.java:305)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)


It fails at the stage where it detects that the year bucket already exists. Did you run into similar issues?

My implementation of LogJsonWriterPartitioner is below:

public class LogJsonWriterPartitioner extends TimeBasedWriterPartitioner<byte[]> {

    public LogJsonWriterPartitioner(State state, int numBranches, int branchId) {
        super(state, numBranches, branchId);
    }

    @Override
    public long getRecordTimestamp(byte[] record) {
        return System.currentTimeMillis();
    }
}

Zongjun

Mar 25, 2016, 2:46:49 PM
to laksh...@tokbox.com, gobblin-users, er...@relcy.com
Hey,

I am facing the same error too, but I have a feeling (yet to verify) that this is a false alarm; I still need to look into the reason. Worst case, we need to look at the source code. It would be best if some LinkedIn folks like Ziyang Liu could help. :-)

Thanks,
Zongjun

Lakshmanan Muthuraman

Mar 25, 2016, 2:49:29 PM
to Zongjun, gobblin-users, er...@relcy.com
I fixed it after getting a response from the LinkedIn folks. I had to set the following property:

data.publisher.type=gobblin.publisher.TimePartitionedDataPublisher

Zongjun

unread,
Mar 25, 2016, 5:03:57 PM3/25/16
to Lakshmanan Muthuraman, gobblin-users, er...@relcy.com
Works like a charm! I modified mine accordingly and those errors are gone. Thanks for the quick follow-up, Lakshmanan!

Thanks,
Zongjun

Samir Patni

Aug 4, 2016, 5:17:21 PM
to gobblin-users, laksh...@tokbox.com, er...@relcy.com

Hi Zongjun, 

Can you share the final config after all the discussion above? I urgently need it.

Regards, 
Samir

Samir Patni

Aug 4, 2016, 5:36:54 PM
to gobblin-users, laksh...@tokbox.com, er...@relcy.com

Hey, 

Looks like you have already shared it above. I will follow the same and get back if I run into issues.

Regards, 
Samir
