Need help on Fork Operator in Gobblin


nazim....@globallogic.com

Feb 22, 2017, 8:16:45 AM
to gobblin-users, Sayyad Ghazi, Chitresh Katarpawar, Gaya Prasad Mourya
Hi,

I need to publish data from Gobblin to multiple systems. I am reading string data from Kafka. What I am trying to build is:

Kafka --> Gobblin --> 1. HDFS
                      2. Local file system
                      3. Another business component


For now, I am just trying to publish data to two different HDFS locations using Gobblin's fork operator. Below is the .pull file, which I am running in standalone mode.

Versions used: Hadoop 2.7, Kafka 0.9, Gobblin 0.9


job.name=fork5
job.group=fork5
job.description=Gobblin quick start job for Kafka
job.lock.enabled=false

topic.whitelist=test

kafka.brokers=localhost:9092

source.class=gobblin.source.extractor.extract.kafka.KafkaSimpleSource
extract.namespace=gobblin.extract.kafka
fork.branches=2
fork.operator.class=gobblin.fork.IdentityForkOperator

fs.uri=hdfs://0.0.0.0:9000
state.store.fs.uri=hdfs://0.0.0.0:9000

writer.fs.uri.0=hdfs://0.0.0.0:9000
writer.destination.type.0=HDFS
writer.output.format.0=txt
writer.staging.dir.0=/gobblin/example/task-staging/purged
writer.output.dir.0=/gobblin/example/task-output/purged
writer.builder.class.0=gobblin.writer.SimpleDataWriterBuilder
data.publisher.final.dir.0=/gobblin/example/job-output/purged

writer.fs.uri.1=hdfs://0.0.0.0:9000
writer.destination.type.1=HDFS
writer.output.format.1=txt
writer.staging.dir.1=/gobblin/example/task-staging/normal
writer.output.dir.1=/gobblin/example/task-output/normal
writer.builder.class.1=gobblin.writer.SimpleDataWriterBuilder
data.publisher.final.dir.1=/gobblin/example/job-output/normal


state.store.dir=/gobblin-kafka/state-store

writer.file.path.type=tablename

data.publisher.type=gobblin.publisher.BaseDataPublisher

mr.job.max.mappers=1

metrics.reporting.file.enabled=true
metrics.log.dir=${env:GOBBLIN_WORK_DIR}/metrics
metrics.reporting.file.suffix=txt

bootstrap.with.offset=earliest


mr.job.root.dir=/gobblin-kafka/working

task.data.root.dir=/jobs/kafkaetl/gobblin/gobblin-kafka/task-data



I am getting the exception below:


java.lang.IllegalArgumentException: Missing required property writer.staging.dir.0
        at com.google.common.base.Preconditions.checkArgument(Preconditions.java:93)
        at gobblin.util.WriterUtils.getWriterStagingDir(WriterUtils.java:68)
        at gobblin.util.JobLauncherUtils.cleanTaskStagingData(JobLauncherUtils.java:216)
        at gobblin.runtime.AbstractJobLauncher.cleanLeftoverStagingData(AbstractJobLauncher.java:729)
        at gobblin.runtime.AbstractJobLauncher.launchJob(AbstractJobLauncher.java:365)
        at gobblin.scheduler.JobScheduler.runJob(JobScheduler.java:375)
        at gobblin.scheduler.JobScheduler.runJob(JobScheduler.java:333)
        at gobblin.scheduler.JobScheduler$NonScheduledJobRunner.run(JobScheduler.java:497)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
2017-02-22 12:43:10 UTC INFO  [JobScheduler-0] gobblin.runtime.AbstractJobLauncher  373 - Starting job job_fork6_1487767386843
2017-02-22 12:43:10 UTC INFO  [JobScheduler-0] gobblin.util.ExecutorsUtils  130 - Attempting to shutdown ExecutorService: java.util.concurrent.ThreadPoolExecutor@a0e524b[Shutting down, pool size = 2, active threads = 0, queued tasks = 0, completed tasks = 2]
2017-02-22 12:43:10 UTC INFO  [JobScheduler-0] gobblin.util.ExecutorsUtils  149 - Successfully shutdown ExecutorService: java.util.concurrent.ThreadPoolExecutor@a0e524b[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2]
2017-02-22 12:43:10 UTC INFO  [JobScheduler-0] gobblin.runtime.TaskExecutor  159 - Executing task task_fork6_1487767386843_0
2017-02-22 12:43:10 UTC INFO  [JobScheduler-0] gobblin.runtime.GobblinMultiTaskAttempt  130 - Waiting for submitted tasks of job job_fork6_1487767386843 to complete in container ...
2017-02-22 12:43:10 UTC INFO  [JobScheduler-0] gobblin.runtime.GobblinMultiTaskAttempt  133 - 1 out of 1 tasks of job job_fork6_1487767386843 are running in container
2017-02-22 12:43:10 UTC ERROR [TaskExecutor-0] gobblin.runtime.Task  393 - Task task_fork6_1487767386843_0 failed
gobblin.fork.CopyNotSupportedException: test is not copyable
        at gobblin.runtime.Task.run(Task.java:260)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
2017-02-22 12:43:10 UTC INFO  [TaskExecutor-0] gobblin.runtime.TaskExecutor  212 - Scheduled retry of failed task task_fork6_1487767386843_0 to run in 0 seconds
2017-02-22 12:43:10 UTC ERROR [TaskRetryExecutor-0] gobblin.runtime.Task  393 - Task task_fork6_1487767386843_0 failed
gobblin.fork.CopyNotSupportedException: test is not copyable
        at gobblin.runtime.Task.run(Task.java:260)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
2017-02-22 12:43:10 UTC INFO  [TaskRetryExecutor-0] gobblin.runtime.TaskExecutor  212 
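The .pull file above configures each fork branch with a numeric suffix (writer.staging.dir.0, writer.staging.dir.1, and so on), and the first error names one of those keys. A minimal sketch of how such branch-suffixed keys can be resolved is below. BranchProps, branchKey, and requireBranchProp are hypothetical names, and the rule that the suffix is applied only when the job has more than one branch is an assumption, not something confirmed in this thread; the sketch does not claim to explain why the real job reported the key as missing.

```java
import java.util.Properties;

// Hypothetical helper, not Gobblin's API: resolves a per-branch key using the
// naming seen in the .pull file ("<key>.<branchId>" when the job has more than
// one fork branch, the plain "<key>" otherwise; this rule is an assumption).
final class BranchProps {

    static String branchKey(String key, int numBranches, int branchId) {
        return numBranches > 1 ? key + "." + branchId : key;
    }

    // Reads a required per-branch property; when it is absent, fails with the
    // same message shape as the exception quoted in this post.
    static String requireBranchProp(Properties props, String key, int numBranches, int branchId) {
        String name = branchKey(key, numBranches, branchId);
        String value = props.getProperty(name);
        if (value == null) {
            throw new IllegalArgumentException("Missing required property " + name);
        }
        return value;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("fork.branches", "2");
        props.setProperty("writer.staging.dir.0", "/gobblin/example/task-staging/purged");
        props.setProperty("writer.staging.dir.1", "/gobblin/example/task-staging/normal");

        int branches = Integer.parseInt(props.getProperty("fork.branches", "1"));
        for (int i = 0; i < branches; i++) {
            // Prints the staging directory configured for branch i.
            System.out.println(requireBranchProp(props, "writer.staging.dir", branches, i));
        }
    }
}
```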

Shirshanka Das

Feb 26, 2017, 11:20:15 PM
to nazim....@globallogic.com, gobblin-users, Sayyad Ghazi, Chitresh Katarpawar, Gaya Prasad Mourya
Hi, 

  Forks require the ability to copy the records flowing through the pipeline. KafkaSimpleSource produces byte[] records, which are not natively copyable, and that leads to the underlying exception:
  Task task_fork6_1487767386843_0 failed
gobblin.fork.CopyNotSupportedException: test is not copyable
        at gobblin.runtime.Task.run(Task.java:260)


May I ask what the underlying serialization format is? If it is Avro, there is a converter that can convert Avro records into copyable forms, which would then work with a fork-based pipeline.
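To illustrate why a fork needs copyable records, here is a minimal sketch: each branch gets its own copy of the record, so a branch that modifies its record cannot affect the others. A raw Java byte[] is mutable and shared by reference, so without an explicit copy step both branches would see the same array. CopyableRecord, CopyableBytes, and forkToAll are hypothetical names for illustration only; they are not Gobblin's fork classes, and this is not the change that was later made in Gobblin.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative sketch only: CopyableRecord, CopyableBytes and forkToAll are
// hypothetical names, not Gobblin's fork classes.
final class ForkCopySketch {

    // Assumed contract: a record that can hand out an independent deep copy.
    interface CopyableRecord<T> {
        T copy();
    }

    // Wraps a raw byte[] payload (as produced by a byte[]-based Kafka source)
    // so that copying duplicates the array instead of sharing the reference.
    static final class CopyableBytes implements CopyableRecord<CopyableBytes> {
        final byte[] data;

        CopyableBytes(byte[] data) {
            this.data = data;
        }

        @Override
        public CopyableBytes copy() {
            return new CopyableBytes(Arrays.copyOf(data, data.length));
        }
    }

    // Identity-style fork: every branch receives its own copy of the same
    // record, so a change made in one branch cannot leak into another.
    static <T extends CopyableRecord<T>> List<T> forkToAll(T record, int numBranches) {
        List<T> branches = new ArrayList<>(numBranches);
        for (int i = 0; i < numBranches; i++) {
            branches.add(record.copy());
        }
        return branches;
    }

    public static void main(String[] args) {
        CopyableBytes original = new CopyableBytes("test".getBytes(StandardCharsets.UTF_8));
        List<CopyableBytes> branches = forkToAll(original, 2);

        branches.get(0).data[0] = (byte) 'T'; // mutate branch 0 only
        System.out.println(new String(branches.get(0).data, StandardCharsets.UTF_8)); // prints "Test"
        System.out.println(new String(branches.get(1).data, StandardCharsets.UTF_8)); // prints "test"
    }
}
```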



  

--
You received this message because you are subscribed to the Google Groups "gobblin-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to gobblin-user...@googlegroups.com.
To post to this group, send email to gobbli...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/gobblin-users/a87bdf77-5c18-46b3-9f21-45c52e7cf976%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

sayyad...@globallogic.com

Feb 27, 2017, 2:27:03 AM
to gobblin-users, nazim....@globallogic.com, sayyad...@globallogic.com, chitresh....@globallogic.com, gaya....@globallogic.com

Hi Shirshanka,

Thanks for the reply.
I have not specified any particular serialization format. I was running a simple Kafka console producer that sends strings to the Kafka topic "test". You can have a look at the property file I am using.
What do I need to change in my property file, or do I need to write extra code?

Shirshanka Das

Feb 28, 2017, 2:34:27 AM
to sayyad...@globallogic.com, gobblin-users, nazim....@globallogic.com, chitresh....@globallogic.com, gaya....@globallogic.com
Hi,
   Your current pipeline with byte[]-based records won't work out of the box with forks. Are you building Gobblin from source? If so, we can make the necessary code changes this week and you can pick them up.

Shirshanka


Sayyad Ghazi

Feb 28, 2017, 2:43:17 AM
to gobblin-users, sayyad...@globallogic.com, nazim....@globallogic.com, chitresh....@globallogic.com, gaya....@globallogic.com
Hi,

Yes Shirshanka, I am building Gobblin from source, and we want out-of-the-box support for string processing with forks. If you can make the code changes for it, that would help a great deal, especially if it works with the existing configuration file.
Please provide a sample config file for it if you can.

Sayyad Ghazi

Shirshanka Das

Mar 2, 2017, 5:57:26 PM
to Sayyad Ghazi, gobblin-users, nazim....@globallogic.com, chitresh....@globallogic.com, gaya....@globallogic.com
We should have it this week. I'll let you know when it has been merged in. 
Your existing config file should work. 

Shirshanka



Shirshanka Das

Mar 6, 2017, 2:30:25 AM
to gobblin-users, sayyad...@globallogic.com, nazim....@globallogic.com, chitresh....@globallogic.com, gaya....@globallogic.com

Your existing config file should work.

I would recommend adding a newline delimiter for the simple data writer, so that your test records are written on separate lines in the file.

Add the following to your config file (no quotes or extra backslashes needed):

simple.writer.delimiter=\n
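A minimal sketch of why no quotes or extra backslashes are needed, assuming the .pull file is parsed with standard java.util.Properties escaping (an assumption; this thread does not say which parser Gobblin uses): the two characters `\n` in a property value are read back as a single newline character. The writeRecords method is a hypothetical stand-in for a delimiter-separated writer and is not Gobblin's SimpleDataWriter code.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

// Sketch assuming standard java.util.Properties escaping; writeRecords is a
// hypothetical stand-in for a delimiter-separated writer, not Gobblin's code.
final class DelimiterSketch {

    // Parses properties text and returns the simple.writer.delimiter value.
    static String parseDelimiter(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for an in-memory reader
        }
        return props.getProperty("simple.writer.delimiter");
    }

    // Writes each record's UTF-8 bytes followed by the delimiter.
    static byte[] writeRecords(String[] records, String delimiter) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] delim = delimiter.getBytes(StandardCharsets.UTF_8);
        for (String record : records) {
            byte[] bytes = record.getBytes(StandardCharsets.UTF_8);
            out.write(bytes, 0, bytes.length);
            out.write(delim, 0, delim.length);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        // In Java source, "\\n" is the two characters '\' and 'n', i.e. exactly
        // what the line in the .pull file contains.
        String delimiter = parseDelimiter("simple.writer.delimiter=\\n\n");
        System.out.println(delimiter.equals("\n")); // prints "true"

        byte[] file = writeRecords(new String[] {"hello", "world"}, delimiter);
        System.out.print(new String(file, StandardCharsets.UTF_8)); // "hello" and "world" on separate lines
    }
}
```

With quotes, the quote characters would become part of the value, and with a doubled backslash the value would be a literal backslash followed by `n`, so neither would produce a newline.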


HTH
Shirshanka

Sayyad Ghazi

Mar 6, 2017, 2:46:29 AM
to gobblin-users, sayyad...@globallogic.com, nazim....@globallogic.com, chitresh....@globallogic.com, gaya....@globallogic.com
Hi Shirshanka,

I was monitoring the commit and am building the source with ./gradlew -x test clean build -PhadoopVersion=2.6.0-cdh5.8.0. I will add the given property and let you know if it works.

Thanks for the support.

Regards,
Sayyed Ghazi

Sayyad Ghazi

Mar 6, 2017, 6:32:01 AM
to gobblin-users, sayyad...@globallogic.com, nazim....@globallogic.com, chitresh....@globallogic.com, gaya....@globallogic.com
It's working as intended.

Thanks a lot for the great help and support. We will reach out again if we get stuck anywhere else :)

Thanks & Regards
Sayyed Ghazi

Shirshanka Das

Mar 6, 2017, 10:48:26 AM
to Sayyad Ghazi, gobblin-users, nazim....@globallogic.com, chitresh....@globallogic.com, gaya....@globallogic.com
Great to hear!

Shirshanka
