Camus - where to start?


Abhi Basu

Oct 23, 2013, 8:11:09 PM
to camu...@googlegroups.com
I am very new to Camus and Kafka, and my goal is to use Camus to write Kafka messages to HDFS. I am looking for some sample code to do so. I was expecting a binary distribution of Camus that I could run (with appropriate parameters) to see how this works end to end before getting into writing code. Any help is appreciated. Thanks!

Abhi Basu

Oct 23, 2013, 8:12:22 PM
to camu...@googlegroups.com
Just wanted to add that I have a single-node instance of Kafka 0.8 running and was able to build the Camus project using Maven. What should I do next to reach that goal?

Andrew Otto

Oct 24, 2013, 10:23:16 AM
to Abhi Basu, camu...@googlegroups.com
Hi Abhi,

That depends on what format your data is in, and how you want to consume it.

Search this Google group for answers to your question.  It seems to be a common one this week :)



Abhi Basu

Oct 24, 2013, 12:43:17 PM
to camu...@googlegroups.com
Could someone show me sample end-to-end code? Avro format is fine. When a new technology/concept like this is introduced, a working end-to-end example is a must for newcomers evaluating it.



Andrew Otto

Oct 24, 2013, 1:17:41 PM
to Abhi Basu, camu...@googlegroups.com
If you are using Avro, you shouldn't need to edit anything. 

Check out the camus-example module, particularly camus-example/src/main/resources/camus.properties.  Edit it so that it points at your Kafka and Hadoop clusters.  You can then launch a Camus job by submitting the jar to Hadoop:

  hadoop jar camus-example/target/camus-example-0.1.0-SNAPSHOT-shaded.jar com.linkedin.camus.etl.kafka.CamusJob -P camus-example/src/main/resources/camus.properties

For timestamp bucketing, KafkaAvroMessageDecoder will look for a field named 'timestamp' or 'time' in your Avro object.
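
For reference, here is a minimal sketch of the properties most people end up editing first. The etl.* path names are the same ones that come up later in this thread; kafka.whitelist.topics is the usual topic filter, and the values below are only placeholders for your own cluster:

# top-level HDFS directory where Camus writes topic data
etl.destination.path=/user/me/camus/data
# HDFS location for execution files (offsets, error logs, count files)
etl.execution.base.path=/user/me/camus/exec
# where completed execution directories are moved
etl.execution.history.path=/user/me/camus/exec/history
# topics to pull (comma separated); leave empty to pull all topics
kafka.whitelist.topics=my.topic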





Abhi Basu

Oct 24, 2013, 1:39:56 PM
to camu...@googlegroups.com
Thanks for your help, Andrew. I will be able to work on this today and provide an update.

Abhi



Adrian

Oct 25, 2013, 12:56:00 PM
to camu...@googlegroups.com
I have been struggling with this as well.  I eventually plan on using my own decoder and probably my own partitioner, but first I wanted to get the example to run without errors (if it errors while decoding a message or has nothing to store, so be it; this is just a first attempt).  My box was rebuilt the other day, so dealing with some Hadoop issues set me back a bit.  Now I have everything running again, but when I contact ZooKeeper, the node data I get back can't be parsed into a URI.  I am not sure what is supposed to be returned, but I get:
{ "host":"myBox", "jmx_port":9999, "port":9092, "version":1 }
EtlZkClient says it should look like ip-timestamp:ip:port, whereas I obviously have a JSON object.  I ran through the code in a debugger and changed the URI to tcp://myBox:9092, which let me step through further, but there is obviously something wrong here.
My zookeeper values:
zookeeper.hosts=localhost:2181
zookeeper.broker.topics=/brokers/topics
zookeeper.broker.nodes=/brokers/ids
I am running Kafka 0.8 and using the standard configuration that came with its bundled ZooKeeper.

Any idea what is going wrong?

Thanks,
Adrian

Adrian

Oct 25, 2013, 1:00:04 PM
to camu...@googlegroups.com
Oh, and one more note: after changing that value I still get an error about my topic:
30505 [main] ERROR com.linkedin.camus.etl.kafka.common.EtlZkClient  - Error on Topic: test, Cannot find partitions in /brokers/topics/test/partitions

I only have one topic "test" which I created like so:
kafka-create-topic.sh --zookeeper localhost:2181 --replica 1 --partition 1 --topic test

Andrew Otto

Oct 25, 2013, 1:22:27 PM
to Adrian, camu...@googlegroups.com
Hmm, for 0.8 you shouldn't set the zookeeper.* values in the properties file.  They are commented out in the example with the comment:

  # Kafka-0.8 handles all zookeeper calls

Not sure if this is the cause of your problem though.  Try it and see?


Adrian

Oct 25, 2013, 2:30:34 PM
to camu...@googlegroups.com, Adrian
Maybe I am missing something; this is what I see in the properties file under master, which has:
zookeeper.hosts=
zookeeper.broker.topics=/brokers/topics
zookeeper.broker.nodes=/brokers/ids
not commented out.  Also, how would Camus know where to look if the zookeeper host was not specified?

Gaurav Gupta

Oct 25, 2013, 2:41:30 PM
to Adrian, camu...@googlegroups.com
If you are using Kafka 0.7, you will have to use the master branch.
For Kafka 0.8, please use the camus-kafka-0.8 branch.

Kafka 0.8 exposes an API to query the needed metadata and, I believe, handles the ZooKeeper calls internally.
In that case, you need not specify the ZooKeeper information in the properties file.

Adrian

Oct 25, 2013, 4:49:39 PM
to camu...@googlegroups.com, Adrian, ggu...@linkedin.com
That makes way more sense. Thanks.

So I can now at least run Camus (let's hear it for progress!). Now I have some questions about what I need to change to use it for my purposes.

I already created my own decoder (extending MessageDecoder). The message I get contains a HeapByteBuffer, which, if I call toString on it, just returns information about the object:
 (java.lang.String) java.nio.HeapByteBuffer[pos=0 lim=20 cap=20]
Since this is all that the built-in Avro decoder does, I am somewhat at a loss as to what I am doing wrong.  My producer just puts a JSON message onto the queue:
{"message":"must","@timestamp":"2013-10-25T20:19:53.589Z","@version":"1","type":"myTest","host":"myHost"}
The above message is what I get when running the standard Kafka consumer, so it is possible to pull it back off as a String.

I assume I am missing something obvious here...

Also, EtlMultiOutputFormat uses the MultEtlRecordWriter, which assumes every message will be some sort of Avro message; that is not my case, so what do I need to change in order to use this with plain JSON/String messages?  I know there has been some talk of JSON messages in the group, so would I be better off looking for one of those branches?

Also, I assume that the 

Andrew Otto

Oct 25, 2013, 5:07:24 PM
to Adrian, camu...@googlegroups.com, ggu...@linkedin.com
Adrian,

Here are the relevant properties I'm setting:

# Concrete implementation of the Decoder class to use
camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.JsonStringMessageDecoder

# Our timestamps look like 2013-09-20T15:40:17
camus.message.timestamp.format=yyyy-MM-dd'T'HH:mm:ss

# use the dt field
camus.message.timestamp.field=dt

# RawRecordWriterProvider does no reformatting of the records as they come in.
etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.StringRecordWriterProvider

etl.output.record.delimiter=\n

# Used by the committer to arrange .avro files into a partitioned scheme. This will be the default partitioner for all
# topic that do not have a partitioner specified
#etl.partitioner.class=com.linkedin.camus.etl.kafka.coders.DefaultPartitioner



You could probably use the exact same setup and classes that I use.  Both JsonStringMessageDecoder and StringRecordWriterProvider should be available in the camus-kafka-0.8 branch.  You should probably change camus.message.timestamp.field and camus.message.timestamp.format to match your payload.

-Andrew
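
As an aside for anyone writing their own decoder (this relates to the HeapByteBuffer issue above): the payload Camus hands the decoder is a byte array, so you build a String from those bytes rather than calling toString() on the buffer object. Here is a rough sketch of a minimal pass-through decoder; the MessageDecoder and CamusWrapper signatures are assumed from the camus-api module of this era, so check them against the branch you actually build.

import java.util.Properties;

import com.linkedin.camus.coders.CamusWrapper;
import com.linkedin.camus.coders.MessageDecoder;

// Sketch only: a pass-through decoder for UTF-8 string/JSON payloads.
public class PlainStringMessageDecoder extends MessageDecoder<byte[], String> {

    @Override
    public void init(Properties props, String topicName) {
        this.props = props;
        this.topicName = topicName;
    }

    @Override
    public CamusWrapper<String> decode(byte[] payload) {
        // Turn the raw Kafka bytes into a String instead of calling
        // toString() on the ByteBuffer wrapper.
        String record = new String(payload);
        // Bucket by the current time; a real decoder would parse the
        // timestamp field out of the payload instead.
        return new CamusWrapper<String>(record, System.currentTimeMillis());
    }
}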

Adrian

Oct 28, 2013, 4:15:51 PM
to camu...@googlegroups.com, Adrian, ggu...@linkedin.com
Thanks for the advice.  I cloned the camus-kafka-0.8 branch and started playing with it.  I set the kafka.brokers property so that Camus can correctly find my brokers.  However, when it came time to determine the path for the job output, I got an exception.  I set

# final top-level data output directory, sub-directory will be dynamically created for each topic pulled
etl.destination.path=/data/test
# HDFS location where you want to keep execution files, i.e. offsets, error logs, and count files
etl.execution.base.path=/data/test/execution
# where completed Camus job output directories are kept, usually a sub-dir in the base.path
etl.execution.history.path=/data/test/hist

but when it tries to get the destination path to log, it throws an exception.  I stepped through what was going on, and it looks like the job is created by CamusJob.createJob(props); however, when I stepped into createJob, the configuration already existed, so it didn't copy any of the information from props.  This means that none of the Hadoop properties are read into the Hadoop job, and when it tries to create a new Path it throws an IllegalArgumentException because the String for the path is null.  In CamusJob.createJob it looks like it used to set all the properties on the job regardless of whether the conf was null, but that code is commented out.

Am I doing something completely wrong?

Thanks again,
Adrian

Abhi Basu

Oct 28, 2013, 4:40:48 PM
to camu...@googlegroups.com, Adrian, ggu...@linkedin.com
I also realized I was using the wrong branch of code for Kafka 0.8 and am using the new branch now. I am also getting the exact same com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputFormat.getDestinationPath error that Adrian mentions here. I have verified that my camus.props file has the right settings; it seems the code cannot read from the properties file.

BTW, with the older code line I was able to make it much further than this. What is the problem with the new branch?

Thanks,

Abhi

Gaurav Gupta

Oct 28, 2013, 4:49:55 PM
to Abhi Basu, camu...@googlegroups.com, Adrian
Adrian/Abhi,

The commented-out code seems to be the issue.
I have made the changes; that should fix it.

I had added the null check on the conf to make Camus work with Azkaban out of the box.
That might have broken this path.

Let me know if the issue still persists.

Gaurav 
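
For anyone following along, the fix boils down to making sure the entries loaded from camus.properties are copied into the Hadoop job's configuration, so values like etl.destination.path are visible at run time. A hypothetical sketch of that copy (the helper class and method name here are illustrative, not the actual Camus code):

import java.util.Map;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

// Illustrative only: push every camus.properties entry into the job conf.
public class JobConfigHelper {

    public static Job createJob(Properties props) throws Exception {
        Job job = new Job(new Configuration());
        for (Map.Entry<Object, Object> entry : props.entrySet()) {
            job.getConfiguration().set((String) entry.getKey(),
                                       (String) entry.getValue());
        }
        return job;
    }
}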

Abhi Basu

Oct 28, 2013, 6:08:46 PM
to camu...@googlegroups.com, Abhi Basu, Adrian, ggu...@linkedin.com
Thanks, Gaurav, that fix worked.

Now I see an error that says "Unable to pull requests from Kafka brokers". What are the Kafka-specific properties required in the props file?

Thanks,

Abhi

Abhi Basu

Oct 28, 2013, 6:40:30 PM
to camu...@googlegroups.com, Abhi Basu, Adrian, ggu...@linkedin.com
Update: I got the job to run without any errors (after setting the kafka.host.url and kafka.host.port props), but I don't see any output from the topic in the whitelist. Here is what I see:

13/10/28 15:33:00 INFO kafka.CamusJob: Dir Destination set to: hdfs://idh251-0/var/log/camus/data
13/10/28 15:33:00 INFO kafka.CamusJob: Previous execution: hdfs://idh251-0/var/log/camus/history/2013-10-28-22-30-03
13/10/28 15:33:00 INFO kafka.CamusJob: New execution temp location: hdfs://idh251-0/var/log/camus/2013-10-28-22-33-00
13/10/28 15:33:01 WARN kafka.CamusJob: The configuration properties kafka.host.url and kafka.host.port are deprecated. Please switch to using kafka.brokers
13/10/28 15:33:01 INFO mapred.EtlInputFormat: Fetching metadata from broker idh251-kafka:9095 with client id camus-abhi1 for 0 topic(s) []
13/10/28 15:33:02 INFO mapred.EtlInputFormat: Discrading topic : intel.test
13/10/28 15:33:02 INFO mapred.EtlInputFormat: Discarding topic (Decoder generation failed) : iot.test
13/10/28 15:33:02 INFO util.NativeCodeLoader: Trying to load the custom-built native-hadoop library...
13/10/28 15:33:02 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/10/28 15:33:02 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
13/10/28 15:33:02 INFO compress.CodecPool: Got brand-new compressor
13/10/28 15:33:02 INFO mapred.EtlInputFormat: previous offset file:hdfs://idh251-0/var/log/camus/history/2013-10-28-22-30-03/offsets-previous
13/10/28 15:33:02 INFO compress.CodecPool: Got brand-new decompressor
13/10/28 15:33:02 INFO mapred.JobClient: Running job: job_201310251711_0005
13/10/28 15:33:03 INFO mapred.JobClient:  map 0% reduce 0%
13/10/28 15:33:07 INFO mapred.JobClient: Job complete: job_201310251711_0005
13/10/28 15:33:07 INFO mapred.JobClient: Counters: 4
13/10/28 15:33:07 INFO mapred.JobClient:   Job Counters 
13/10/28 15:33:07 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=3111
13/10/28 15:33:07 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
13/10/28 15:33:07 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
13/10/28 15:33:07 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=0
13/10/28 15:33:07 INFO kafka.CamusJob: Group: Job Counters 
13/10/28 15:33:07 INFO kafka.CamusJob: Total time spent by all maps waiting after reserving slots (ms): 0
13/10/28 15:33:07 INFO kafka.CamusJob: Total time spent by all reduces waiting after reserving slots (ms): 0
13/10/28 15:33:07 INFO kafka.CamusJob: SLOTS_MILLIS_MAPS: 3111
13/10/28 15:33:07 INFO kafka.CamusJob: SLOTS_MILLIS_REDUCES: 0
13/10/28 15:33:07 INFO kafka.CamusJob: Job finished
13/10/28 15:33:07 INFO kafka.CamusJob: ***********Timing Report*************
Job time (seconds):
       pre setup    0.0 (0%)
      get splits    0.0 (0%)
      hadoop job    5.0 (71%)
          commit    0.0 (0%)
Total: 0 minutes 7 seconds

Hadoop job task times (seconds):
             min 9223372036854776.0
            mean    NaN
             max    0.0
            skew    NaN/0.0 = NaN

Task wait time (seconds):
             min 9223372036854776.0
            mean    NaN
             max    0.0

Hadoop task breakdown:
           kafka �
          decode �
      map output �
           other �

  Total MB read: 0

Do I need to write my own decoder? All I want is to see this work end to end, with my Kafka messages landing in HDFS in Avro format.

Thanks,

Abhi

Gaurav Gupta

Oct 28, 2013, 7:50:40 PM
to Abhi Basu, camu...@googlegroups.com, Adrian
Yes. You will have to define your own decoder to make sense of the Kafka messages.
The decoder class is set using the property:
camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.JsonStringMessageDecoder

Also, to get the code working you will need some way for Camus to access the schemas of the topics you are pushing in.
Data in Avro format needs a corresponding schema.
We can get the Avro schema registry as soon as AVRO-1124 is resolved.
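
On the Avro side this usually means pointing kafka.message.coder.schema.registry.class at a registry implementation (the camus-example module ships a DummySchemaRegistry for exactly this). Below is a rough sketch of a registry that serves one hard-coded schema; the MemorySchemaRegistry base class and its register(topic, schema) call are assumptions about the camus-schema-registry module of this era, and "my.topic" is just a placeholder, so adapt both to your branch and topic.

import org.apache.avro.Schema;

import com.linkedin.camus.schemaregistry.MemorySchemaRegistry;

// Sketch only: register a single schema under the Kafka topic name so the
// Avro decoder can look it up for messages from that topic.
public class SingleTopicSchemaRegistry extends MemorySchemaRegistry<Schema> {

    public SingleTopicSchemaRegistry() {
        super();
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"DummyLog\",\"fields\":["
                + "{\"name\":\"id\",\"type\":\"long\"},"
                + "{\"name\":\"logTime\",\"type\":\"long\"}]}");
        super.register("my.topic", schema);
    }
}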

Abhi Basu

Nov 1, 2013, 12:38:56 PM
to camu...@googlegroups.com, Abhi Basu, Adrian, ggu...@linkedin.com
Can I please get a sample that I can pretty much plug and play? Just as a proof point.

Thanks,

Abhi

ngaur...@gmail.com

Dec 4, 2013, 4:01:30 PM
to camu...@googlegroups.com, Abhi Basu, Adrian, ggu...@linkedin.com
Hi Gaurav/Basu,

I have been following this thread and did as discussed for Kafka 0.8 and the Camus 0.8 branch, but I still see the following error messages:

13/12/04 15:46:02 INFO kafka.CamusJob: Dir Destination set to: hdfs://localhost:9000/tmp/camus/log
No previous execution, all topics pulled from earliest available offset
13/12/04 15:46:02 INFO kafka.CamusJob: New execution temp location: hdfs://localhost:9000/tmp/camus/log/data/2013-12-04-20-46-02
13/12/04 15:46:03 ERROR mapred.EtlInputFormat: Unable to pull requests from Kafka brokers. Exiting the program
java.lang.ArrayIndexOutOfBoundsException: 1
at com.linkedin.camus.etl.kafka.mapred.EtlInputFormat.createConsumer(EtlInputFormat.java:119)
at com.linkedin.camus.etl.kafka.mapred.EtlInputFormat.getKafkaMetadata(EtlInputFormat.java:95)
at com.linkedin.camus.etl.kafka.mapred.EtlInputFormat.getSplits(EtlInputFormat.java:227)
at org.apache.hadoop.mapred.JobClient.writeNewSplits(JobClient.java:1054)
at org.apache.hadoop.mapred.JobClient.writeSplits(JobClient.java:1071)
at org.apache.hadoop.mapred.JobClient.access$700(JobClient.java:179)

I added the following properties to the camus.properties file:
#Kafka
kafka.host.url=localhost
kafka.host.port=9092

But I still see the same problem. Can you please help or guide me on this?

Thanks,
Gaurav Nigam

ngaur...@gmail.com

Dec 4, 2013, 4:34:27 PM
to camu...@googlegroups.com, Abhi Basu, Adrian, ggu...@linkedin.com, ngaur...@gmail.com
Hi Abhi Basu,

You confused me with the properties you mentioned in the thread above. We need to set kafka.brokers=localhost:9092 in camus.properties rather than kafka.host.url & kafka.host.port.

Setting this worked for the Camus 0.8 branch.

Anyway, this thread was helpful.

Thanks,
Gaurav

Gaurav Gupta

Dec 4, 2013, 4:35:06 PM
to ngaur...@gmail.com, camu...@googlegroups.com, Abhi Basu, Adrian
There was a recent code push to the codebase to allow a list of brokers instead of one broker.
Here is the commit that changes this:
https://github.com/linkedin/camus/commit/975496d79beafe9da54365a8ceb9ebefb2161154

If you look at the change for camus.properties,

# Connection parameters. Usually a VIP
kafka.host.url=
kafka.host.port=


Was changed to,

# Connection parameters.
kafka.brokers=


----------------------------------------

Instead of setting this to:
kafka.host.url=localhost
kafka.host.port=9092

Please use,

kafka.brokers=localhost:9092

Let me know if the issue still persists.

Thanks,
Gaurav

ngaur...@gmail.com

Dec 4, 2013, 4:44:32 PM
to camu...@googlegroups.com, ngaur...@gmail.com, Abhi Basu, Adrian
Thanks Gaurav. It worked for me.

@Basu: I didn't notice the change made to the code.

ngaur...@gmail.com

Dec 5, 2013, 1:29:11 PM
to camu...@googlegroups.com, ngaur...@gmail.com, Abhi Basu, Adrian
Hi Gaurav,

I was able to get the Camus job running. The issue is that I'm not sure how to check whether the job succeeded. I'm using the Camus-Example job and writing the string:

{"id": 100, "logTime": 500, "muchoStuff": null}

to a Kafka topic, which corresponds to the DummyLog.avsc Avro schema registered for Camus-Example. My Camus-Example job completes successfully, but I don't see any data written to HDFS. I had configured
"etl.execution.base.path=/tmp/camus/log/data" and nothing gets written to this HDFS location.

Also, I used the following decoder:
camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.JsonStringMessageDecoder

If you can provide a sample Producer and requisite configuration that could work with Camus-Example, that would be a great help.

Thanks,
Gaurav Nigam



ngaur...@gmail.com

Dec 5, 2013, 1:31:49 PM
to camu...@googlegroups.com, ngaur...@gmail.com, Abhi Basu, Adrian
Here is the producer code that I'm using:

import java.io.File;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Parser;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class AvroProducer {

    private static Producer<Integer, String> producer;
    private final Properties props = new Properties();

    public AvroProducer() {
        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        producer = new Producer<Integer, String>(new ProducerConfig(props));
    }

    public static void main(String[] args) throws Exception {
        Schema schema = new Parser().parse(new File("DummyLog.avsc"));

        // Using this schema, let's create some records.
        AvroProducer sp = new AvroProducer();
        String topic = "kafkatopic3";

        GenericRecord user1 = new GenericData.Record(schema);
        user1.put("id", 100);
        user1.put("logTime", 500);

        // Note: user1.toString() serializes the record as JSON text,
        // not as Avro binary.
        KeyedMessage<Integer, String> data =
                new KeyedMessage<Integer, String>(topic, user1.toString());
        int i = 0;
        while (i < 100) {
            producer.send(data);
            i++;
        }
        producer.close();
    }
}

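One note on this producer (an observation about GenericRecord, not something stated elsewhere in the thread): user1.toString() publishes the record as JSON text, not as Avro binary. So the Camus configuration that matches it is a string/JSON decoder such as JsonStringMessageDecoder, with camus.message.timestamp.field and camus.message.timestamp.format set to match the payload; KafkaAvroMessageDecoder would expect actual Avro-encoded bytes.
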
Zhu Wayne

Mar 7, 2014, 1:14:08 PM
to camu...@googlegroups.com, ngaur...@gmail.com, Abhi Basu, Adrian
I had the same issue: the job ran successfully without any output in HDFS. Could you give me a hint? Thanks.

Zhu Wayne

Mar 20, 2014, 10:37:02 PM
to camu...@googlegroups.com
Resolved. The decoder was the problem.

alvi...@gmail.com

Oct 9, 2014, 8:33:12 PM
to camu...@googlegroups.com
Hi Wayne,

I have the same issue in Camus.
Would you please give me some idea of how you solved this issue?
Thanks.

ankit jain

Feb 27, 2015, 10:44:07 AM
to camu...@googlegroups.com, alvi...@gmail.com
Hi Guys,

Thanks for this post; it helped me a lot to get to this point, as I am new to Camus and Kafka.
Can you give me some idea of how to resolve this error?
I am stuck at the following point.

com.linkedin.camus.coders.MessageDecoderException: com.linkedin.camus.coders.MessageDecoderException: java.lang.InstantiationException: com.linkedin.camus.example.schemaregistry.DummySchemaRegistry
        at com.linkedin.camus.etl.kafka.coders.MessageDecoderFactory.createMessageDecoder(MessageDecoderFactory.java:29)
        at com.linkedin.camus.etl.kafka.mapred.EtlInputFormat.createMessageDecoder(EtlInputFormat.java:391)
        at com.linkedin.camus.etl.kafka.mapred.EtlInputFormat.getSplits(EtlInputFormat.java:256)
        at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:597)
        at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:614)
        at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:492)
        at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1296)
        at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1293)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
        at org.apache.hadoop.mapreduce.Job.submit(Job.java:1293)
        at com.linkedin.camus.etl.kafka.CamusJob.run(CamusJob.java:335)
        at com.linkedin.camus.etl.kafka.CamusJob.run(CamusJob.java:563)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
        at com.linkedin.camus.etl.kafka.CamusJob.main(CamusJob.java:518)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Caused by: com.linkedin.camus.coders.MessageDecoderException: java.lang.InstantiationException: com.linkedin.camus.example.schemaregistry.DummySchemaRegistry
        at com.linkedin.camus.etl.kafka.coders.KafkaAvroMessageDecoder.init(KafkaAvroMessageDecoder.java:41)
        at com.linkedin.camus.etl.kafka.coders.MessageDecoderFactory.createMessageDecoder(MessageDecoderFactory.java:25)
        ... 22 more
Caused by: java.lang.InstantiationException: com.linkedin.camus.example.schemaregistry.DummySchemaRegistry
        at java.lang.Class.newInstance(Class.java:359)
        at com.linkedin.camus.etl.kafka.coders.KafkaAvroMessageDecoder.init(KafkaAvroMessageDecoder.java:32)
        ... 23 more

After this, my job finishes but nothing is written to HDFS.
