I am getting the following ClassCastException with a very simple TupleMapper that reads Avro files:
java.lang.ClassCastException: com.datasalt.pangool.io.DatumWrapper cannot be cast to org.apache.avro.mapred.AvroWrapper
at com.datasalt.pangool.tuplemr.avro.AvroOutputFormat$1.write(AvroOutputFormat.java:141)
at org.apache.hadoop.mapred.MapTask$NewDirectOutputCollector.write(MapTask.java:531)
at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
at com.datasalt.pangool.tuplemr.TupleMapper$Collector.write(TupleMapper.java:133)
at org.mitre.ttv.mapred.IdentityMapper.map(IdentityMapper.java:63)
at org.mitre.ttv.mapred.IdentityMapper.map(IdentityMapper.java:13)
at com.datasalt.pangool.tuplemr.TupleMapper.map(TupleMapper.java:104)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at com.datasalt.pangool.tuplemr.mapred.lib.input.DelegatingMapper.run(DelegatingMapper.java:50)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:648)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:322)
at org.apache.hadoop.mapred.Child$4.run(Child.java:266)
The TupleMapper class is included below. Very similar TupleMapper code has worked for many months so I am not sure what I am doing wrong.
Here is the list of Pangool and Avro jar files I am pulling in from Maven:
292816 Wed May 15 12:52:54 EDT 2013 lib/avro-1.6.3.jar
166555 Wed May 15 12:52:54 EDT 2013 lib/avro-mapred-1.7.4.jar
187840 Wed May 15 12:52:54 EDT 2013 lib/avro-ipc-1.7.4.jar
307446 Wed May 15 12:52:54 EDT 2013 lib/pangool-core-0.60.3.jar
Any ideas? Thanks.
Curt-
package org.mitre.ttv.mapred;
import java.io.IOException;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.hadoop.io.NullWritable;
import com.datasalt.pangool.io.Schema;
import com.datasalt.pangool.io.Tuple;
import com.datasalt.pangool.tuplemr.TupleMapper;
@SuppressWarnings("serial")
public class IdentityMapper extends TupleMapper<AvroWrapper<Record>, NullWritable> {

    /** Only records whose {@code track_id} equals this value are emitted; all others are dropped. */
    private static final String TRACK_ID_FILTER = "20121127028628";

    /**
     * (tuple field, Avro record field) pairs whose values are converted with
     * {@code TrackUtil.utf8ToString} before being copied into the output tuple.
     */
    private static final String[][] STRING_FIELDS = {
        {"track_id", "track_id"},
        {"matching_track_id", "matching_track_id"},
        {"acid", "acid"},
        {"deptAirport", "dept_apt"},
        {"arrAirport", "arr_apt"},
        {"activeSensors", "active_sensors"},
    };

    /** (tuple field, Avro record field) pairs copied verbatim into the output tuple. */
    private static final String[][] RAW_FIELDS = {
        {"utc_date_time", "utc_date_time"},
        {"latitude", "pt_lat"},
        {"longitude", "pt_lon"},
        {"altitude", "pt_alt"},
        {"eq_time_dist", "match_eq_time_dist"},
        {"eq_time_time", "match_eq_time_time"},
        {"eq_time_alt", "match_eq_time_alt"},
        {"min_dist_dist", "match_min_dist"},
        {"min_dist_time", "match_min_dist_time"},
        {"min_dist_alt", "match_min_dist_alt"},
        {"xtd_dist", "xtd_dist"},
        {"xtd_time", "xtd_time"},
        {"xtd_alt", "xtd_alt"},
        {"adsb_iapNacp", "adsb_nacp"},
        {"adsb_ltiLinkVer", "adsb_link"},
        {"adsb_paAltitude", "adsb_paAltitude"},
        {"adsb_gaGeoAlt", "adsb_gaGeoAlt"},
        {"TT_synthetic_altitude", "TT_synthetic_altitude"},
        {"TT_modeCalt", "TT_modeCalt"},
        {"TT_geometricAlt", "TT_geometricAlt"},
    };

    // Reused across map() calls to avoid a per-record allocation.
    private final Tuple tuple;
    // NOTE(review): not referenced within this class; kept so the constructor
    // signature stays backward-compatible — confirm whether it is still needed.
    private final String sourceID;

    /**
     * @param source_id identifier of the input source (currently unused by this mapper)
     * @param schema    Pangool schema the output tuple is built against; it must declare
     *                  every field named in {@link #STRING_FIELDS} and {@link #RAW_FIELDS}
     */
    public IdentityMapper(String source_id, Schema schema) {
        sourceID = source_id;
        tuple = new Tuple(schema);
    }

    /**
     * Copies one Avro record into a Pangool tuple and emits it, but only for the
     * single track selected by {@link #TRACK_ID_FILTER}. Also bumps a Hadoop
     * counter per point_type for the selected track.
     */
    @Override
    public void map(AvroWrapper<Record> key, NullWritable value, TupleMRContext context, Collector collector)
            throws IOException, InterruptedException {
        Record record = key.datum();

        // Drop everything except the one track under inspection.
        if (!TrackUtil.utf8ToString(record.get("track_id")).equals(TRACK_ID_FILTER)) {
            return;
        }

        context.getHadoopContext()
                .getCounter("Map PointType", record.get("point_type").toString())
                .increment(1);

        tuple.set("point_type", TrackUtil.utf8ToString(record.get("point_type")));
        for (String[] pair : STRING_FIELDS) {
            tuple.set(pair[0], TrackUtil.utf8ToString(record.get(pair[1])));
        }
        for (String[] pair : RAW_FIELDS) {
            tuple.set(pair[0], record.get(pair[1]));
        }

        collector.write(tuple);
    }
}