I wanted to change the writer logic so that it writes to other destinations (like OpenTSDB, or sends an SNMP trap).
I changed the config to point at a different class:
etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.StringOpentsdbRecordWriterProvider
However, I don't see that this is being called at all.
It seems to work with
https://github.com/Produban/camus
but not with
https://github.com/linkedin/camus
Any pointers?
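As I understand it, the provider class is resolved from that property roughly like this. This is a plain-Java stand-in for the lookup (not the actual Camus/Hadoop code); the default class name is taken from the stack trace further down in this thread, and whether it is the real hard-coded default in this Camus version is an assumption:

```java
import java.util.Properties;

public class ProviderLookup {
    // Plain-Java stand-in for how a class-name property is resolved:
    // if the property is absent from the config the job actually reads,
    // the default class is used silently -- no error, no warning.
    static String resolve(Properties props) {
        return props.getProperty(
            "etl.record.writer.provider.class",
            "com.linkedin.camus.etl.kafka.common.SequenceFileRecordWriterProvider");
    }

    public static void main(String[] args) {
        Properties withProp = new Properties();
        withProp.setProperty("etl.record.writer.provider.class",
            "com.linkedin.camus.etl.kafka.common.StringOpentsdbRecordWriterProvider");
        // Property present: the custom provider class name is returned.
        System.out.println(resolve(withProp));
        // Property missing: silently falls back to the default.
        System.out.println(resolve(new Properties()));
    }
}
```

So "class is in the jar but never called" is consistent with the property simply not being read from the file passed via -P.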
- Shekar
java -cp camus-example/target/camus-example-0.1.0-SNAPSHOT-shaded.jar:/etc/hadoop/conf:/usr/lib/hadoop-hdfs/hadoop-hdfs.jar com.linkedin.camus.etl.kafka.CamusJob -P camus.1.txt
I see that the class is referenced in the jar
[ctippur@pppdc9prd2yu kafka-camus-monitoring]$ jar tvf camus-example/target/camus-example-0.1.0-SNAPSHOT-shaded.jar|grep -i opentsdb
8514 Mon Jul 21 13:37:52 PDT 2014 com/linkedin/camus/etl/kafka/common/StringOpentsdbRecordWriterProvider$1.class
4322 Mon Jul 21 13:37:52 PDT 2014 com/linkedin/camus/etl/kafka/common/StringOpentsdbRecordWriterProvider.class
Also, as part of debugging, I added some print statements to the code, which are not being printed.
- Shekar
./camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/mapred/EtlMultiOutputRecordWriter.java
Changed from:
    try
    {
        recordWriterProvider = EtlMultiOutputFormat.getRecordWriterProviderClass(context).newInstance();
    }
to:
    try
    {
        recordWriterProvider = EtlMultiOutputFormat.getRecordWriterProviderClass(context).newInstance();
        /*Class<RecordWriterProvider> rwp = EtlMultiOutputFormat.getRecordWriterProviderClass(context);
        Constructor<RecordWriterProvider> crwp = rwp.getConstructor(TaskAttemptContext.class);
        recordWriterProvider = crwp.newInstance(context);*/
    }
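The commented-out variant matters because Class.newInstance() only works for classes with an accessible no-arg constructor; a provider whose only constructor takes a TaskAttemptContext (as in the Produban fork) makes newInstance() throw InstantiationException, which is exactly what the trace below shows. A minimal stand-alone sketch of the two instantiation paths (the classes here are stand-ins, not the actual Camus types):

```java
import java.lang.reflect.Constructor;

public class ReflectionDemo {
    static class Context {}  // stand-in for Hadoop's TaskAttemptContext

    // Hypothetical provider whose only constructor takes a context,
    // mirroring the Produban-style RecordWriterProviders.
    static class ContextOnlyProvider {
        ContextOnlyProvider(Context ctx) {}
    }

    // Path 1: newInstance() needs a nullary constructor and fails here.
    static String tryNewInstance() {
        try {
            ContextOnlyProvider.class.newInstance();
            return "ok";
        } catch (ReflectiveOperationException e) {
            return e.getClass().getSimpleName();  // InstantiationException
        }
    }

    // Path 2: fetching the (Context) constructor explicitly succeeds.
    static String tryCtor(Context ctx) {
        try {
            Constructor<ContextOnlyProvider> c =
                ContextOnlyProvider.class.getDeclaredConstructor(Context.class);
            c.newInstance(ctx);
            return "ok";
        } catch (ReflectiveOperationException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println("newInstance():           " + tryNewInstance());
        System.out.println("getConstructor(Context): " + tryCtor(new Context()));
    }
}
```

Which path a given Camus fork expects depends on the constructor its providers declare; mixing a fork's provider classes with the other fork's instantiation code would produce exactly this kind of failure.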
Digging further...
    try
    {
        recordWriterProvider = EtlMultiOutputFormat.getRecordWriterProviderClass(context).newInstance();
    }
java.lang.Exception: java.lang.IllegalStateException: java.lang.InstantiationException: com.linkedin.camus.etl.kafka.common.SequenceFileRecordWriterProvider
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:406)
Caused by: java.lang.IllegalStateException: java.lang.InstantiationException: com.linkedin.camus.etl.kafka.common.SequenceFileRecordWriterProvider
at com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputRecordWriter.getDataRecordWriter(EtlMultiOutputRecordWriter.java:128)
at com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputRecordWriter.write(EtlMultiOutputRecordWriter.java:99)
at com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputRecordWriter.write(EtlMultiOutputRecordWriter.java:21)
at org.apache.hadoop.mapred.MapTask$NewDirectOutputCollector.write(MapTask.java:558)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:85)
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:106)
at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:120)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:672)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:268)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
Caused by: java.lang.InstantiationException: com.linkedin.camus.etl.kafka.common.SequenceFileRecordWriterProvider
at java.lang.Class.newInstance0(Class.java:340)
at java.lang.Class.newInstance(Class.java:308)
at com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputRecordWriter.getDataRecordWriter(EtlMultiOutputRecordWriter.java:121)
... 16 more
    try
    {
        //recordWriterProvider = EtlMultiOutputFormat.getRecordWriterProviderClass(context).newInstance();