I have a scenario where I would like to POS-tag documents, and I need to run the annotation on Hadoop. I started with Twitter data and will generalize it later. The Mapper gets lines (which correspond to tweets in JSON format) from the input file, and once 100 tweets have accumulated for a language, a "process" method is invoked which runs a tokenizer and a POS tagger over the collection. This MapReduce code runs correctly when NOT run on Hadoop, but when I run it on Hadoop, I get an Exception at the line "tc.annotator.process(jCas)".
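For context, TweetConcat is a small helper that buffers tweets per language. In simplified form (engine creation omitted, and field types reconstructed from how they are used in the Mapper below) it is roughly:

    import org.apache.uima.analysis_engine.AnalysisEngine;

    // Simplified sketch of TweetConcat; the actual engine setup
    // in the constructor is omitted here.
    public class TweetConcat {
        String language;                         // "en", "de", or "ar"
        String document_id;                      // id of the first tweet in the batch
        StringBuilder sb = new StringBuilder();  // concatenated tweet texts
        int tweetcount = 0;                      // tweets buffered so far
        AnalysisEngine tokenizer;                // segmenter
        AnalysisEngine annotator;                // POS tagger
        AnalysisEngine serializer;               // writes out the annotated CAS

        public TweetConcat(String language) {
            this.language = language;
            // tokenizer, annotator and serializer are instantiated here (omitted)
        }
    }

The Mapper itself: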
    import java.io.IOException;
    import java.util.HashMap;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.uima.UIMAException;
    import org.apache.uima.fit.factory.JCasFactory;
    import org.apache.uima.fit.pipeline.SimplePipeline;
    import org.apache.uima.jcas.JCas;
    import org.json.simple.JSONObject;
    import org.json.simple.parser.JSONParser;
    import org.json.simple.parser.ParseException;

    import de.tudarmstadt.ukp.dkpro.core.api.metadata.type.DocumentMetaData;

    public class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {

        HashMap<String, TweetConcat> map = new HashMap<String, TweetConcat>();
        JSONParser parser = new JSONParser();
        TweetConcat tc;

        public static void process(TweetConcat tc) throws UIMAException {
            //AnalysisEngineDescription cc = createEngineDescription(CasDumpWriter.class,
            //        CasDumpWriter.PARAM_OUTPUT_FILE, "/home/sabanerjee/Twitter/annotations2/"
            //        + tc.language + "/" + tc.document_id + "output.txt");
            JCas jCas = JCasFactory.createJCas();
            jCas.setDocumentLanguage(tc.language);
            jCas.setDocumentText(tc.sb.toString());
            DocumentMetaData metaData = new DocumentMetaData(jCas);
            metaData.setDocumentId(tc.document_id);
            metaData.addToIndexes();
            SimplePipeline.runPipeline(jCas, tc.tokenizer);
            tc.annotator.process(jCas); // <-- the Exception is thrown here on Hadoop
            //SimplePipeline.runPipeline(jCas, cc);
            SimplePipeline.runPipeline(jCas, tc.serializer);
        }

        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output,
                Reporter reporter) throws IOException {
            String line = value.toString();
            try {
                JSONObject tweet = (JSONObject) parser.parse(line);
                String lang = (String) tweet.get("lang");
                // skip records without a language field (e.g. delete notices)
                if (lang != null && (lang.equals("en") || lang.equals("de") || lang.equals("ar"))) {
                    tc = map.get(lang); // one buffer per language
                    if (tc == null) {
                        tc = new TweetConcat(lang);
                        tc.document_id = String.valueOf(tweet.get("id"));
                        map.put(lang, tc);
                    } else if (tc.tweetcount == 0) {
                        // buffer was just flushed; the first new tweet names the next document
                        tc.document_id = String.valueOf(tweet.get("id"));
                    }
                    tc.sb.append(tweet.get("text") + "\n");
                    tc.tweetcount++;
                    if (tc.tweetcount >= 100) { // flush once 100 tweets are accumulated
                        System.out.println("Started: " + tc.language + " " + tc.document_id);
                        try {
                            process(tc);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        tc.tweetcount = 0;
                        tc.sb.setLength(0);
                        System.out.println("Done: " + tc.language + " " + tc.document_id);
                    }
                }
            } catch (ParseException e) {
                e.printStackTrace();
            }
            // note: tweets still buffered when the input split ends are never processed
        }
    }
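For completeness, I submit this as a plain map-only job with the old mapred API, roughly along these lines (sketch; the driver class name "TweetTagJob" and the input/output paths are placeholders):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextInputFormat;
    import org.apache.hadoop.mapred.TextOutputFormat;

    public class TweetTagJob {
        public static void main(String[] args) throws Exception {
            JobConf conf = new JobConf(TweetTagJob.class);
            conf.setJobName("tweet-pos-tagging");
            conf.setMapperClass(Map.class);
            conf.setNumReduceTasks(0);                  // map-only: tagging happens in the mapper
            conf.setOutputKeyClass(Text.class);
            conf.setOutputValueClass(IntWritable.class);
            conf.setInputFormat(TextInputFormat.class); // one tweet (JSON) per line
            conf.setOutputFormat(TextOutputFormat.class);
            FileInputFormat.setInputPaths(conf, new Path(args[0]));
            FileOutputFormat.setOutputPath(conf, new Path(args[1]));
            JobClient.runJob(conf);
        }
    }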
Thanks and Regards,
Samudra
--
Samudra Banerjee
First year graduate student,
Department of Computer Science,
State University of New York at Stony Brook
NY 11794-3393