Thanks for providing this tool for piping data from Kafka to HDFS. I tried to run it with a command like this:
hadoop jar camus-etl-kafka/target/camus-etl-kafka-0.1.0-SNAPSHOT.jar com.linkedin.camus.etl.kafka.CamusJob \
  -D etl.destination.path=/logdata/uigs_kafka_test \
  -D etl.execution.base.path=/user/wws/kafka \
  -D etl.execution.history.path=/user/wws/kafka/history \
  -D zookeeper.hosts=10.10.10.11 \
  -D zookeeper.broker.topics=/kafka-uigs/brokers/topics \
  -D zookeeper.broker.nodes=/kafka-uigs/brokers/ids \
  -D mapred.map.tasks=30 \
  -D kafka.max.pull.hrs=1 \
  -D kafka.max.historical.days=3 \
  -D kafka.max.pull.minutes.per.task=-1 \
  -D kafka.client.name=camus-uigs \
  -D kafka.host.url=10.10.10.57 \
  -D kafka.host.port=9092 \
  -D kafka.message.coder.schema.registry.class=com.linkedin.camus.example.schemaregistry.DummySchemaRegistry \
  -D camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.KafkaAvroMessageDecoder \
  -D etl.default.timezone=Asia/Shanghai \
  -D libjars.priority.higher.than.jobclasses=true \
  -libjars lib/avro-repo-bundle-1.7.4-SNAPSHOT-withdeps.jar,camus-schema-registry/target/camus-schema-registry-0.1.0-SNAPSHOT.jar,camus-example/target/camus-example-0.1.0-SNAPSHOT.jar,lib/avro-mapred-1.7.3.jar,lib/avro-1.7.3.jar,lib/metrics-core-2.1.2.jar,lib/scala-library-2.8.0.jar,lib/joda-time-1.6.jar,lib/avro-repo-bundle-1.7.4-SNAPSHOT-withdeps.jar,lib/kafka-0.8-SNAPSHOT.jar,camus-schema-registry-avro/target/camus-schema-registry-avro-0.1.0-SNAPSHOT.jar,camus-etl-kafka/target/camus-etl-kafka-0.1.0-SNAPSHOT.jar,camus-api/target/camus-api-0.1.0-SNAPSHOT.jar,camus-schema-registry/target/camus-schema-registry-0.1.0-SNAPSHOT.jar
The job fails during submission with the following exception:
com.linkedin.camus.coders.MessageDecoderException: com.linkedin.camus.coders.MessageDecoderException: org.apache.avro.AvroRuntimeException: org.apache.avro.AvroRuntimeException: Field id type:LONG pos:0 not set and has no default value
at com.linkedin.camus.etl.kafka.coders.MessageDecoderFactory.createMessageDecoder(MessageDecoderFactory.java:38)
at com.linkedin.camus.etl.kafka.mapred.EtlInputFormat.createMessageDecoder(EtlInputFormat.java:337)
at com.linkedin.camus.etl.kafka.mapred.EtlInputFormat.getSplits(EtlInputFormat.java:221)
at org.apache.hadoop.mapred.JobClient.writeNewSplits(JobClient.java:954)
at org.apache.hadoop.mapred.JobClient.writeSplits(JobClient.java:971)
at org.apache.hadoop.mapred.JobClient.access$500(JobClient.java:170)
at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:884)
at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:837)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1340)
at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:837)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:476)
at com.linkedin.camus.etl.kafka.CamusJob.run(CamusJob.java:253)
at com.linkedin.camus.etl.kafka.CamusJob.run(CamusJob.java:607)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79)
at com.linkedin.camus.etl.kafka.CamusJob.main(CamusJob.java:545)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.RunJar.main(RunJar.java:200)
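If I understand the Avro error correctly, it is thrown when a record is built while a required field is left unset and the schema declares no default for it. The field named in the error (`id`, type long, position 0) would only build cleanly if the schema gave it a default value, along these lines (a hypothetical schema sketch; only the field name and type come from the error message, the record name and everything else is my guess):

```json
{
  "type": "record",
  "name": "DummyLog",
  "fields": [
    {"name": "id", "type": "long", "default": 0}
  ]
}
```

So I suspect the schema registered by DummySchemaRegistry has required fields without defaults, but I am not sure whether that is the real cause or how to work around it.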
I have no configuration file; I just pass everything with -D property=value on the command line.
I have been trying for more than a day and have no idea where I made a mistake. Please help me, thanks.
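For reference, here are the same settings from the -D flags above written out as a properties file (assuming CamusJob can also load them from a file; the property names and values are copied verbatim from my command):

```properties
# Camus job settings, copied from the -D flags in the command above
etl.destination.path=/logdata/uigs_kafka_test
etl.execution.base.path=/user/wws/kafka
etl.execution.history.path=/user/wws/kafka/history
zookeeper.hosts=10.10.10.11
zookeeper.broker.topics=/kafka-uigs/brokers/topics
zookeeper.broker.nodes=/kafka-uigs/brokers/ids
mapred.map.tasks=30
kafka.max.pull.hrs=1
kafka.max.historical.days=3
kafka.max.pull.minutes.per.task=-1
kafka.client.name=camus-uigs
kafka.host.url=10.10.10.57
kafka.host.port=9092
kafka.message.coder.schema.registry.class=com.linkedin.camus.example.schemaregistry.DummySchemaRegistry
camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.KafkaAvroMessageDecoder
etl.default.timezone=Asia/Shanghai
libjars.priority.higher.than.jobclasses=true
```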