Please help: almost got camus-example to work


Zhu Wayne
Mar 8, 2014, 12:09:02 AM
to camu...@googlegroups.com
Thanks for the help from the group. I finally got camus-example running on Hadoop after changing the Hadoop dependencies to CDH 4.3.0 and resolving some source code and Avro schema definition issues. However, I now get a JsonParseException while decoding. Is it related to the decoder code? I am not familiar with Avro.

$ cat ./camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/coders/LatestSchemaKafkaAvroMessageDecoder.java
package com.linkedin.camus.etl.kafka.coders;
...
public class LatestSchemaKafkaAvroMessageDecoder extends KafkaAvroMessageDecoder
{
    @Override
    public CamusWrapper<Record> decode(byte[] payload)
    {
        try
        {
            GenericDatumReader<Record> reader = new GenericDatumReader<Record>();

            Schema schema = super.registry.getLatestSchemaByTopic(super.topicName).getSchema();

            reader.setSchema(schema);

            return new CamusWrapper<Record>(reader.read(
                null,
                decoderFactory.jsonDecoder(
                    schema,
                    new String(
                        payload,
                        //Message.payloadOffset(message.magic()),
                        Message.MagicOffset(),
                        payload.length - Message.MagicOffset()
                    )
                )
            ));
        ...
}



$ ./run.sh
[CamusJob] - Dir Destination set to: /user/wzhu00/camus/camus-dest
No previous execution, all topics pulled from earliest available offset
[CamusJob] - New execution temp location: /user/wzhu00/camus/camus-exec/2014-03-08-04-40-34
[JobClient] - Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
[EtlInputFormat] - Fetching metadata from broker mykafkahost:9094 with client id camus for 0 topic(s) []
[EtlInputFormat] - Discrading topic : test
[EtlInputFormat] - Discrading topic : DummyLog
[EtlInputFormat] - Discrading topic : othertopic
[EtlInputFormat] - Discrading topic : DummyLog2
[EtlInputFormat] - Discrading topic : avro-test
Register Schema for  topic:DUMMY_LOG schema:{"type":"record","name":"DummyLog","namespace":"com.linkedin.camus.example.records","doc":"Logs for not so important stuff.","fields":[{"name":"id","type":"long","default":0},{"name":"logTime","type":"long","default":0},{"name":"muchoStuff","type":{"type":"map","values":"string"},"default":"null"}]}
Register Schema for  topic:DUMMY_LOG_2 schema:{"type":"record","name":"DummyLog2","namespace":"com.linkedin.camus.example.records","doc":"Logs for really important stuff.","fields":[{"name":"id","type":"long","default":0},{"name":"name","type":"string","default":"null"},{"name":"muchoStuff","type":{"type":"map","values":"string"},"default":"null"}]}
[ZlibFactory] - Successfully loaded & initialized native-zlib library
[CodecPool] - Got brand-new compressor [.deflate]
[EtlInputFormat] - DUMMY_LOG    uri:tcp://dmhadoop301p.prod.ch3.s.com:9093      leader:1        partition:0       offset:0        latest_offset:0
[EtlInputFormat] - DUMMY_LOG    uri:tcp://dmhadoop301p.prod.ch3.s.com:9092      leader:0        partition:1       offset:0        latest_offset:100
[JobClient] - Running job: job_201402070620_8635
[JobClient] -  map 0% reduce 0%
[JobClient] -  map 50% reduce 0%
[JobClient] -  map 100% reduce 0%
[JobClient] - Job complete: job_201402070620_8635
[JobClient] - Counters: 25
[JobClient] -   File System Counters
[JobClient] -     FILE: Number of bytes read=0
[JobClient] -     FILE: Number of bytes written=391022
[JobClient] -     FILE: Number of read operations=0
[JobClient] -     FILE: Number of large read operations=0
[JobClient] -     FILE: Number of write operations=0
[JobClient] -     HDFS: Number of bytes read=246
[JobClient] -     HDFS: Number of bytes written=4682
[JobClient] -     HDFS: Number of read operations=2
[JobClient] -     HDFS: Number of large read operations=0
[JobClient] -     HDFS: Number of write operations=2
[JobClient] -   Job Counters
[JobClient] -     Launched map tasks=2
[JobClient] -     Total time spent by all maps in occupied slots (ms)=62949
[JobClient] -     Total time spent by all reduces in occupied slots (ms)=0
[JobClient] -     Total time spent by all maps waiting after reserving slots (ms)=0
[JobClient] -     Total time spent by all reduces waiting after reserving slots (ms)=0
[JobClient] -   Map-Reduce Framework
[JobClient] -     Map input records=0
[JobClient] -     Map output records=5
[JobClient] -     Input split bytes=246
[JobClient] -     Spilled Records=0
[JobClient] -     CPU time spent (ms)=6290
[JobClient] -     Physical memory (bytes) snapshot=618356736
[JobClient] -     Virtual memory (bytes) snapshot=5245046784
[JobClient] -     Total committed heap usage (bytes)=2022047744
[JobClient] -   total
[JobClient] -     data-read=3168
[JobClient] -     event-count=99
[CamusJob] - Group: File System Counters
[CamusJob] - FILE: Number of bytes read:        0
[CamusJob] - FILE: Number of bytes written:     391022
[CamusJob] - FILE: Number of read operations:   0
[CamusJob] - FILE: Number of large read operations:     0
[CamusJob] - FILE: Number of write operations:  0
[CamusJob] - HDFS: Number of bytes read:        246
[CamusJob] - HDFS: Number of bytes written:     4682
[CamusJob] - HDFS: Number of read operations:   2
[CamusJob] - HDFS: Number of large read operations:     0
[CamusJob] - HDFS: Number of write operations:  2
[CamusJob] - Group: Job Counters
[CamusJob] - Launched map tasks:        2
[CamusJob] - Total time spent by all maps in occupied slots (ms):       62949
[CamusJob] - Total time spent by all reduces in occupied slots (ms):    0
[CamusJob] - Total time spent by all maps waiting after reserving slots (ms):   0
[CamusJob] - Total time spent by all reduces waiting after reserving slots (ms):        0
[CamusJob] - Group: Map-Reduce Framework
[CamusJob] - Map input records: 0
[CamusJob] - Map output records:        5
[CamusJob] - Input split bytes: 246
[CamusJob] - Spilled Records:   0
[CamusJob] - CPU time spent (ms):       6290
[CamusJob] - Physical memory (bytes) snapshot:  618356736
[CamusJob] - Virtual memory (bytes) snapshot:   5245046784
[CamusJob] - Total committed heap usage (bytes):        2022047744
[CamusJob] - Group: total
[CamusJob] - data-read: 3168
[CamusJob] - event-count:       99
[CodecPool] - Got brand-new decompressor [.deflate]
topic=DUMMY_LOG partition=1leaderId=0 server= service= beginOffset=0 offset=1 server= checksum=4019707847 time=1394253646992
java.io.IOException: java.lang.RuntimeException: org.codehaus.jackson.JsonParseException: Unexpected character ('�' (code 65533 / 0xfffd)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
 at [Source: java.io.StringReader@1a8fa0d1; line: 1, column: 2]
        at com.linkedin.camus.etl.kafka.mapred.EtlRecordReader.getWrappedRecord(EtlRecordReader.java:128)
        at com.linkedin.camus.etl.kafka.mapred.EtlRecordReader.nextKeyValue(EtlRecordReader.java:255)
        at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:483)
        at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:76)
        at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:85)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:139)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:672)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
        at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:396)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
        at org.apache.hadoop.mapred.Child.main(Child.java:262)
Caused by: java.lang.RuntimeException: org.codehaus.jackson.JsonParseException: Unexpected character ('�' (code 65533 / 0xfffd)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
 at [Source: java.io.StringReader@1a8fa0d1; line: 1, column: 2]
        at com.linkedin.camus.etl.kafka.coders.LatestSchemaKafkaAvroMessageDecoder.decode(LatestSchemaKafkaAvroMessageDecoder.java:41)
        at com.linkedin.camus.etl.kafka.coders.LatestSchemaKafkaAvroMessageDecoder.decode(LatestSchemaKafkaAvroMessageDecoder.java:12)
        at com.linkedin.camus.etl.kafka.mapred.EtlRecordReader.getWrappedRecord(EtlRecordReader.java:125)
        ... 12 more
Caused by: org.codehaus.jackson.JsonParseException: Unexpected character ('�' (code 65533 / 0xfffd)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
 at [Source: java.io.StringReader@1a8fa0d1; line: 1, column: 2]
        at org.codehaus.jackson.JsonParser._constructError(JsonParser.java:1291)
        at org.codehaus.jackson.impl.JsonParserMinimalBase._reportError(JsonParserMinimalBase.java:385)
        at org.codehaus.jackson.impl.JsonParserMinimalBase._reportUnexpectedChar(JsonParserMinimalBase.java:306)
        at org.codehaus.jackson.impl.ReaderBasedParser._handleUnexpectedValue(ReaderBasedParser.java:630)
        at org.codehaus.jackson.impl.ReaderBasedParser.nextToken(ReaderBasedParser.java:364)
        at org.apache.avro.io.JsonDecoder.configure(JsonDecoder.java:131)
        at org.apache.avro.io.JsonDecoder.<init>(JsonDecoder.java:73)
        at org.apache.avro.io.JsonDecoder.<init>(JsonDecoder.java:81)
        at org.apache.avro.io.DecoderFactory.jsonDecoder(DecoderFactory.java:268)
        at com.linkedin.camus.etl.kafka.coders.LatestSchemaKafkaAvroMessageDecoder.decode(LatestSchemaKafkaAvroMessageDecoder.java:26)
        ... 14 more


Zhu Wayne
Mar 8, 2014, 12:23:32 AM
to camu...@googlegroups.com
Here is my Kafka Avro producer snippet. I used the binary encoder in the producer. Is there something wrong with the decoder? How do I set up a binary decoder then?
GenericRecord datum = new GenericData.Record(schema);
datum.put("id", (long) i);
datum.put("logTime", System.currentTimeMillis());
Map<CharSequence, CharSequence> map = new HashMap<CharSequence, CharSequence>();
map.put("LName", "Zhu");
map.put("FName", "Wayne" + i);
datum.put("muchoStuff", map);

ByteArrayOutputStream out = new ByteArrayOutputStream();
DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
writer.write(datum, encoder);
encoder.flush();

Zhu Wayne
Mar 8, 2014, 10:30:44 AM
to camu...@googlegroups.com
Here is what I got from the Hadoop log. I am not sure why LatestSchemaKafkaAvroMessageDecoder uses jsonDecoder. Since we produce Avro in binary, should we just use binaryDecoder?

{"type":"record","name":"DummyLog","namespace":"com.linkedin.camus.example.records","doc":"Logs for not so important stuff.","fields":[{"name":"id","type":"long","default":0},{"name":"logTime","type":"long","default":0},{"name":"muchoStuff","type":{"type":"map","values":"string"},"default":"null"}]} ??Q FName Daniel LName Zhu



Félix GV
Mar 8, 2014, 10:49:39 AM
to Zhu Wayne, camu...@googlegroups.com
You need to use the same kind of encoder and decoder on both ends: either all JSON, or all binary.

For serious production usage, you should use a real (persistent) schema repository with binary encoding. For prototyping or dev purposes, you can get by with a static in-memory schema repo and JSON decoding. If you want to mix and match (for example, an in-memory static repo with binary encoding), you can implement your own by merging the relevant bits of code from each implementation.

--
Félix


--
You received this message because you are subscribed to the Google Groups "Camus - Kafka ETL for Hadoop" group.
To unsubscribe from this group and stop receiving emails from it, send an email to camus_etl+...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Zhu Wayne
Mar 8, 2014, 11:04:43 AM
to camu...@googlegroups.com, Zhu Wayne
Thanks Félix for your tips. Here is the mismatch that I don't know how to resolve yet: the encoder in my Kafka message producer is binary, while LatestSchemaKafkaAvroMessageDecoder in camus-example uses JSON. I do want to use the schema, but DecoderFactory doesn't have a binaryDecoder that takes a schema as a parameter (http://avro.apache.org/docs/1.7.3/api/java/org/apache/avro/io/DecoderFactory.html).

Could you elaborate on how we can do this in production with an Avro binary encoder and decoder? Thanks again for helping out.
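[Editor's note: the binary decoder takes no schema because in Avro the schema is supplied to the DatumReader, not to the Decoder. A minimal round-trip sketch illustrating the "same kind of encoder and decoder on both ends" point; it assumes Avro 1.7.x on the classpath, and uses a one-field version of the thread's DummyLog schema purely for illustration:]

```java
import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class RoundTrip {
    public static void main(String[] args) throws Exception {
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"DummyLog\","
            + "\"fields\":[{\"name\":\"id\",\"type\":\"long\"}]}");

        GenericRecord datum = new GenericData.Record(schema);
        datum.put("id", 42L);

        // Encode with the *binary* encoder, as the producer in this thread does.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(schema).write(datum, encoder);
        encoder.flush();

        // Decode with the *matching* binary decoder. Feeding these bytes to
        // jsonDecoder instead is exactly what raises the JsonParseException
        // seen earlier in the thread.
        BinaryDecoder decoder =
            DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
        GenericRecord back =
            new GenericDatumReader<GenericRecord>(schema).read(null, decoder);
        System.out.println(back.get("id")); // prints 42
    }
}
```

Note that the schema is carried by the GenericDatumReader (or set via setSchema), which is why DecoderFactory.binaryDecoder has no schema parameter.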

Félix GV
Mar 8, 2014, 11:10:31 AM
to Zhu Wayne, camu...@googlegroups.com

Zhu Wayne
Mar 8, 2014, 11:32:05 AM
to camu...@googlegroups.com, Zhu Wayne
Thanks Félix for your tips. I was trying to modify LatestSchemaKafkaAvroMessageDecoder.java to use binaryDecoder but got hit with an EOFException.

Changed to:
decoderFactory.binaryDecoder(payload, Message.MagicOffset(), payload.length - Message.MagicOffset(), null)
from:
decoderFactory.jsonDecoder(schema, new String(payload,
        //Message.payloadOffset(message.magic()),
        Message.MagicOffset(),
        payload.length - Message.MagicOffset()
))


My Kafka producer setting:
props.put("serializer.class", "kafka.serializer.DefaultEncoder");

By switching the decoder to its parent class KafkaAvroMessageDecoder, I made good progress but still got an "Unknown magic byte!" exception.

[CodecPool] - Got brand-new decompressor [.deflate]
topic=DUMMY_LOG partition=1leaderId=2 server= service= beginOffset=0 offset=1 server= checksum=3229255088 time=1394295483343
java.io.IOException: java.lang.IllegalArgumentException: Unknown magic byte!

        at com.linkedin.camus.etl.kafka.mapred.EtlRecordReader.getWrappedRecord(EtlRecordReader.java:128)
        at com.linkedin.camus.etl.kafka.mapred.EtlRecordReader.nextKeyValue(EtlRecordReader.java:255)
        at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:483)
        at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:76)
        at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:85)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:139)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:672)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
        at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:396)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
        at org.apache.hadoop.mapred.Child.main(Child.java:262)
Caused by: java.lang.IllegalArgumentException: Unknown magic byte!
        at com.linkedin.camus.etl.kafka.coders.KafkaAvroMessageDecoder$MessageDecoderHelper.getByteBuffer(KafkaAvroMessageDecoder.java:89)
        at com.linkedin.camus.etl.kafka.coders.KafkaAvroMessageDecoder$MessageDecoderHelper.invoke(KafkaAvroMessageDecoder.java:94)
        at com.linkedin.camus.etl.kafka.coders.KafkaAvroMessageDecoder.decode(KafkaAvroMessageDecoder.java:111)
        at com.linkedin.camus.etl.kafka.coders.KafkaAvroMessageDecoder.decode(KafkaAvroMessageDecoder.java:22)

        at com.linkedin.camus.etl.kafka.mapred.EtlRecordReader.getWrappedRecord(EtlRecordReader.java:125)
        ... 12 more


$ grep Decoder camus-example/src/main/resources/camus.properties
# Concrete implementation of the Decoder class to use
camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.KafkaAvroMessageDecoder
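[Editor's note: the "Unknown magic byte!" error comes from KafkaAvroMessageDecoder expecting each Kafka message to carry a small header before the Avro bytes: a magic byte plus a schema id that the decoder strips off and uses to look up the writer schema in the registry. A producer using kafka.serializer.DefaultEncoder sends the raw Avro bytes with no such header. A stdlib-only sketch of that style of framing follows; the exact layout (one 0x0 magic byte, 4-byte schema id) is an assumption for illustration, so check MessageDecoderHelper.getByteBuffer in your Camus version for the real one:]

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class Framing {
    static final byte MAGIC = 0x0;

    // Producer side: prepend the assumed header [magic][4-byte schema id][avro payload].
    static byte[] frame(int schemaId, byte[] avroPayload) {
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + avroPayload.length);
        buf.put(MAGIC).putInt(schemaId).put(avroPayload);
        return buf.array();
    }

    // Consumer side, mirroring what the decoder helper does:
    // verify the magic byte, read the schema id, slice off the payload.
    static byte[] unframe(byte[] message, int[] schemaIdOut) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        if (buf.get() != MAGIC)
            throw new IllegalArgumentException("Unknown magic byte!");
        schemaIdOut[0] = buf.getInt();
        byte[] payload = new byte[buf.remaining()];
        buf.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        byte[] avro = {1, 2, 3};
        byte[] framed = frame(7, avro);
        int[] id = new int[1];
        byte[] back = unframe(framed, id);
        System.out.println(id[0] + " " + Arrays.equals(avro, back)); // prints 7 true
    }
}
```

With framing like this on the producer side, KafkaAvroMessageDecoder can strip the header and hand the remaining bytes to a binary decoder, which is why the unframed DefaultEncoder payload fails its magic-byte check.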

zhuw.chicago
Mar 8, 2014, 1:58:01 PM
to Félix GV, camu...@googlegroups.com
Got it. I will use matching classes from the same package to encode and decode.

Tracy Zhang
Jun 18, 2014, 4:28:30 PM
to camu...@googlegroups.com, fel...@gmail.com
Hi Wayne,

Which decoder did you end up using?

Tracy

Anant
Aug 5, 2014, 2:11:40 AM
to camu...@googlegroups.com, fel...@gmail.com
Hi Tracy/Wayne,

I am also facing the same issue. How did you resolve it?

Anant Rustagi