Gobblin with Kafka as source


Randy DeLeon

unread,
Jun 2, 2015, 10:50:56 AM6/2/15
to gobbli...@googlegroups.com
Hi,

I noticed an earlier thread about Gobblin/Kafka integration that ended, in short, with "it's coming". I've pulled the latest Gobblin code base and noticed that Kafka classes exist in the extract package of the gobblin-core module, yet I can find absolutely no examples of how to use them. Digging through the code, I found that the KafkaSource class requires some parameters, so I went ahead and created a Kafka .pull file to try it out.

I've set up a local 3-node Kafka cluster (1 ZooKeeper node and 2 Kafka brokers). When I feed it the following pull file, I get an exception from Gobblin that I'm not quite sure how to decipher yet. (Again, I just picked this up and am tinkering with it.)

Example Pull File:

# Job specific properties
job.name=PullFromKafka
job.group=Kafka
job.description=Pull content from a Kafka queue
job.lock.dir=/DevelopmentUtils/gobblin-dist/job_locks/
job.lock.enabled=true
job.runonce=true

# Job type properties
launcher.type=LOCAL

# Source properties
source.class=gobblin.source.extractor.extract.kafka.KafkaSource
topic.blacklist=
topic.whitelist=test
topics.move.to.latest.offset=
move.to.earliest.offset.allowed=
leader.hostandport=localhost:2181
avg.event.size=
estimated.data.size=

# Extract properties
extract.namespace=gobblin.source.extractor.extract.kafka

# Writer properties
writer.destination.type=HDFS
writer.output.format=AVRO

# Data publisher properties
data.publisher.type=gobblin.publisher.BaseDataPublisher

I'm sure some of these settings are incorrect, but the error I'm seeing, and can't make sense of, is:

2015-06-02 09:47:37 CDT ERROR [JobScheduler-0] gobblin.scheduler.JobScheduler  275 - Failed to launch and run job PullFromKafka
java.lang.InstantiationException
at sun.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:48)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at java.lang.Class.newInstance(Class.java:442)
at gobblin.runtime.JobContext.<init>(JobContext.java:105)
at gobblin.runtime.AbstractJobLauncher.<init>(AbstractJobLauncher.java:104)
at gobblin.runtime.local.LocalJobLauncher.<init>(LocalJobLauncher.java:66)
at gobblin.runtime.JobLauncherFactory.newJobLauncher(JobLauncherFactory.java:55)
at gobblin.scheduler.JobScheduler.runJob(JobScheduler.java:267)
at gobblin.scheduler.JobScheduler$NonScheduledJobRunner.run(JobScheduler.java:475)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
2015-06-02 09:47:37 CDT ERROR [JobScheduler-0] gobblin.scheduler.JobScheduler$NonScheduledJobRunner  477 - Failed to run job PullFromKafka
gobblin.runtime.JobException: Failed to launch and run job PullFromKafka
at gobblin.scheduler.JobScheduler.runJob(JobScheduler.java:276)
at gobblin.scheduler.JobScheduler$NonScheduledJobRunner.run(JobScheduler.java:475)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.InstantiationException
at sun.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:48)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at java.lang.Class.newInstance(Class.java:442)
at gobblin.runtime.JobContext.<init>(JobContext.java:105)
at gobblin.runtime.AbstractJobLauncher.<init>(AbstractJobLauncher.java:104)
at gobblin.runtime.local.LocalJobLauncher.<init>(LocalJobLauncher.java:66)
at gobblin.runtime.JobLauncherFactory.newJobLauncher(JobLauncherFactory.java:55)
at gobblin.scheduler.JobScheduler.runJob(JobScheduler.java:267)
... 6 more

Am I going the wrong way, and should I be building out my own Kafka Source/Extractor/Converter as done with the Wikipedia and SimpleJSON examples? That seems odd since the classes seem already written, but at this point any insight is appreciated.

I'll continue to look into the code. Documentation around examples is quite limited. I hope someone can point me in the right direction here.

Appreciate your time in reading this!

-Randy

Lin Qiao

unread,
Jun 2, 2015, 11:47:15 AM6/2/15
to Randy DeLeon, gobbli...@googlegroups.com
Hi Randy,

Our Kafka adaptor is code complete and is now undergoing a series of tests and a dark launch. We will put up a config recipe for it once it's GA'ed.

Meanwhile, you can work with the Gobblin dev team to set it up on your side.

Lin

Randy DeLeon

unread,
Jun 2, 2015, 12:13:10 PM6/2/15
to gobbli...@googlegroups.com, rdele...@gmail.com
Hi Lin,

Appreciate your response! What's the best way to work with the Gobblin dev team? I was under the impression this would be the place, but if it's somewhere else, I'm not sure where to ask.

Thanks,
-Randy

Ziyang Liu

unread,
Jun 2, 2015, 12:36:02 PM6/2/15
to gobbli...@googlegroups.com
Hi Randy, KafkaSource is an abstract class. You should extend it and implement getExtractor(). 
We currently have KafkaAvroSource and KafkaAvroExtractor for Avro data. If your data is Avro you can use them directly, otherwise you need to implement a Source and an Extractor class.
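A minimal sketch of such a subclass (class names and the getExtractor() signature are assumed from the gobblin-core source of that era; check them against the actual code, as they may differ by version):

```java
package example;

import java.io.IOException;

// All Gobblin types below are assumed from the gobblin-core module.
import gobblin.configuration.WorkUnitState;
import gobblin.source.extractor.Extractor;
import gobblin.source.extractor.extract.kafka.KafkaSimpleExtractor;
import gobblin.source.extractor.extract.kafka.KafkaSource;

// A concrete, non-Avro source: extend the abstract KafkaSource and supply
// the one method it leaves abstract, which hands back the Extractor that
// actually pulls records for a given work unit.
public class MyKafkaByteSource extends KafkaSource<String, byte[]> {

    @Override
    public Extractor<String, byte[]> getExtractor(WorkUnitState state) throws IOException {
        // Delegate to the byte[]-producing extractor shipped with gobblin-core.
        return new KafkaSimpleExtractor(state);
    }
}
```

This is essentially what a KafkaSimpleSource-style class would do; point source.class at the concrete subclass rather than at KafkaSource itself.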

Thanks
Ziyang

aks...@nerdwallet.com

unread,
Jun 3, 2015, 12:56:45 PM6/3/15
to gobbli...@googlegroups.com
There is also gobblin.source.extractor.extract.kafka.KafkaSimpleExtractor and gobblin.source.extractor.extract.kafka.KafkaSimpleSource if your data is not Avro. This just extracts your data as an array of bytes.

Randy DeLeon

unread,
Jun 3, 2015, 3:45:39 PM6/3/15
to gobbli...@googlegroups.com
I've actually been trying to use KafkaSimpleExtractor. My earlier email referenced KafkaSource, which Ziyang kindly pointed out is abstract. In short, I tried KafkaSimpleSource since I wasn't using Avro initially. I'm still trying to figure out the proper pull file structure. It appears that with KafkaSimpleExtractor the topic name is given as the schema, and the program blows up when trying to extract the schema inside MultiConverter.java.

If there is any sample pull file I can look at, I'd appreciate it. Otherwise I'll keep trying to figure out the right path to get a simple consumption process working.

I appreciate all the replies thus far; you've all been great about responding. Thanks!

-Randy 

aks...@nerdwallet.com

unread,
Jun 9, 2015, 4:17:28 PM6/9/15
to gobbli...@googlegroups.com
I don't think there are any out-of-the-box converters that take a byte[] as input (except for IdentityConverter, which does nothing). There is also no writer that writes a byte[], but you can look here for an example of one: https://github.com/linkedin/gobblin/pull/123

Here is an example job file (there is a decent amount you'll need to fill in, but it should give you the general idea). Note that the writer is not yet pushed upstream, but it's the only writer I know of that can write a byte[]:

job.name=test
job.group=test
job.description=test
job.disabled=false

source.class=gobblin.source.extractor.extract.kafka.KafkaSimpleSource
converter.classes=gobblin.converter.IdentityConverter
extract.namespace=test
topic.whitelist=<optional>
topic.blacklist=<optional>
kafka.brokers=<kafka-brokers>

writer.fs.uri=file://localhost/
writer.builder.class=gobblin.writer.SimpleDataWriterBuilder
simple.writer.prepend.size=<prepend size>
simple.writer.delimiter=<delimiter>

data.publisher.type=<publisher>
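As an aside on simple.writer.prepend.size: if it works the way the name suggests, the writer prepends each record's size so a downstream reader can split the raw byte stream back into records. A toy illustration of that framing idea (this is not Gobblin's actual writer code, and the choice of a 4-byte int length is an assumption):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical length-prefix framing: each byte[] record is written with its
// size first, so record boundaries can be recovered from a flat byte stream.
public class LengthPrefixFraming {

    static byte[] frame(List<byte[]> records) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            for (byte[] r : records) {
                out.writeInt(r.length); // prepend the record size
                out.write(r);
            }
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static List<byte[]> unframe(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            List<byte[]> records = new ArrayList<>();
            while (in.available() > 0) {
                byte[] r = new byte[in.readInt()]; // read size, then that many bytes
                in.readFully(r);
                records.add(r);
            }
            return records;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<byte[]> out = unframe(frame(List.of("hello".getBytes(), "kafka".getBytes())));
        System.out.println(new String(out.get(0)) + "," + new String(out.get(1)));
        // prints "hello,kafka"
    }
}
```

The alternative, simple.writer.delimiter, only works if the delimiter can never occur inside a record, which is why size-prefixing is the safer default for arbitrary binary payloads.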
