Hello, I am developing map-reduce with mongodb-hadoop-driver. And these are my codes.
===Mapper
public class MapperCarrier extends Mapper<LongWritable, Text,
BSONWritable, IntWritable> {
private final static IntWritable outputValue = new IntWritable(1);
private BasicDBObjectBuilder builder = null;
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
if(key.get() >0) {
String[] columns = value.toString().split(",");
if(columns != null && columns.length >0) {
if(!columns[15].equals("NA")) {
int depDelayTime = Integer.parseInt(columns[15]);
if(depDelayTime>0) {
BasicDBObject list = new BasicDBObject();
list.put("date",columns[0]+columns[1]);
builder = BasicDBObjectBuilder.start().add("carrierCode", columns[8]).add("delayTime", new BasicDBObject("departure",list));
context.write(new BSONWritable(builder.get()), outputValue);
}
}
....
....
=== Reducer
public class ReducerCarrier extends Reducer<BSONWritable, IntWritable,
NullWritable, MongoUpdateWritable> {
@Override
public void reduce(BSONWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{
String code = (String)key.getDoc().get("carrierCode");
BSONObject keyType = (BSONObject)key.getDoc().get("delayTime");
BasicDBObject query = new BasicDBObject("carrierCode", code );
int sum = 0;
if(keyType.keySet().contains("departure")) {
query.append("delayTime", "departure");
for(IntWritable value : values) {
sum += value.get();
}
BSONObject time = (BSONObject)keyType.get("departure");
String date = (String)time.get("date");
query.append("delayTime", new BasicDBObject("departure",new BasicDBObject("date", date)));
} else if(keyType.keySet().contains("arrival")) {
query.append("delayTime", "arrival");
for(IntWritable value : values) {
sum += value.get();
}
BSONObject time = (BSONObject)keyType.get("arrival");
String date = (String)time.get("date");
query.append("delayTime", new BasicDBObject("arrival",new BasicDBObject("date", date)));
}
BasicDBObject update = new BasicDBObject("$addToSet", new BasicDBObject("times",new Integer(sum)));
context.write(null, new MongoUpdateWritable(query,update,true,false));
// throws NullPointException !! }
}
=== mongo-deault.xml
<?xml version="1.0"?>
<configuration>
<property>
<name>mongo.job.verbose</name>
<value>true</value>
</property>
<property>
<name>mongo.job.background</name>
<value>false</value>
</property>
<property>
<name>mongo.input.uri</name>
<value><!--mongodb://
127.0.0.1:27017/airCarrier.timeDelay--></value>
</property>
<property>
<name>mongo.output.uri</name>
<value><!--mongodb://
127.0.0.1:27017/airCarrier.timeDelay--></value>
</property>
<property>
<name>mongo.input.split.read_shard_chunks</name>
<value>false</value>
</property>
<property>
<name>mongo.input.split.create_input_splits</name>
<value>true</value>
</property>
<property>
<name>mongo.input.query</name>
<!--<value>{"x": {"$regex": "^eliot", "$options": ""}}</value>-->
<value></value>
</property>
<property>
<name>mongo.input.key</name>
<value>location</value>
</property>
<property>
<name>mongo.input.fields</name>
<value></value>
</property>
<property>
<name>mongo.input.sort</name>
<value></value>
</property>
<property>
<name>mongo.input.limit</name>
<value>0</value> <!-- 0 == no limit -->
</property>
<property>
<name>mongo.input.skip</name>
<value>0</value> <!-- 0 == no skip -->
</property>
<property>
<name>mapred.input.dir</name>
<value>file:///home/user01/input_tmp</value>
</property>
<property>
<name>mapred.output.dir</name>
<value>file:///home/user01/output_tmp</value>
</property>
<property>
<name>mongo.job.mapper</name>
<value>com.aaa.mongo.MapperCarrier</value>
</property>
<property>
<name>mongo.job.reducer</name>
<value>com.aaa.mongo.ReducerCarrier</value>
</property>
<property>
<name>mongo.job.input.format</name>
<!-- I have no idea of this property> <value>org.apache.hadoop.mapreduce.lib.input.TextInputFormat</value>
<!--<value>com.mongodb.hadoop.MongoInputFormat</value>-->
</property>
<property>
<name>mongo.job.output.format</name>
<!-- I have no idea of this property> <!--<value>org.apache.hadoop.mapreduce.lib.output.TextOutputFormat</value>-->
<value>com.mongodb.hadoop.MongoOutputFormat</value>
</property>
<property>
<name>mongo.job.mapper.output.key</name>
<!-- I have no idea of this property> <value>com.mongodb.hadoop.io.BSONWritable</value>
</property>
<property>
<name>mongo.job.mapper.output.value</name>
<!-- I have no idea of this property> <value>org.apache.hadoop.io.IntWritable</value>
</property>
<property>
<name>mongo.job.output.key</name>
<!-- I have no idea of this property> <value>com.mongodb.hadoop.io.BSONWritable</value>
</property>
<property>
<name>mongo.job.output.value</name>
<!-- I have no idea of this property> <value>org.apache.hadoop.io.IntWritable</value>
</property>
<property>
<name>mongo.job.combiner</name>
<value>com.aaa.mongo.ReducerCarrier</value>
</property>
<property>
<name>mongo.job.partitioner</name>
<value></value>
</property>
<property>
<name>mongo.job.sort_comparator</name>
<value></value>
</property>
<property>
<name>mongo.input.split_size</name>
<value>8</value>
</property>
</configuration>
In reducer context.write(null, new MongoUpdateWritable(query,update,true,false)); throws NullPointException !!
Kindly inform me of your advice. Thanks!!