package com.pavan.map;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class InvertedIndex { //main class
public static class MyMapper extends Mapper<LongWritable, Text, Text, Text>
{ //map class
Text location = new Text();
//Text mvalue = new Text();
public void map(LongWritable key, Text value, Context context )
{
FileSplit filesplit = (FileSplit) context.getInputSplit();
String filename = filesplit.getPath().getName();
location.set(filename);
// deserialize it
String line= value.toString();
//tokenize my string
StringTokenizer str = new StringTokenizer(line.toUpperCase());
while(str.hasMoreTokens())
{
value.set(str.nextToken());
try {
context.write(value, location);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
public static class MyReducer extends Reducer<Text, Text, Text, Text>
{
public void reduce(Text keys, Iterable<Text> values, Context context)
{
//boolean firstvalue = true;
StringBuffer valueBuilder = new StringBuffer();
//while(values.iterator() != null)
{for (Text val : values) {
valueBuilder.append(val);
valueBuilder.append(",");
}
//write the key and the adjusted value (removing the last comma)
try {
context.write(keys, new Text(valueBuilder.substring(0, valueBuilder.length() - 1)));
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
valueBuilder.setLength(0);
}
}
}
/**
 * Configures and runs the inverted-index job, then exits the JVM.
 *
 * <p>Exit status is 0 when the job completes successfully, 1 when it does not,
 * and 2 when the required arguments are missing.
 *
 * @param args {@code args[0]} is the input path, {@code args[1]} is the output path
 * @throws IOException            if the job cannot be set up or submitted
 * @throws InterruptedException   if the wait for job completion is interrupted
 * @throws ClassNotFoundException if a class needed by the job cannot be found
 */
public static void main(String [] args) throws IOException, InterruptedException, ClassNotFoundException
{
    // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
    if (args.length < 2)
    {
        System.err.println("Usage: InvertedIndex <input path> <output path>");
        System.exit(2);
    }

    Configuration conf = new Configuration();
    // NOTE(review): this constructor is reportedly deprecated in newer Hadoop releases
    // in favour of Job.getInstance(conf); kept because the target version is not
    // visible here. TODO confirm and switch.
    Job job = new Job(conf);
    job.setJarByClass(InvertedIndex.class);

    job.setMapperClass(MyMapper.class);
    job.setReducerClass(MyReducer.class);

    // Intermediate (map output) types. The value class was previously never set
    // because the key setter was called twice.
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);

    // Final (reduce output) types.
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    System.exit(job.waitForCompletion(true) ? 0:1);
}
}