I need to publish data from Gobblin to multiple systems. I am reading string data from Kafka. The flow I am trying to build is:
Kafka --> Gobblin -->
1. HDFS
2. Local file system
3. Another business component
For now, I am just trying to publish data to two different HDFS locations, using Gobblin's fork operator. Below is the .pull file, which I am running in standalone mode:
# Job settings
job.group=fork5
job.description=Gobblin quick start job for Kafka
job.lock.enabled=false

# Kafka source
topic.whitelist=test
kafka.brokers=localhost:9092
source.class=gobblin.source.extractor.extract.kafka.KafkaSimpleSource
extract.namespace=gobblin.extract.kafka

# Fork into two branches
fork.branches=2
fork.operator.class=gobblin.fork.IdentityForkOperator

# Branch 0
writer.destination.type.0=HDFS
writer.output.format.0=txt
writer.staging.dir.0=/gobblin/example/task-staging/purged
writer.output.dir.0=/gobblin/example/task-output/purged
writer.builder.class.0=gobblin.writer.SimpleDataWriterBuilder
data.publisher.final.dir.0=/gobblin/example/job-output/purged

# Branch 1
writer.destination.type.1=HDFS
writer.output.format.1=txt
writer.staging.dir.1=/gobblin/example/task-staging/normal
writer.output.dir.1=/gobblin/example/task-output/normal
writer.builder.class.1=gobblin.writer.SimpleDataWriterBuilder
data.publisher.final.dir.1=/gobblin/example/job-output/normal

# Remaining settings (no branch suffix)
state.store.dir=/gobblin-kafka/state-store
writer.file.path.type=tablename
data.publisher.type=gobblin.publisher.BaseDataPublisher
mr.job.max.mappers=1
metrics.reporting.file.enabled=true
metrics.log.dir=${env:GOBBLIN_WORK_DIR}/metrics
metrics.reporting.file.suffix=txt
bootstrap.with.offset=earliest
mr.job.root.dir=/gobblin-kafka/working
task.data.root.dir=/jobs/kafkaetl/gobblin/gobblin-kafka/task-data
When I run the job, it first fails with this exception:
java.lang.IllegalArgumentException: Missing required property writer.staging.dir.0
at com.google.common.base.Preconditions.checkArgument(Preconditions.java:93)
at gobblin.util.WriterUtils.getWriterStagingDir(WriterUtils.java:68)
at gobblin.util.JobLauncherUtils.cleanTaskStagingData(JobLauncherUtils.java:216)
at gobblin.runtime.AbstractJobLauncher.cleanLeftoverStagingData(AbstractJobLauncher.java:729)
at gobblin.runtime.AbstractJobLauncher.launchJob(AbstractJobLauncher.java:365)
at gobblin.scheduler.JobScheduler.runJob(JobScheduler.java:375)
at gobblin.scheduler.JobScheduler.runJob(JobScheduler.java:333)
at gobblin.scheduler.JobScheduler$NonScheduledJobRunner.run(JobScheduler.java:497)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
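Since the exception above names writer.staging.dir.0, I wanted to rule out a typo in the branch-suffixed keys. Below is a minimal Java sketch of such a sanity check. It is not part of Gobblin: the class name BranchPropertyCheck and the list of keys it checks are my own assumptions, taken from the keys in my .pull file rather than from any official list of required properties.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class BranchPropertyCheck {
    // Branch-suffixed keys used in the .pull file; illustrative, not an official list.
    static final String[] PER_BRANCH_KEYS = {
        "writer.staging.dir", "writer.output.dir", "data.publisher.final.dir"
    };

    /** Parses .pull file content (standard Java properties syntax). */
    public static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Returns every "key.branch" that is absent for branches 0 .. fork.branches - 1. */
    public static List<String> missingKeys(Properties props) {
        int branches = Integer.parseInt(props.getProperty("fork.branches", "1").trim());
        List<String> missing = new ArrayList<>();
        for (int branch = 0; branch < branches; branch++) {
            for (String key : PER_BRANCH_KEYS) {
                String fullKey = key + "." + branch;
                if (props.getProperty(fullKey) == null) {
                    missing.add(fullKey);
                }
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Abbreviated config in which the staging dir for branch 1 is deliberately left out.
        String pull = String.join("\n",
            "fork.branches=2",
            "writer.staging.dir.0=/gobblin/example/task-staging/purged",
            "writer.output.dir.0=/gobblin/example/task-output/purged",
            "data.publisher.final.dir.0=/gobblin/example/job-output/purged",
            "writer.output.dir.1=/gobblin/example/task-output/normal",
            "data.publisher.final.dir.1=/gobblin/example/job-output/normal");
        // Prints: Missing keys: [writer.staging.dir.1]
        System.out.println("Missing keys: " + missingKeys(parse(pull)));
    }
}
```

Applied to my full .pull file, a check like this reports nothing missing, because all three keys are set for both branches. So the key is present in the file, and I do not understand why Gobblin reports it as missing.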
The job then continues, and the task fails with a second exception:
2017-02-22 12:43:10 UTC INFO [JobScheduler-0] gobblin.runtime.AbstractJobLauncher 373 - Starting job job_fork6_1487767386843
2017-02-22 12:43:10 UTC INFO [JobScheduler-0] gobblin.util.ExecutorsUtils 130 - Attempting to shutdown ExecutorService: java.util.concurrent.ThreadPoolExecutor@a0e524b[Shutting down, pool size = 2, active threads = 0, queued tasks = 0, completed tasks = 2]
2017-02-22 12:43:10 UTC INFO [JobScheduler-0] gobblin.util.ExecutorsUtils 149 - Successfully shutdown ExecutorService: java.util.concurrent.ThreadPoolExecutor@a0e524b[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2]
2017-02-22 12:43:10 UTC INFO [JobScheduler-0] gobblin.runtime.TaskExecutor 159 - Executing task task_fork6_1487767386843_0
2017-02-22 12:43:10 UTC INFO [JobScheduler-0] gobblin.runtime.GobblinMultiTaskAttempt 130 - Waiting for submitted tasks of job job_fork6_1487767386843 to complete in container ...
2017-02-22 12:43:10 UTC INFO [JobScheduler-0] gobblin.runtime.GobblinMultiTaskAttempt 133 - 1 out of 1 tasks of job job_fork6_1487767386843 are running in container
2017-02-22 12:43:10 UTC ERROR [TaskExecutor-0] gobblin.runtime.Task 393 - Task task_fork6_1487767386843_0 failed
gobblin.fork.CopyNotSupportedException: test is not copyable
at gobblin.runtime.Task.run(Task.java:260)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
2017-02-22 12:43:10 UTC INFO [TaskExecutor-0] gobblin.runtime.TaskExecutor 212 - Scheduled retry of failed task task_fork6_1487767386843_0 to run in 0 seconds
2017-02-22 12:43:10 UTC ERROR [TaskRetryExecutor-0] gobblin.runtime.Task 393 - Task task_fork6_1487767386843_0 failed
gobblin.fork.CopyNotSupportedException: test is not copyable
at gobblin.runtime.Task.run(Task.java:260)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
2017-02-22 12:43:10 UTC INFO [TaskRetryExecutor-0] gobblin.runtime.TaskExecutor 212
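The message "test is not copyable" contains my topic name, so my guess is that whatever is being handed to the fork (possibly the schema, which may simply be the topic name as a String) fails some kind of copyability check once there is more than one branch. I have not confirmed this in the Gobblin source. The plain Java sketch below only illustrates the behaviour I suspect; BranchCopyable, CopyableString and forkSchema are hypothetical names of my own and are not Gobblin classes or methods.

```java
import java.util.ArrayList;
import java.util.List;

public class CopyableSketch {
    /** Hypothetical "can be copied once per branch" contract; not Gobblin's interface. */
    public interface BranchCopyable<T> {
        T copy();
    }

    /** Hypothetical wrapper that makes a String value copyable. */
    public static final class CopyableString implements BranchCopyable<CopyableString> {
        private final String value;

        public CopyableString(String value) {
            this.value = value;
        }

        @Override
        public CopyableString copy() {
            return new CopyableString(value);
        }

        @Override
        public String toString() {
            return value;
        }
    }

    /**
     * Produces one schema instance per branch. With more than one branch,
     * a schema that does not implement BranchCopyable is rejected.
     */
    public static List<Object> forkSchema(Object schema, int branches) {
        if (branches > 1 && !(schema instanceof BranchCopyable)) {
            throw new IllegalStateException(schema + " is not copyable");
        }
        List<Object> perBranch = new ArrayList<>();
        for (int i = 0; i < branches; i++) {
            if (schema instanceof BranchCopyable) {
                perBranch.add(((BranchCopyable<?>) schema).copy());
            } else {
                perBranch.add(schema); // single branch: the original is passed through
            }
        }
        return perBranch;
    }

    public static void main(String[] args) {
        // A wrapped value can be handed to two branches.
        System.out.println(forkSchema(new CopyableString("test"), 2).size());
        // A plain String with two branches is rejected.
        try {
            forkSchema("test", 2);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If this guess is right, the failure would come from the type of the value being forked rather than from the directory settings in the .pull file, but I would appreciate confirmation from someone who knows the fork operator.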