InstantiationException on custom RecordWriterProvider

Rohan Aletty

Aug 5, 2015, 1:29:35 PM
to Camus - Kafka ETL for Hadoop
Hello,

I'm building an ETL pipeline and need to use Camus to pull data from Kafka into HDFS as parquet-avro. In my camus.properties file, I have set "etl.record.writer.provider.class=com.placeiq.enterprise.pvr.camus.ParquetRecordWriterProvider", and I have a custom decoder and schema registry as well.
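The relevant entries look roughly like this (the decoder and registry class names below are stand-ins for my custom implementations):

# custom record writer provider
etl.record.writer.provider.class=com.placeiq.enterprise.pvr.camus.ParquetRecordWriterProvider
# placeholder names for my custom decoder and schema registry
camus.message.decoder.class=com.placeiq.enterprise.pvr.camus.MyMessageDecoder
kafka.message.coder.schema.registry.class=com.placeiq.enterprise.pvr.camus.MySchemaRegistry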

However, when I attempt to use Camus to pull from my topic, I receive the following exception:

java.lang.IllegalStateException: java.lang.InstantiationException: com.placeiq.enterprise.pvr.camus.ParquetRecordWriterProvider
	at com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputCommitter.<init>(EtlMultiOutputCommitter.java:62)
	at com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputFormat.getOutputCommitter(EtlMultiOutputFormat.java:73)
	at org.apache.hadoop.mapred.Task.initialize(Task.java:523)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:313)
	at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:415)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
	at org.apache.hadoop.mapred.Child.main(Child.java:262)
Caused by: java.lang.InstantiationException: com.placeiq.enterprise.pvr.camus.ParquetRecordWriterProvider
	at java.lang.Class.newInstance(Class.java:359)
	at com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputCommitter.<init>(EtlMultiOutputCommitter.java:60)
	... 8 more

Anyone have any suggestions? I can provide more information if needed.

I saw a similar thread with no response/update here: https://groups.google.com/forum/#!searchin/camus_etl/InstantiationException/camus_etl/oRJTqXs0ZrY/kxHxtMLpW14J

Thanks,
Rohan

Issac Buenrostro

Aug 5, 2015, 1:38:38 PM
to Rohan Aletty, Camus - Kafka ETL for Hadoop
Hi Rohan,

An InstantiationException means the class cannot be instantiated reflectively, and it is usually a problem with the class itself. See http://docs.oracle.com/javase/7/docs/api/java/lang/InstantiationException.html for some reasons why you may be getting that error.
If you can include your class, or at least an excerpt of it (especially the class signature and constructor), we can probably give you more information.
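In the meantime, here is roughly what the failure reduces to (a minimal standalone sketch, not Camus code):

class NoArg                        // Scala generates a public no-arg constructor
class WithContext(context: String) // the only constructor takes an argument

object ReflectionDemo {
  def main(args: Array[String]): Unit = {
    classOf[NoArg].newInstance()        // fine
    classOf[WithContext].newInstance()  // throws java.lang.InstantiationException
  }
}

Class#newInstance only works when the class has a public no-arg constructor, and a Scala class whose only constructor takes parameters does not get one.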

Which SHA or branch of Camus are you using? The line numbers in your stack trace do not match the current master of Camus.

Best,
Issac

Rohan Aletty

Aug 5, 2015, 1:50:09 PM
to Issac Buenrostro, Camus - Kafka ETL for Hadoop
Hi Issac,

Thanks for the response. My RecordWriterProvider is written in Scala:

class ParquetRecordWriterProvider(context: TaskAttemptContext) extends RecordWriterProvider {
  lazy val blockSize = 128 * 1024 * 1024
  lazy val pageSize = 64 * 1024
  protected var ext = ".parquet"
  protected var codec = CompressionCodecName.UNCOMPRESSED

  if (FileOutputFormat.getCompressOutput(context)) {
    val compression = EtlMultiOutputFormat.getEtlOutputCodec(context)
    if ("snappy".equals(compression)) {
      codec = CompressionCodecName.SNAPPY
      ext = ext.concat(".snappy")
    } else if ("gzip".equals(compression)) {
      codec = CompressionCodecName.GZIP
      ext = ext.concat(".gz")
    }
  }
 
  override def getFilenameExtension() : String = ext

  @throws(classOf[IOException])
  @throws(classOf[InterruptedException])
  override def getDataRecordWriter(
    context: TaskAttemptContext,
    fileName: String,
    camusWrapper: CamusWrapper[_],
    committer: FileOutputCommitter
  ) : RecordWriter[IEtlKey, CamusWrapper[_]] = {
    var path = committer.getWorkPath
    path = new Path(path, FileOutputFormat.getUniqueFile(context, fileName, ext))

    val writer = new AvroParquetWriter[SpecificRecord](path, LogEntry.getClassSchema, codec, blockSize, pageSize)

    new RecordWriter[IEtlKey, CamusWrapper[_]] {
      @throws(classOf[IOException])
      override def write(key: IEtlKey, data: CamusWrapper[_]) = writer.write(data.getRecord.asInstanceOf[LogEntry])

      @throws(classOf[IOException])
      @throws(classOf[InterruptedException])
      override def close(context: TaskAttemptContext) = writer.close()
    }
  }

}

As for the Camus version, my pom pulls 0.1.0-SNAPSHOT, which looks like the version on master. I'm not sure why the source code isn't matching up; I'll look into that.

Thanks,
Rohan
--

Rohan Aletty  |  PlaceIQ  | Software Engineer  |  ro...@placeiq.com  |  (908) 963-8566

Issac Buenrostro

Aug 5, 2015, 2:24:08 PM
to Rohan Aletty, Camus - Kafka ETL for Hadoop
It looks like Java reflection might not like Scala constructors. There's a discussion of a related issue here: http://stackoverflow.com/questions/17401565/instantiating-a-scala-class-using-reflection-javas-newinstance

Can you try explicitly implementing a public no-arg constructor for the Scala class?
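Something like the following might work (a sketch only, assuming your Camus build instantiates the provider with Class.newInstance(), as your stack trace suggests; the configure helper is my own naming, and the import for your generated LogEntry class is up to you):

import java.io.IOException

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, FileOutputFormat}
import org.apache.avro.specific.SpecificRecord
import parquet.avro.AvroParquetWriter
import parquet.hadoop.metadata.CompressionCodecName

import com.linkedin.camus.coders.CamusWrapper
import com.linkedin.camus.etl.{IEtlKey, RecordWriterProvider}
import com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputFormat
// plus the import for your generated LogEntry Avro class

// Public no-arg primary constructor, so Class.newInstance() can create it.
class ParquetRecordWriterProvider extends RecordWriterProvider {
  private val blockSize = 128 * 1024 * 1024
  private val pageSize = 64 * 1024
  private var ext = ".parquet"
  private var codec = CompressionCodecName.UNCOMPRESSED

  // Auxiliary constructor, so Camus builds that pass a TaskAttemptContext still work.
  def this(context: TaskAttemptContext) = {
    this()
    configure(context)
  }

  // Context-dependent setup, moved out of the primary constructor.
  private def configure(context: TaskAttemptContext): Unit = {
    if (FileOutputFormat.getCompressOutput(context)) {
      EtlMultiOutputFormat.getEtlOutputCodec(context) match {
        case "snappy" => codec = CompressionCodecName.SNAPPY; ext = ".parquet.snappy"
        case "gzip"   => codec = CompressionCodecName.GZIP; ext = ".parquet.gz"
        case _        => // leave uncompressed
      }
    }
  }

  // Caveat: on the no-arg path, the extension stays ".parquet" until
  // configure() runs in getDataRecordWriter below.
  override def getFilenameExtension(): String = ext

  @throws(classOf[IOException])
  @throws(classOf[InterruptedException])
  override def getDataRecordWriter(
    context: TaskAttemptContext,
    fileName: String,
    camusWrapper: CamusWrapper[_],
    committer: FileOutputCommitter
  ): RecordWriter[IEtlKey, CamusWrapper[_]] = {
    configure(context) // idempotent; ensures codec and extension are set
    val path = new Path(committer.getWorkPath, FileOutputFormat.getUniqueFile(context, fileName, ext))
    val writer = new AvroParquetWriter[SpecificRecord](path, LogEntry.getClassSchema, codec, blockSize, pageSize)

    new RecordWriter[IEtlKey, CamusWrapper[_]] {
      override def write(key: IEtlKey, data: CamusWrapper[_]): Unit =
        writer.write(data.getRecord.asInstanceOf[LogEntry])

      override def close(context: TaskAttemptContext): Unit = writer.close()
    }
  }
}

It may also be worth confirming that the Camus jar your job actually runs matches master, since the line-number mismatch suggests an older build that still calls newInstance().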