Hi Issac,
Thanks for the response. My RecordWriterProvider is written in Scala:
/**
 * A [[RecordWriterProvider]] that writes records to Parquet files through
 * `AvroParquetWriter`, using the `LogEntry` Avro schema.
 *
 * The compression codec and the file-name extension are chosen once, at
 * construction time, from the settings carried by `context`:
 *
 *  - compression disabled, or an unrecognised codec name: uncompressed, `.parquet`
 *  - `"snappy"`: Snappy, `.parquet.snappy`
 *  - `"gzip"`: GZIP, `.parquet.gz`
 *
 * @param context task attempt context consulted for the output-compression settings
 */
class ParquetRecordWriterProvider(context: TaskAttemptContext) extends RecordWriterProvider {

  /** Value passed to `AvroParquetWriter` as its block-size argument. */
  lazy val blockSize: Int = 128 * 1024 * 1024

  /** Value passed to `AvroParquetWriter` as its page-size argument. */
  lazy val pageSize: Int = 64 * 1024

  /** Codec handed to every writer created by this provider; uncompressed unless overridden below. */
  protected var codec: CompressionCodecName = CompressionCodecName.UNCOMPRESSED

  /** Extension appended to generated file names; gains a codec suffix when compression is on. */
  protected var ext: String = ".parquet"

  // Constructor logic: pick codec and extension suffix only when output compression is enabled.
  if (FileOutputFormat.getCompressOutput(context)) {
    EtlMultiOutputFormat.getEtlOutputCodec(context) match {
      case "snappy" =>
        codec = CompressionCodecName.SNAPPY
        ext = ext + ".snappy"
      case "gzip" =>
        codec = CompressionCodecName.GZIP
        ext = ext + ".gz"
      case _ =>
        // Any other (or missing) codec name keeps the uncompressed defaults.
        ()
    }
  }

  /** @return the extension currently configured for output files */
  override def getFilenameExtension(): String = ext

  /**
   * Opens a Parquet writer on a unique file under the committer's work path and
   * wraps it in a `RecordWriter`.
   *
   * @param context      task attempt context used to derive the unique file name
   * @param fileName     base name of the output file
   * @param camusWrapper not used by this implementation
   * @param committer    supplies the work path the file is created under
   * @return a writer that casts each wrapped record to `LogEntry` and appends it to the file
   */
  @throws(classOf[IOException])
  @throws(classOf[InterruptedException])
  override def getDataRecordWriter(
    context: TaskAttemptContext,
    fileName: String,
    camusWrapper: CamusWrapper[_],
    committer: FileOutputCommitter
  ): RecordWriter[IEtlKey, CamusWrapper[_]] = {
    val outputFile = new Path(
      committer.getWorkPath,
      FileOutputFormat.getUniqueFile(context, fileName, ext)
    )
    val parquetWriter = new AvroParquetWriter[SpecificRecord](
      outputFile,
      LogEntry.getClassSchema,
      codec,
      blockSize,
      pageSize
    )

    new RecordWriter[IEtlKey, CamusWrapper[_]] {

      /** Appends the record carried by `data`; the key is ignored. */
      @throws(classOf[IOException])
      override def write(key: IEtlKey, data: CamusWrapper[_]): Unit = {
        // The cast fails with ClassCastException if the wrapped record is not a LogEntry.
        val entry = data.getRecord.asInstanceOf[LogEntry]
        parquetWriter.write(entry)
      }

      /** Closes the underlying Parquet writer. */
      @throws(classOf[IOException])
      @throws(classOf[InterruptedException])
      override def close(context: TaskAttemptContext): Unit = {
        parquetWriter.close()
      }
    }
  }
}
As for the Camus version, my pom is pulling 0.1.0-SNAPSHOT, which looks like the version on master. I'm not sure why the source code isn't matching up; I'll look into that.
Thanks,
Rohan