Building camus with different hadoop versions

bco...@rallydev.com

Mar 4, 2013, 10:41:39 PM
to camu...@googlegroups.com
We've been trying to figure out a way to build/release multiple builds of camus, one for each version of hadoop we have in use.

Our goal is to avoid any editing of the pom files in the source tree: no changes, no custom branches, etc.

Apparently Maven has a facility called "classifiers" that will build jars of the form <jar-name>-<version>-<classifier>.jar, e.g. if the Hadoop version is 1.0.3 then the camus-api jar would be camus-api-0.1.0-SNAPSHOT-1.0.3.jar.

Ideally, to build this jar you would run: mvn package -Dhadoop.version=1.0.3
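
For illustration, the classifier wiring would look something like the following in the parent pom (a sketch, assuming a hadoop.version property with a default; not necessarily the exact contents of my branch):

<properties>
    <!-- default Hadoop version; override with -Dhadoop.version=... -->
    <hadoop.version>1.0.3</hadoop.version>
</properties>

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-jar-plugin</artifactId>
    <configuration>
        <!-- stamp the Hadoop version into the jar name as the classifier -->
        <classifier>${hadoop.version}</classifier>
    </configuration>
</plugin>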

However, we've run into a couple of issues with maven and we're looking for some help.

1. maven-shade-plugin does not understand classifiers. This is ok, as the code that camus-example represents would not normally be a sub-project of camus.

2. Setting a parent-pom property, hadoop.version, with a default value does not seem to propagate correctly to the other sub-modules' transitive dependencies when overridden on the command line.

3. (related to 2) When referencing the property ${hadoop.version} in the dependency section, the property is not resolved at install/deploy time, so library users must supply a value for the property themselves.
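
For reference, the dependency in question looks roughly like this; at install/deploy time Maven keeps the literal ${hadoop.version} placeholder in the pom, instead of the value the artifact was actually built with:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-core</artifactId>
    <version>${hadoop.version}</version>
</dependency>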

I have a branch with the start of this work: https://github.com/RallySoftware/camus/tree/maven-classifiers-for-hadoop-versions
See this commit: https://github.com/RallySoftware/camus/commit/edcf77d702a7c6e99b5ee15bb822a39670c18a5b

Has anyone had experience with this problem? Any ideas on how to solve the Hadoop version problem?

Thanks
-Bob

Ken Goodhope

Mar 5, 2013, 3:53:33 PM
to bco...@rallydev.com, camu...@googlegroups.com
Hi Bob,

We usually build with one version of Hadoop, and then when we deploy we use the cluster-local version of Hadoop at runtime. As long as the functionality you are using is common across all versions of Hadoop, this works. I think we might have a distributed cache call that isn't available in 0.20.x, so we need to get that fixed.
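
(A sketch of that approach in Maven terms, assuming the Hadoop dependency is declared roughly as below: provided scope keeps it on the compile classpath but defers to the cluster's own jars at runtime.)

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-core</artifactId>
    <version>${hadoop.version}</version>
    <scope>provided</scope>
</dependency>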

Ken


Felix GV

Mar 8, 2013, 8:37:25 AM
to Ken Goodhope, bco...@rallydev.com, camu...@googlegroups.com
Hi Bob,

I unfortunately don't have answers to the concerns you bring up, but I'd like to chime in nonetheless.

As I pointed out in previous discussions, in order to use Hadoop 2.0, importing hadoop-core was not enough. I needed to import hadoop-common as well in every sub-project that already had hadoop-core (more specifically, I added it to camus-schema-registry-avro, camus-schema-registry, camus-etl-kafka and to my own version of the camus-example). Now that I think of it, perhaps some of these sub-projects actually need only hadoop-common and no longer need hadoop-core in 2.0 (I haven't tested), but in any case, the potentially superfluous dependency is not causing any problems...
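
(For illustration, the block added to each of those sub-projects' poms is roughly the following; the version string is whatever 2.x release you target.)

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>${hadoop.version}</version>
</dependency>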

It would be nice if there were a generic/proper way of handling this difference (AFAIK, hadoop-common doesn't exist at all pre-2.0), but I'm not a Maven expert by any stretch of the imagination, so I don't know what's possible and what isn't, nor what the best practice would be.

It would definitely be really helpful if I could define my own version of the camus-example sub-project externally and simply reference the rest of the Camus artifacts I need by passing the appropriate parameters for lib versions (hadoop, kafka, avro), without having to manage different pom files or merge any branches when Camus changes...

--
Felix

saga...@gmail.com

Mar 26, 2013, 3:27:51 AM
to camu...@googlegroups.com, bco...@rallydev.com
Hello Sir/Madam,

I have configured HDFS on my Ubuntu cluster. Now when I try to run a MapReduce program on it, it does not run, because the NameNode automatically becomes inactive after a few seconds. I have searched extensively but could not find any useful information on this. Please help me in this regard.

Regards,
Sagar Thacker
09909039080

Felix GV

Mar 26, 2013, 7:39:12 AM
to saga...@gmail.com, camu...@googlegroups.com
Well...

This doesn't really seem Camus-related, but if you say your NameNode "becomes inactive" (and I'm not exactly sure what you mean by that, but anyway...) then the first thing I would look at would be the NameNode's logs.

--
Félix

francis...@gmail.com

Aug 12, 2013, 9:43:44 AM
to camu...@googlegroups.com, Ken Goodhope, bco...@rallydev.com
Hi Felix,

I have tried to change the dependencies to Hadoop 2, but I get too many compile errors.

Could you please post/publish the poms/projects?

Thanks

Felix GV

Aug 12, 2013, 9:58:57 AM
to francis...@gmail.com, camu...@googlegroups.com, Ken Goodhope, bco...@rallydev.com

Look at the various mate1-pom.xml files

N.B.: this is a fork of the 0.7 branch, not 0.8

--
Félix

francis...@gmail.com

Aug 13, 2013, 5:56:03 AM
to camu...@googlegroups.com, francis...@gmail.com, Ken Goodhope, bco...@rallydev.com
Thanks Felix,

It helped a lot.

Now my topic is omitted because I need a decoder; the dummy one that comes with the default camus.properties does not seem to work.

Is there any example decoder for strings generated by the Kafka console producer sample script?

Bob Cotton

Aug 13, 2013, 8:10:55 AM
to francis...@gmail.com, camu...@googlegroups.com
This is the Decoder we used for initial testing.

Keep in mind that Camus expects the output of the Decoder to be encoded in Avro. This decoder does that with a very simple Avro schema.

package com.rallydev.camus;

import com.linkedin.camus.coders.CamusWrapper;
import com.linkedin.camus.coders.MessageDecoder;
import kafka.message.Message;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.joda.time.DateTime;

import java.util.Properties;

public class StringDecoder extends MessageDecoder<Message, Record> {

    public static final Text YEAR = new Text("year");
    public static final Text MONTH = new Text("month");
    public static final Text DAY = new Text("day");
    public static final Text HOUR = new Text("hour");
    
    Schema schema = Schema.parse("{\"type\": \"record\", " +
            "\"name\": \"StringHolder\", " +
            "\"fields\": " +
            "[{\"name\":\"value\", \"type\": \"string\"}]}");

    @Override
    public void init(Properties props, String topicName) {
        super.init(props, topicName);
    }

    @Override
    public CamusWrapper<Record> decode(Message message) {
        Record record = new GenericData.Record(schema);
        String payload = new String(message.buffer().array());
        record.put("value", payload);


        // A real decoder would extract the timestamp from the record; for this test we just use the current time
        long timestamp = System.currentTimeMillis();
        DateTime d = new DateTime(timestamp);

        CamusWrapper<Record> wrapper = new CamusWrapper<Record>(record, timestamp);

        // extract values for partitioning
        wrapper.put(YEAR, new IntWritable(d.getYear()));
        wrapper.put(MONTH, new IntWritable(d.getMonthOfYear()));
        wrapper.put(DAY, new IntWritable(d.getDayOfMonth()));
        wrapper.put(HOUR, new IntWritable(d.getHourOfDay()));

        return wrapper;
    }

}
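
To wire it in, camus.properties points the decoder property (the same one discussed below) at the class:

camus.message.decoder.class=com.rallydev.camus.StringDecoder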

francis...@gmail.com

Aug 13, 2013, 12:37:37 PM
to camu...@googlegroups.com, bco...@rallydev.com
Thanks Bob,

In order to compile, I needed to change wrapper.put() to wrapper.getRecord().put(), maybe because I'm using Hadoop 2.

But I'm still getting:

ERROR mapred.EtlInputFormat: We could not construct decoders for the following topics, so they will be discarded: [test]

I have configured this class in the camus.message.decoder.class property, but I think it is ignored, because I can write whatever I like as the value and it is not verified.

Am I missing anything else?

Yan Fang

Aug 14, 2013, 9:56:00 PM
to camu...@googlegroups.com, bco...@rallydev.com, francis...@gmail.com
I got the same ERROR message. Am I missing something too?

francis...@gmail.com

Sep 19, 2013, 12:56:54 PM
to camu...@googlegroups.com, bco...@rallydev.com, francis...@gmail.com
I started from scratch and got past that error, but now

it seems the decoder is not valid, because I get this exception:

Caused by: java.lang.ClassCastException: [B cannot be cast to kafka.message.Message
at com.linkedin.camus.example.StringDecoder.decode(StringDecoder.java:19)

The MapReduce CamusJob finishes, but no data is written to the output path.

Any help?

Thanks


francis...@gmail.com

Oct 3, 2013, 8:15:32 AM
to camu...@googlegroups.com, bco...@rallydev.com, francis...@gmail.com
Well, it only needed changing Message to byte[] in the decoder.

I saw that the default KafkaAvroMessageDecoder in the camus-kafka-0.8 branch was working with the old Kafka 0.7 message format (a 5-byte message header, instead of the 14- or 18-byte header, depending on whether a key is used, of the Kafka 0.8 message format).

I started a branch here: https://github.com/franciscoisaac/camus/tree/0.8-hadoop2 . For now it only works with Kafka 0.8 messages with zero key length (ignoring the 14 leading bytes) and with the StringDecoder.

Next will be parsing the key value to choose the matching schema from a registry.
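
For anyone following along: relative to Bob's example above, the 0.8-style decoder changes only the generic type and the decode signature. A minimal sketch (same schema trick as before, with the partitioning fields omitted):

import com.linkedin.camus.coders.CamusWrapper;
import com.linkedin.camus.coders.MessageDecoder;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;

public class StringDecoder extends MessageDecoder<byte[], Record> {

    Schema schema = Schema.parse("{\"type\": \"record\", " +
            "\"name\": \"StringHolder\", " +
            "\"fields\": [{\"name\":\"value\", \"type\": \"string\"}]}");

    @Override
    public CamusWrapper<Record> decode(byte[] payload) {
        Record record = new GenericData.Record(schema);
        // the payload arrives as raw bytes; no Message.buffer() unwrapping needed
        record.put("value", new String(payload));
        return new CamusWrapper<Record>(record, System.currentTimeMillis());
    }
}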

dpra...@gmail.com

Nov 22, 2013, 9:59:16 AM
to camu...@googlegroups.com, bco...@rallydev.com, francis...@gmail.com
I am facing similar issues. I get the following exception. Were you able to resolve this issue? Please help.

[CamusJob] - topic=test partition=0 nodeId=3 server service= beginOffset=0 offset=52 server= checksum=2879367161 time=1385073039142
[CamusJob] - java.io.IOException: java.lang.ClassCastException: kafka.message.Message cannot be cast to [B
at com.linkedin.camus.etl.kafka.mapred.EtlRecordReader.getWrappedRecord(EtlRecordReader.java:133)
at com.linkedin.camus.etl.kafka.mapred.EtlRecordReader.nextKeyValue(EtlRecordReader.java:252)
at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:532)
at org.apache.hadoop.mapreduce.MapContext.nextKeyValue(MapContext.java:67)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:143)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:212)
Caused by: java.lang.ClassCastException: kafka.message.Message cannot be cast to [B
at com.linkedin.camus.etl.kafka.coders.CDRMessageDecoder.decode(CDRMessageDecoder.java:30)
at com.linkedin.camus.etl.kafka.mapred.EtlRecordReader.getWrappedRecord(EtlRecordReader.java:130)
... 7 more

Gaurav Gupta

Nov 22, 2013, 1:42:05 PM
to dpra...@gmail.com, camu...@googlegroups.com, bco...@rallydev.com, francis...@gmail.com
Hello,

The signature of the "getWrappedRecord" in the EtlRecordReader changed when migrating from Kafka 0.7 to Kafka 0.8.

Master branch (0.7):

--->

 private CamusWrapper getWrappedRecord(String topicName, Message msg) throws IOException {
        CamusWrapper r = null;
        try {
            r = decoder.decode(msg);

<----


Camus-kafka-0.8 branch (0.8):

---> 

private CamusWrapper getWrappedRecord(String topicName, byte[] payload) throws IOException {
        CamusWrapper r = null;
        try {
            r = decoder.decode(payload);
        } catch (Exception e) {
            if (!skipSchemaErrors) {

<-----


You seem to be getting the exception while invoking getWrappedRecord.
Can you please confirm that you are using the correct branch?

Thanks,
Gaurav

Pramod Deshmukh

Nov 22, 2013, 3:05:26 PM
to Gaurav Gupta, camu...@googlegroups.com, bco...@rallydev.com, francis...@gmail.com
Thanks Gaurav for the response.

My current setup uses kafka-0.7.2-incubating-src with Camus from the master branch.

Here is the method from EtlRecordReader

=============

    private CamusWrapper getWrappedRecord(String topicName, Message msg) throws IOException {
        CamusWrapper r = null;
        try {
            r = decoder.decode(msg);
        } catch (Exception e) {
            if (!skipSchemaErrors) {
                throw new IOException(e);
            }
        }
        return r;
    }

===============

-Pramod


dpra...@gmail.com

Nov 25, 2013, 11:12:02 AM
to camu...@googlegroups.com, dpra...@gmail.com, bco...@rallydev.com, francis...@gmail.com, ggu...@linkedin.com

I moved to Kafka 0.8 and would like to use Camus. Can you please suggest a Camus branch?

Thanks,

Pramod 

dpra...@gmail.com

Nov 25, 2013, 11:17:00 AM
to camu...@googlegroups.com, dpra...@gmail.com, bco...@rallydev.com, francis...@gmail.com, ggu...@linkedin.com
Got it. 

https://github.com/linkedin/camus/tree/camus-kafka-0.8

Please let me know if this is incorrect.

Thanks,

Pramod

Gaurav Gupta

Nov 25, 2013, 1:25:04 PM
to dpra...@gmail.com, camu...@googlegroups.com, bco...@rallydev.com, francis...@gmail.com
Hi Pramod,

That is the right branch.

Pramod Deshmukh

Nov 25, 2013, 5:21:40 PM
to Gaurav Gupta, camu...@googlegroups.com, bco...@rallydev.com, francis...@gmail.com
Thanks so much for helping me out. It works fine.




dpra...@gmail.com

Nov 27, 2013, 10:00:13 AM
to camu...@googlegroups.com, Gaurav Gupta, bco...@rallydev.com, francis...@gmail.com
Hello Gaurav,

Thanks so much for helping me out.

I am using kafka-0.8 -> camus-kafka-0.8.

I am able to push messages to Kafka, and from Camus I am able to get messages to HDFS. These are simple text messages, for which I have a Camus decoder.

We have a requirement to produce Avro messages over Kafka and take them all the way to HDFS as Avro. I am facing an issue writing Avro messages to Kafka. Here is my producer code. I tried different encoders, but Kafka complained with the same exception for each of them.

Can you please help me?


//////////////////////////////////////////////////////////////////
    private void sendCDRAvroMessage() throws IOException {
        User user1 = new User();
        user1.setName("Alyssa");
        user1.setFavoriteNumber(256);

        Properties props = new Properties();
        props.put("zk.connect", zkConnection);
        props.put("metadata.broker.list", brokerList);
//        props.put("serializer.class", "kafka.serializer.DefaultEncoder");
//        props.put("serializer.class", "org.springframework.integration.kafka.serializer.avro.AvroSpecificDatumBackedKafkaEncoder");
        props.put("serializer.class", "org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaEncoder");

//        Producer<String, Message> producer = new Producer<String, Message>(new ProducerConfig(props));
        Producer<Integer, Message> producer = new Producer<Integer, Message>(new ProducerConfig(props));
        //producer.send(new ProducerData<Message, Message>(topic, message));


        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
        Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);

        AvroSpecificDatumBackedKafkaEncoder<User> avroSpecificDatumBackedKafkaEncoder = new AvroSpecificDatumBackedKafkaEncoder(User.class);
        AvroReflectDatumBackedKafkaEncoder<User> avroReflectDatumBackedKafkaEncoder = new AvroReflectDatumBackedKafkaEncoder<User>(User.class);

//        Message message = new Message(avroSpecificDatumBackedKafkaEncoder.toBytes(user1, userDatumWriter));
//        Message message = new Message(avroSpecificDatumBackedKafkaEncoder.toBytes(user1));
        Message message = new Message(avroReflectDatumBackedKafkaEncoder.toBytes(user1));
            //userDatumWriter.write(user1, encoder);
            //encoder.flush();
            //out.close();

        //Message message = new Message(out.toByteArray());

//        producer.send(new KeyedMessage<String, Message>(topic, message));
        producer.send(new KeyedMessage<Integer, Message>(topic, null, message));
    }

//////////////////////////////////////////////////////////////////

Exception:

//////////////////////////////////////////////////////////////////
log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
java.lang.NoSuchMethodException: org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaEncoder.<init>(kafka.utils.VerifiableProperties)
at java.lang.Class.getConstructor0(Class.java:2810)
at java.lang.Class.getConstructor(Class.java:1718)
at kafka.utils.Utils$.createObject(Utils.scala:458)
at kafka.producer.Producer.<init>(Producer.scala:60)
at kafka.javaapi.producer.Producer.<init>(Producer.scala:25)
at com.humedica.swizzle.kafka.producer.AvroProducer.sendCDRAvroMessage(AvroProducer.java:139)
at com.humedica.swizzle.kafka.producer.AvroProducer.main(AvroProducer.java:54)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)

Process finished with exit code 0
//////////////////////////////////////////////////////////////////


Gaurav Gupta

Nov 27, 2013, 1:33:22 PM
to dpra...@gmail.com, camu...@googlegroups.com, bco...@rallydev.com, francis...@gmail.com
Hi Pramod,

As far as I understand, "AvroReflectDatumBackedKafkaEncoder" is your own class and is not packaged as part of Kafka's open-source code.

If that is true, the exception is because Kafka cannot find the constructor it expects in the class: it instantiates the serializer.class reflectively and looks for a constructor taking a kafka.utils.VerifiableProperties argument, which is exactly the signature the NoSuchMethodException names. But that is just off the top of my head.

In any case, since this is more about producing data to Kafka, the Kafka mailing list would be a better place for this question.
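
A minimal sketch of what Kafka 0.8's reflection expects from a serializer.class, reusing the User type from your snippet (UserAvroEncoder is a hypothetical name, and the Avro plumbing is just one way to fill in toBytes):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;

public class UserAvroEncoder implements Encoder<User> {

    // Kafka instantiates serializer.class reflectively and requires exactly
    // this one-argument constructor; it is the signature the
    // NoSuchMethodException above says it could not find.
    public UserAvroEncoder(VerifiableProperties props) {
    }

    @Override
    public byte[] toBytes(User user) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            new SpecificDatumWriter<User>(User.class).write(user, encoder);
            encoder.flush();
            return out.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

With an encoder like this, the producer sends the User object directly (Producer<Integer, User> and KeyedMessage<Integer, User>) rather than pre-wrapping it in a kafka.message.Message.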

Ajay Sharma

Feb 17, 2016, 2:09:26 AM
to Camus - Kafka ETL for Hadoop, bco...@rallydev.com
Hello Pramod,

This Camus branch is not working for me:

https://github.com/linkedin/camus/tree/camus-kafka-0.8

Could you please suggest the right one?

Thanks
Ajay

Pramod Deshmukh

Feb 17, 2016, 5:37:28 AM
to Ajay Sharma, Camus - Kafka ETL for Hadoop, Bob Cotton
Sorry Ajay,

I haven't used Camus for a while, so I'm not sure which branch currently works. Please share the issues you are having with this one.

-Pramod

