Problem integrating with Schema-Repo


max2su...@gmail.com

Mar 3, 2015, 9:01:46 PM
to camu...@googlegroups.com
Hi all,

I have browsed through the different conversations around the schema-repo, but haven't been able to find a solution to my problem. I am trying to set up Camus against a 3-node Kafka 0.8.2.1 cluster, using the Avro Schema-Repo. All of the Avro schemas for the topics are published correctly. I build Camus and start the job with the following command:

hadoop jar camus-example-0.1.0-SNAPSHOT-shaded.jar com.linkedin.camus.etl.kafka.CamusJob -libjars $CAMUS_LIBJARS -D mapreduce.job.user.classpath.first=true -P config.properties

Here CAMUS_LIBJARS is an environment variable that holds all the jars that the mvn package command generates.
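For readers reproducing this setup, here is a minimal sketch of one way such a variable could be built. The post does not show how CAMUS_LIBJARS was actually populated, so the helper name join_jars and the assumption that the jars sit under Maven target directories are illustrative only. The one firm point is that -libjars expects a comma-separated list, not a colon-separated classpath.

```shell
#!/bin/sh
# Hypothetical helper: list every jar under any target/ directory below $1
# and join the paths with commas, the separator that -libjars expects.
join_jars() {
  find "$1" -type f -name '*.jar' -path '*/target/*' | sort | paste -sd, -
}

# Assumes the script is run from the root of the Camus checkout.
CAMUS_LIBJARS=$(join_jars .)
export CAMUS_LIBJARS
echo "CAMUS_LIBJARS=$CAMUS_LIBJARS"
```

If the variable prints empty, no jars were found and the -libjars option would receive nothing useful.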

I have also set the following properties to configure the job:

camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.LatestSchemaKafkaAvroMessageDecoder
kafka.message.coder.schema.registry.class=com.linkedin.camus.schemaregistry.AvroRestSchemaRegistry
etl.schema.registry.url=http://10.0.14.25:2876/schema-repo/


When I execute the job, I get an exception indicating that the AvroRestSchemaRegistry class can't be found (I've double-checked that it is part of the libjars). Is this the correct way to set up this integration, and does anyone have pointers on why the job is not finding the class AvroRestSchemaRegistry?

Thanks in advance for the help!

The complete stack trace follows:

[CamusJob] - failed to create decoder
com.linkedin.camus.coders.MessageDecoderException: com.linkedin.camus.coders.MessageDecoderException: java.lang.ClassNotFoundException: com.linkedin.camus.schemaregistry.AvroRestSchemaRegistry
        at com.linkedin.camus.etl.kafka.coders.MessageDecoderFactory.createMessageDecoder(MessageDecoderFactory.java:29)
        at com.linkedin.camus.etl.kafka.mapred.EtlInputFormat.createMessageDecoder(EtlInputFormat.java:391)
        at com.linkedin.camus.etl.kafka.mapred.EtlInputFormat.getSplits(EtlInputFormat.java:256)
        at org.apache.hadoop.mapred.JobClient.writeNewSplits(JobClient.java:1107)
        at org.apache.hadoop.mapred.JobClient.writeSplits(JobClient.java:1124)
        at org.apache.hadoop.mapred.JobClient.access$600(JobClient.java:178)
        at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:1023)
        at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:976)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1642)
        at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:976)
        at org.apache.hadoop.mapreduce.Job.submit(Job.java:582)
        at com.linkedin.camus.etl.kafka.CamusJob.run(CamusJob.java:335)
        at com.linkedin.camus.etl.kafka.CamusJob.run(CamusJob.java:563)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
        at com.linkedin.camus.etl.kafka.CamusJob.main(CamusJob.java:518)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:212)



Felix GV

Mar 5, 2015, 8:09:06 PM
to max2su...@gmail.com, camu...@googlegroups.com
Are you sure you need -libjars at all? The shaded jar should contain everything you need, I think...


--
 
Felix GV
Data Infrastructure Engineer
Distributed Data Systems
LinkedIn
 
f...@linkedin.com
linkedin.com/in/felixgv
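Whether the shaded jar really contains the registry class can be checked directly by listing the jar. The following is a minimal sketch, not part of Camus: the helper names class_to_entry and jar_has_class are hypothetical, it assumes the unzip tool is installed (jar tf would work as well), and it assumes the script runs in the directory that holds the jar named in the first post.

```shell
#!/bin/sh
# Convert a fully qualified class name into the entry path it has inside a jar.
class_to_entry() {
  printf '%s.class\n' "$(printf '%s' "$1" | tr '.' '/')"
}

# Succeed if the jar listing contains the entry for the given class.
jar_has_class() {
  unzip -l "$1" | grep -qF "$(class_to_entry "$2")"
}

CLASS=com.linkedin.camus.schemaregistry.AvroRestSchemaRegistry
JAR=camus-example-0.1.0-SNAPSHOT-shaded.jar   # jar name from the command in the first post

if [ -f "$JAR" ] && jar_has_class "$JAR" "$CLASS"; then
  echo "found $(class_to_entry "$CLASS") in $JAR"
else
  echo "$(class_to_entry "$CLASS") not found in $JAR"
fi
```

The same jar_has_class call can be looped over each jar passed with -libjars to see which one, if any, carries the class.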


max2su...@gmail.com

Mar 5, 2015, 9:28:50 PM
to camu...@googlegroups.com, max2su...@gmail.com
Hi Felix, thanks for your reply! 

I resorted to this because the shaded (fat) jar did not include camus-schema-registry-avro by default. I have now fixed the dependencies in the POM and the job apparently executes. However, after the Map phase I get the exception below for all the topics in Kafka, and the job (apparently) drops all the messages that it reads from the brokers.

Any help is greatly appreciated. 

Note: as the schema server I am using the schema-repo project hosted on GitHub by schema-repo.org, which you started from AVRO-1124. From what I have seen, it and Camus are compatible, but please tell me if that assumption is incorrect.

Thanks!

Max

The exception follows:

[CamusJob] - job failed: 100.0% messages skipped due to other, maximum allowed is 0.1%
Exception in thread "main" java.lang.RuntimeException: job failed: 100.0% messages skipped due to other, maximum allowed is 0.1%
        at com.linkedin.camus.etl.kafka.CamusJob.checkIfTooManySkippedMsg(CamusJob.java:469)
        at com.linkedin.camus.etl.kafka.CamusJob.checkIfTooManySkippedMsg(CamusJob.java:455)
        at com.linkedin.camus.etl.kafka.CamusJob.run(CamusJob.java:366)
        at com.linkedin.camus.etl.kafka.CamusJob.run(CamusJob.java:237)
        at com.linkedin.camus.etl.kafka.CamusJob.run(CamusJob.java:645)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
        at com.linkedin.camus.etl.kafka.CamusJob.main(CamusJob.java:600)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

Felix GV

Mar 6, 2015, 2:00:36 PM
to max2su...@gmail.com, camu...@googlegroups.com, Henry Cai
The intent is that the schema-repo.org server remain compatible with all (old and new) clients. If that is not the case, that would be a bug which you could report. At this point, however, based on your stacktrace, I don't think there is a clear indication that the problem is related to the schema-repo. It may be related to how the messages are produced or how Camus is configured...

I'm not familiar with the error message you posted. I haven't looked into Camus for a while so I don't know if this is an error message that has been added recently or if I just never hit that issue before.

Hopefully there are other people on the Camus list who can comment on your issue.

Also, after the work I've done in the schema-repo project, Confluent have now also released their own implementation of a schema registry which works with their own version of Camus. If you're just starting out and do not already have any data in your Kafka queues, you still have the choice of using one or the other. You may want to check out and compare these two options.



Felix GV

Mar 6, 2015, 6:54:43 PM
to Henry Cai, max2su...@gmail.com, camu...@googlegroups.com
Ok so it does seem like a schema-repo related issue then. Thanks Henry.

Max, are you sure your schema is properly registered in the repo? Can you curl your schema out of it properly?
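The check suggested here could look like the sketch below. It rests on two assumptions that should be verified against the versions in use: that the schema-repo REST interface serves the latest schema of a subject at <base>/<subject>/latest, and that Camus looks schemas up using the Kafka topic name as the subject. The topic name my_topic and the helper latest_schema_url are hypothetical. The base URL is the etl.schema.registry.url value from the first post.

```shell
#!/bin/sh
# Base URL taken from etl.schema.registry.url in the first post.
REPO_URL="http://10.0.14.25:2876/schema-repo/"
# Hypothetical topic name; replace it with a real Kafka topic.
TOPIC="my_topic"

# Build <base>/<subject>/latest, tolerating a trailing slash on the base URL.
# The /latest path is an assumption about the schema-repo REST layout.
latest_schema_url() {
  printf '%s/%s/latest\n' "${1%/}" "$2"
}

URL=$(latest_schema_url "$REPO_URL" "$TOPIC")
echo "$URL"

# Only contact the server when FETCH=1 is set, so the script is safe to dry-run.
if [ "${FETCH:-0}" = "1" ]; then
  # -f makes curl exit non-zero on HTTP errors such as 404.
  curl -sf "$URL" || echo "no schema returned for subject $TOPIC" >&2
fi
```

Running it with FETCH=1 for each topic Camus reads shows whether every topic has a schema registered under the name Camus would ask for.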




From: Henry Cai
Sent: Friday, March 06, 2015 2:34 PM
To: Felix GV; max2su...@gmail.com; camu...@googlegroups.com
Subject: RE: Problem integrating with Schema-Repo

That particular error message means that the schema could not be resolved for too many messages.

You can control how many messages may be skipped with the parameter etl.max.percent.skipped.schemanotfound.
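As a rough illustration of the parameter mentioned above, the sketch below writes the threshold into a scratch properties file. The file name and the value 5.0 are hypothetical. The error in this thread names the category "other" rather than "schema not found", so the second property, etl.max.percent.skipped.other, is included as an assumed counterpart that should be confirmed against the Camus source before relying on it.

```shell
#!/bin/sh
# Hypothetical scratch file; in practice these lines would go into the
# properties file passed to CamusJob with -P.
CONF="${CONF:-camus-skip-thresholds.properties}"

cat > "$CONF" <<'EOF'
# Percentage of messages that may be skipped because no schema was found.
etl.max.percent.skipped.schemanotfound=5.0
# Assumed counterpart for the "other" category named in the error message.
etl.max.percent.skipped.other=5.0
EOF

# Count how many skip thresholds the file now defines.
grep -c '^etl\.max\.percent\.skipped\.' "$CONF"
```

Raising these limits only lets the job finish; skipped messages are still not written, so this is better treated as a diagnostic aid than as a fix.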



max2su...@gmail.com

Mar 6, 2015, 8:21:15 PM
to camu...@googlegroups.com, hc...@linkedin.com, max2su...@gmail.com
Hi all,

Thanks for all the help and advice. I'll look into it, but we currently use the schema-repo to create messages from different applications, and it seems to be working fine. I will try to tweak the parameter as well.

Again thanks for the help!

Max

Felix GV

Mar 6, 2015, 8:31:34 PM
to max2su...@gmail.com, camu...@googlegroups.com, Henry Cai
Ok...

Please keep us posted (: !



max square

Mar 20, 2015, 1:27:16 PM
to Felix GV, camu...@googlegroups.com, Henry Cai
Hey all!

Sorry for the radio silence. I wasn't able to get Camus working with my configuration, so I put together a Camus-like environment using the schema-repo and the Flume Kafka channels. There are some tradeoffs, but it's working.

Thanks for all the help!

I might give it another shot on a rainy Sunday afternoon :)

Max