Thanks for the help from the group. I finally got camus-example running on Hadoop after switching the Hadoop dependencies to CDH 4.3.0 and resolving some source-code and Avro schema-definition issues. However, I now get a JsonParseException while decoding. Is it related to the decoder code? I am not familiar with Avro.
$ cat ./camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/coders/LatestSchemaKafkaAvroMessageDecoder.java
package com.linkedin.camus.etl.kafka.coders;
..

public class LatestSchemaKafkaAvroMessageDecoder extends KafkaAvroMessageDecoder
{
    @Override
    public CamusWrapper<Record> decode(byte[] payload)
    {
        try
        {
            GenericDatumReader<Record> reader = new GenericDatumReader<Record>();
            Schema schema = super.registry.getLatestSchemaByTopic(super.topicName).getSchema();
            reader.setSchema(schema);
            return new CamusWrapper<Record>(reader.read(
                    null,
                    decoderFactory.jsonDecoder(
                            schema,
                            new String(
                                    payload,
                                    //Message.payloadOffset(message.magic()),
                                    Message.MagicOffset(),
                                    payload.length - Message.MagicOffset()
                            )
                    )
            ));
        ...
    }
}
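For context on the `0xfffd` character in the stack trace below: the decoder above converts the raw payload bytes to a `String` before handing them to the JSON decoder. If the payload is not valid UTF-8 text, Java's `String` constructor substitutes U+FFFD (code 65533) for the malformed bytes, and that is exactly the character Jackson then reports as unexpected. A minimal stdlib-only sketch of this substitution (the byte values here are made up for illustration):

```java
import java.nio.charset.StandardCharsets;

public class ReplacementCharDemo {
    public static void main(String[] args) {
        // Bytes that are not valid UTF-8 text: 0xC3 is a lead byte that
        // requires a continuation byte, but 0x28 ('(') is not one.
        byte[] binaryPayload = {(byte) 0xC3, 0x28, 0x7B};
        // The String(byte[], Charset) constructor replaces malformed
        // sequences with U+FFFD rather than throwing.
        String asText = new String(binaryPayload, StandardCharsets.UTF_8);
        System.out.println((int) asText.charAt(0)); // 65533, i.e. U+FFFD
    }
}
```

If the producer wrote the messages as binary-encoded Avro rather than JSON text, this is the symptom you would expect from feeding them through `jsonDecoder`.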
$ ./run.sh
[CamusJob] - Dir Destination set to: /user/wzhu00/camus/camus-dest
No previous execution, all topics pulled from earliest available offset
[CamusJob] - New execution temp location: /user/wzhu00/camus/camus-exec/2014-03-08-04-40-34
[JobClient] - Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
[EtlInputFormat] - Fetching metadata from broker mykafkahost:9094 with client id camus for 0 topic(s) []
[EtlInputFormat] - Discrading topic : test
[EtlInputFormat] - Discrading topic : DummyLog
[EtlInputFormat] - Discrading topic : othertopic
[EtlInputFormat] - Discrading topic : DummyLog2
[EtlInputFormat] - Discrading topic : avro-test
Register Schema for topic:DUMMY_LOG schema:{"type":"record","name":"DummyLog","namespace":"com.linkedin.camus.example.records","doc":"Logs for not so important stuff.","fields":[{"name":"id","type":"long","default":0},{"name":"logTime","type":"long","default":0},{"name":"muchoStuff","type":{"type":"map","values":"string"},"default":"null"}]}
Register Schema for topic:DUMMY_LOG_2 schema:{"type":"record","name":"DummyLog2","namespace":"com.linkedin.camus.example.records","doc":"Logs for really important stuff.","fields":[{"name":"id","type":"long","default":0},{"name":"name","type":"string","default":"null"},{"name":"muchoStuff","type":{"type":"map","values":"string"},"default":"null"}]}
[ZlibFactory] - Successfully loaded & initialized native-zlib library
[CodecPool] - Got brand-new compressor [.deflate]
[EtlInputFormat] - DUMMY_LOG uri:tcp://dmhadoop301p.prod.ch3.s.com:9093 leader:1 partition:0 offset:0 latest_offset:0
[EtlInputFormat] - DUMMY_LOG uri:tcp://dmhadoop301p.prod.ch3.s.com:9092 leader:0 partition:1 offset:0 latest_offset:100
[JobClient] - Running job: job_201402070620_8635
[JobClient] - map 0% reduce 0%
[JobClient] - map 50% reduce 0%
[JobClient] - map 100% reduce 0%
[JobClient] - Job complete: job_201402070620_8635
[JobClient] - Counters: 25
[JobClient] - File System Counters
[JobClient] - FILE: Number of bytes read=0
[JobClient] - FILE: Number of bytes written=391022
[JobClient] - FILE: Number of read operations=0
[JobClient] - FILE: Number of large read operations=0
[JobClient] - FILE: Number of write operations=0
[JobClient] - HDFS: Number of bytes read=246
[JobClient] - HDFS: Number of bytes written=4682
[JobClient] - HDFS: Number of read operations=2
[JobClient] - HDFS: Number of large read operations=0
[JobClient] - HDFS: Number of write operations=2
[JobClient] - Job Counters
[JobClient] - Launched map tasks=2
[JobClient] - Total time spent by all maps in occupied slots (ms)=62949
[JobClient] - Total time spent by all reduces in occupied slots (ms)=0
[JobClient] - Total time spent by all maps waiting after reserving slots (ms)=0
[JobClient] - Total time spent by all reduces waiting after reserving slots (ms)=0
[JobClient] - Map-Reduce Framework
[JobClient] - Map input records=0
[JobClient] - Map output records=5
[JobClient] - Input split bytes=246
[JobClient] - Spilled Records=0
[JobClient] - CPU time spent (ms)=6290
[JobClient] - Physical memory (bytes) snapshot=618356736
[JobClient] - Virtual memory (bytes) snapshot=5245046784
[JobClient] - Total committed heap usage (bytes)=2022047744
[JobClient] - total
[JobClient] - data-read=3168
[JobClient] - event-count=99
[CamusJob] - Group: File System Counters
[CamusJob] - FILE: Number of bytes read: 0
[CamusJob] - FILE: Number of bytes written: 391022
[CamusJob] - FILE: Number of read operations: 0
[CamusJob] - FILE: Number of large read operations: 0
[CamusJob] - FILE: Number of write operations: 0
[CamusJob] - HDFS: Number of bytes read: 246
[CamusJob] - HDFS: Number of bytes written: 4682
[CamusJob] - HDFS: Number of read operations: 2
[CamusJob] - HDFS: Number of large read operations: 0
[CamusJob] - HDFS: Number of write operations: 2
[CamusJob] - Group: Job Counters
[CamusJob] - Launched map tasks: 2
[CamusJob] - Total time spent by all maps in occupied slots (ms): 62949
[CamusJob] - Total time spent by all reduces in occupied slots (ms): 0
[CamusJob] - Total time spent by all maps waiting after reserving slots (ms): 0
[CamusJob] - Total time spent by all reduces waiting after reserving slots (ms): 0
[CamusJob] - Group: Map-Reduce Framework
[CamusJob] - Map input records: 0
[CamusJob] - Map output records: 5
[CamusJob] - Input split bytes: 246
[CamusJob] - Spilled Records: 0
[CamusJob] - CPU time spent (ms): 6290
[CamusJob] - Physical memory (bytes) snapshot: 618356736
[CamusJob] - Virtual memory (bytes) snapshot: 5245046784
[CamusJob] - Total committed heap usage (bytes): 2022047744
[CamusJob] - Group: total
[CamusJob] - data-read: 3168
[CamusJob] - event-count: 99
[CodecPool] - Got brand-new decompressor [.deflate]
topic=DUMMY_LOG partition=1 leaderId=0 server= service= beginOffset=0 offset=1 server= checksum=4019707847 time=1394253646992
java.io.IOException: java.lang.RuntimeException: org.codehaus.jackson.JsonParseException: Unexpected character ('�' (code 65533 / 0xfffd)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
at [Source: java.io.StringReader@1a8fa0d1; line: 1, column: 2]
at com.linkedin.camus.etl.kafka.mapred.EtlRecordReader.getWrappedRecord(EtlRecordReader.java:128)
at com.linkedin.camus.etl.kafka.mapred.EtlRecordReader.nextKeyValue(EtlRecordReader.java:255)
at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:483)
at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:76)
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:85)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:139)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:672)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
at org.apache.hadoop.mapred.Child.main(Child.java:262)
Caused by: java.lang.RuntimeException: org.codehaus.jackson.JsonParseException: Unexpected character ('�' (code 65533 / 0xfffd)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
at [Source: java.io.StringReader@1a8fa0d1; line: 1, column: 2]
at com.linkedin.camus.etl.kafka.coders.LatestSchemaKafkaAvroMessageDecoder.decode(LatestSchemaKafkaAvroMessageDecoder.java:41)
at com.linkedin.camus.etl.kafka.coders.LatestSchemaKafkaAvroMessageDecoder.decode(LatestSchemaKafkaAvroMessageDecoder.java:12)
at com.linkedin.camus.etl.kafka.mapred.EtlRecordReader.getWrappedRecord(EtlRecordReader.java:125)
... 12 more
Caused by: org.codehaus.jackson.JsonParseException: Unexpected character ('�' (code 65533 / 0xfffd)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
at [Source: java.io.StringReader@1a8fa0d1; line: 1, column: 2]
at org.codehaus.jackson.JsonParser._constructError(JsonParser.java:1291)
at org.codehaus.jackson.impl.JsonParserMinimalBase._reportError(JsonParserMinimalBase.java:385)
at org.codehaus.jackson.impl.JsonParserMinimalBase._reportUnexpectedChar(JsonParserMinimalBase.java:306)
at org.codehaus.jackson.impl.ReaderBasedParser._handleUnexpectedValue(ReaderBasedParser.java:630)
at org.codehaus.jackson.impl.ReaderBasedParser.nextToken(ReaderBasedParser.java:364)
at org.apache.avro.io.JsonDecoder.configure(JsonDecoder.java:131)
at org.apache.avro.io.JsonDecoder.<init>(JsonDecoder.java:73)
at org.apache.avro.io.JsonDecoder.<init>(JsonDecoder.java:81)
at org.apache.avro.io.DecoderFactory.jsonDecoder(DecoderFactory.java:268)
at com.linkedin.camus.etl.kafka.coders.LatestSchemaKafkaAvroMessageDecoder.decode(LatestSchemaKafkaAvroMessageDecoder.java:26)
... 14 more