I started with something like this:
I) Convert data from Text to RCFile format
Driver:
public Job constructJob(final String jobName, final Path inputPath, final Path outputPath, Class<?> jarClass,
        String compressionScheme, final Configuration configuration, final long colCount) throws IOException {
    if (!"none".equalsIgnoreCase(compressionScheme)) {
        configuration.set(RCFileOutputFormat.COMPRESSION_CODEC_CONF, compressionScheme);
    }
    RCFileOutputFormat.setColumnNumber(configuration, (int) colCount);
    // configuration.set("mapred.child.java.opts", "-Xmx4096m");
    Job job = new Job(configuration, jobName);
    job.setJarByClass(jarClass);
    job.setNumReduceTasks(0);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(RCFileOutputFormat.class);
    TextInputFormat.addInputPath(job, inputPath);
    RCFileOutputFormat.setOutputPath(job, outputPath);
    job.setMapperClass(RCFileWriteMapper.class);
    return job;
}
There are no reducers here.
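For completeness, this is roughly how I submit the job; the paths, the "none" compression scheme, and the column count below are just placeholders:

Configuration configuration = new Configuration();
Job job = constructJob("text-to-rcfile", new Path("/input/data.txt"), new Path("/output/rcfile"),
        BenchmarkRCFileMR.class, "none", configuration, 10);
System.exit(job.waitForCompletion(true) ? 0 : 1);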
RCFileWriteMapper:
public class RCFileWriteMapper extends Mapper<LongWritable, Text, Void, BytesRefArrayWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String row = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(row);
        int columnIndex = 0;
        BytesRefArrayWritable dataWrite = new BytesRefArrayWritable(10);
        while (tokenizer.hasMoreElements()) {
            Text columnValue = new Text(tokenizer.nextToken());
            BytesRefWritable bytesRefWritable = new BytesRefWritable();
            bytesRefWritable.set(columnValue.getBytes(), 0, columnValue.getLength());
            // ensure the capacity is increased if required
            dataWrite.resetValid(columnIndex);
            dataWrite.set(columnIndex, bytesRefWritable);
            columnIndex++;
        }
        context.write(null, dataWrite);
    }
}
The above code did work correctly. But is this how RCFileOutputFormat should be used? Are we expected to break a given row into columns in map() and store them as BytesRefWritable, since this is what RCFile expects? Or can that be avoided and handled automatically by RCFileOutputFormat?
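For what it is worth, here is the standalone check I used to convince myself the splitting logic produces the columns I expect; the sample row and the class name ColumnSplitCheck are just for illustration:

import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;

public class ColumnSplitCheck {
    public static void main(String[] args) throws Exception {
        String row = "1 foo bar"; // sample whitespace-delimited row
        java.util.StringTokenizer tokenizer = new java.util.StringTokenizer(row);
        BytesRefArrayWritable columns = new BytesRefArrayWritable(10);
        int columnIndex = 0;
        while (tokenizer.hasMoreElements()) {
            byte[] bytes = tokenizer.nextToken().getBytes("UTF-8");
            columns.set(columnIndex++, new BytesRefWritable(bytes, 0, bytes.length));
        }
        // read each column back to verify the split
        for (int i = 0; i < columns.size(); i++) {
            BytesRefWritable ref = columns.get(i);
            System.out.println(new String(ref.getData(), ref.getStart(), ref.getLength(), "UTF-8"));
        }
    }
}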
II) Read specific columns of an RCFile and convert them to Text
Driver:
Job job = new Job(configuration, jobName);
job.setJarByClass(jarClass);
job.setNumReduceTasks(0);
job.setInputFormatClass(RCFileBaseInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
ColumnProjectionUtils.setReadColumnIDs(configuration, columnIds);
RCFileInputFormat.addInputPath(new JobConf(configuration), inputPath);
TextOutputFormat.setOutputPath(job, outputPath);
job.setMapperClass(RCFileReadMapper.class);
return job;
I had to use org.apache.hadoop.hive.ql.io.RCFileInputFormat because RCFileBaseInputFormat (elephant-bird) does not have addInputPath(). Will this work?
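One thing I am not sure about in the driver above: addInputPath() is invoked on a fresh JobConf copy, so I suspect the input path never makes it into the job's own configuration. If RCFileBaseInputFormat really is a new-API (mapreduce) InputFormat, perhaps the path should be set through the new-API FileInputFormat instead, something like:

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// set the input path on the job itself rather than on a throwaway JobConf copy
FileInputFormat.addInputPath(job, inputPath);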
Mapper:
I do not know what the input and output datatypes are here. I tried something like this:
public class RCFileReadMapper extends Mapper<Void, BytesRefArrayWritable, Void, Text> {

    @Override
    protected void map(Void key, BytesRefArrayWritable value, Context context)
            throws IOException, InterruptedException {
        System.out.println(key);
        System.out.println(value);
    }
}
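If the key turns out to be the row offset (a LongWritable, as with Hive's own RCFile reader), I would expect something along these lines to turn the projected columns back into tab-separated text. This is only a sketch based on that assumption:

import java.io.IOException;

import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class RCFileReadMapper extends Mapper<LongWritable, BytesRefArrayWritable, NullWritable, Text> {

    @Override
    protected void map(LongWritable key, BytesRefArrayWritable value, Context context)
            throws IOException, InterruptedException {
        StringBuilder row = new StringBuilder();
        for (int i = 0; i < value.size(); i++) {
            BytesRefWritable ref = value.get(i);
            if (i > 0) {
                row.append('\t');
            }
            // copy the raw column bytes back into a string
            row.append(new String(ref.getData(), ref.getStart(), ref.getLength(), "UTF-8"));
        }
        context.write(NullWritable.get(), new Text(row.toString()));
    }
}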
When running the program with the above driver and mapper, the mapper was never invoked and I got the following exception:
Exception in thread "main" java.lang.RuntimeException: java.lang.InstantiationException
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:115)
at org.apache.hadoop.mapred.JobClient.writeNewSplits(JobClient.java:1021)
at org.apache.hadoop.mapred.JobClient.writeSplits(JobClient.java:1041)
at org.apache.hadoop.mapred.JobClient.access$700(JobClient.java:179)
at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:959)
at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:912)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:416)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1136)
at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:912)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:500)
at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:530)
at com.benchmark.columnar.rcfile.direct.mr.BenchmarkRCFileMR.performRead(BenchmarkRCFileMR.java:139)
at com.benchmark.columnar.rcfile.direct.mr.BenchmarkRCFileMR.doReadBenchmark(BenchmarkRCFileMR.java:93)
at com.benchmark.columnar.rcfile.direct.mr.BenchmarkRCFileMR.main(BenchmarkRCFileMR.java:54)
Caused by: java.lang.InstantiationException
at sun.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:48)
at java.lang.reflect.Constructor.newInstance(Constructor.java:532)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:113)
... 14 more
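My only guess so far is that ReflectionUtils.newInstance() cannot instantiate the input format class itself, for example because it is abstract or lacks a no-argument constructor. The same wrapped InstantiationException can be reproduced with something like:

// fails with the same RuntimeException/InstantiationException if the class is abstract
org.apache.hadoop.util.ReflectionUtils.newInstance(RCFileBaseInputFormat.class, configuration);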
Regards,
Deepak