etl.record.writer.provider.class not being referenced


cti...@gmail.com

Jul 21, 2014, 4:59:02 PM
to camu...@googlegroups.com
Hello,

I want to change the writer logic so that it writes to other destinations (for example, OpenTSDB, or sending an SNMP trap).

I changed the configuration to point to a different class:
etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.StringOpentsdbRecordWriterProvider

However, I don't see this class being called at all.

It seems to work with

https://github.com/Produban/camus

but not with

https://github.com/linkedin/camus

Any pointers?

- Shekar

cti...@gmail.com

Jul 21, 2014, 5:05:16 PM
to camu...@googlegroups.com, cti...@gmail.com
I am running Camus like this:

java -cp camus-example/target/camus-example-0.1.0-SNAPSHOT-shaded.jar:/etc/hadoop/conf:/usr/lib/hadoop-hdfs/hadoop-hdfs.jar com.linkedin.camus.etl.kafka.CamusJob -P camus.1.txt

I see that the class is present in the jar:

[ctippur@pppdc9prd2yu kafka-camus-monitoring]$ jar tvf camus-example/target/camus-example-0.1.0-SNAPSHOT-shaded.jar|grep -i opentsdb
8514 Mon Jul 21 13:37:52 PDT 2014 com/linkedin/camus/etl/kafka/common/StringOpentsdbRecordWriterProvider$1.class
4322 Mon Jul 21 13:37:52 PDT 2014 com/linkedin/camus/etl/kafka/common/StringOpentsdbRecordWriterProvider.class

Also, as part of debugging, I added some print statements to the code, and they are not being printed.

- Shekar

cti...@gmail.com

Jul 21, 2014, 5:30:05 PM
to camu...@googlegroups.com, cti...@gmail.com
I see that this file:

./camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/mapred/EtlMultiOutputRecordWriter.java

Changed from:

try
{
    recordWriterProvider = EtlMultiOutputFormat.getRecordWriterProviderClass(context).newInstance();
}


To:

try
{
    recordWriterProvider = EtlMultiOutputFormat.getRecordWriterProviderClass(context).newInstance();
    /*Class<RecordWriterProvider> rwp = EtlMultiOutputFormat.getRecordWriterProviderClass(context);
    Constructor<RecordWriterProvider> crwp = rwp.getConstructor(TaskAttemptContext.class);
    recordWriterProvider = crwp.newInstance(context);*/
}
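
For context, the three lines inside the comment look up a constructor that takes the task context and call it, rather than calling a no-argument constructor. Below is a minimal, self-contained sketch of that pattern. `FakeContext`, `Provider`, and `ContextProvider` are hypothetical stand-ins for `TaskAttemptContext`, `RecordWriterProvider`, and a concrete provider; they are not Camus classes, and this is not the Camus implementation.

```java
import java.lang.reflect.Constructor;

final class ConstructorLookupSketch {
    // Hypothetical stand-in for Hadoop's TaskAttemptContext.
    public static final class FakeContext {
        final String jobName;
        public FakeContext(String jobName) { this.jobName = jobName; }
    }

    // Hypothetical stand-in for the RecordWriterProvider interface.
    public interface Provider {
        String describe();
    }

    // A provider whose only constructor takes the context.
    public static final class ContextProvider implements Provider {
        private final FakeContext context;
        public ContextProvider(FakeContext context) { this.context = context; }
        @Override
        public String describe() { return "provider for " + context.jobName; }
    }

    // Find the public one-argument constructor and invoke it with the context.
    static Provider create(Class<? extends Provider> cls, FakeContext context)
            throws ReflectiveOperationException {
        Constructor<? extends Provider> ctor = cls.getConstructor(FakeContext.class);
        return ctor.newInstance(context);
    }

    public static void main(String[] args) throws Exception {
        Provider p = create(ContextProvider.class, new FakeContext("camus"));
        System.out.println(p.describe());
    }
}
```

With this approach, a provider class has to expose a public constructor with exactly that parameter type; otherwise `getConstructor` throws `NoSuchMethodException`.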

Digging further...

cti...@gmail.com

Jul 21, 2014, 6:04:29 PM
to camu...@googlegroups.com, cti...@gmail.com
Here is the exception I get when I change it back to:

try
{
    recordWriterProvider = EtlMultiOutputFormat.getRecordWriterProviderClass(context).newInstance();
}

java.lang.Exception: java.lang.IllegalStateException: java.lang.InstantiationException: com.linkedin.camus.etl.kafka.common.SequenceFileRecordWriterProvider
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:406)
Caused by: java.lang.IllegalStateException: java.lang.InstantiationException: com.linkedin.camus.etl.kafka.common.SequenceFileRecordWriterProvider
    at com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputRecordWriter.getDataRecordWriter(EtlMultiOutputRecordWriter.java:128)
    at com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputRecordWriter.write(EtlMultiOutputRecordWriter.java:99)
    at com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputRecordWriter.write(EtlMultiOutputRecordWriter.java:21)
    at org.apache.hadoop.mapred.MapTask$NewDirectOutputCollector.write(MapTask.java:558)
    at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:85)
    at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:106)
    at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:120)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:672)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:268)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)
Caused by: java.lang.InstantiationException: com.linkedin.camus.etl.kafka.common.SequenceFileRecordWriterProvider
    at java.lang.Class.newInstance0(Class.java:340)
    at java.lang.Class.newInstance(Class.java:308)
    at com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputRecordWriter.getDataRecordWriter(EtlMultiOutputRecordWriter.java:121)
    ... 16 more
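
One common cause of an `InstantiationException` from `Class.newInstance()` is a class that has no no-argument constructor (others include abstract classes and interfaces). Whether that applies to `SequenceFileRecordWriterProvider` in this build is an assumption; the trace alone does not confirm it. The sketch below reproduces the behaviour with two hypothetical classes, `NeedsArgument` and `HasNoArg`, which are not part of Camus.

```java
final class NoArgInstantiationSketch {
    // Hypothetical class whose only constructor requires an argument.
    public static class NeedsArgument {
        public NeedsArgument(String context) { }
    }

    // Hypothetical class with a public no-argument constructor.
    public static class HasNoArg {
        public HasNoArg() { }
    }

    // Try the no-argument reflective path and report what happened.
    @SuppressWarnings("deprecation") // Class.newInstance() is deprecated since Java 9
    static String tryNoArg(Class<?> cls) {
        try {
            cls.newInstance();
            return "ok";
        } catch (InstantiationException e) {
            return "InstantiationException: " + e.getMessage();
        } catch (IllegalAccessException e) {
            return "IllegalAccessException: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(tryNoArg(HasNoArg.class));
        System.out.println(tryNoArg(NeedsArgument.class));
    }
}
```

If the provider class only declares a constructor taking a context, the no-argument call would be expected to fail this way, while a constructor lookup with the matching parameter type would not.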

cti...@gmail.com

Jul 21, 2014, 6:14:00 PM
to camu...@googlegroups.com, cti...@gmail.com
Please read the commented-out line as:

try
{
//recordWriterProvider = EtlMultiOutputFormat.getRecordWriterProviderClass(context).newInstance();
