mongo-default.xml for com.mongodb.hadoop.io.MongoUpdateWritable class?


황세규

Nov 24, 2013, 9:46:27 PM
to mongod...@googlegroups.com
Hello, I am developing a MapReduce job with the mongo-hadoop driver. Here is my code.

===Mapper
public class MapperCarrier extends Mapper<LongWritable, Text, BSONWritable, IntWritable> {
   
    private final static IntWritable outputValue = new IntWritable(1);  
    private BasicDBObjectBuilder builder = null;
  
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
      
        if(key.get() >0) {
             String[] columns = value.toString().split(",");
      
            if(columns != null && columns.length >0) {
                  if(!columns[15].equals("NA")) {
                    int depDelayTime = Integer.parseInt(columns[15]);
              
                    if(depDelayTime>0) {
                        BasicDBObject list = new BasicDBObject();
                        list.put("date",columns[0]+columns[1]);
                       
                        builder = BasicDBObjectBuilder.start().add("carrierCode", columns[8]).add("delayTime", new BasicDBObject("departure",list));
                                               
                        context.write(new BSONWritable(builder.get()), outputValue);
                    }
                }
....
....

=== Reducer
public class ReducerCarrier extends Reducer<BSONWritable, IntWritable, NullWritable, MongoUpdateWritable> {
   
    @Override
    public void reduce(BSONWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{

        String code = (String)key.getDoc().get("carrierCode");
        BSONObject keyType = (BSONObject)key.getDoc().get("delayTime");
        BasicDBObject query = new BasicDBObject("carrierCode", code );
               
        int sum = 0;
           
        if(keyType.keySet().contains("departure")) {
           
            query.append("delayTime", "departure");
           
            for(IntWritable value : values) {
                sum += value.get();
            }
            BSONObject time = (BSONObject)keyType.get("departure");
            String date = (String)time.get("date");
            query.append("delayTime", new BasicDBObject("departure",new BasicDBObject("date", date)));
        } else if(keyType.keySet().contains("arrival")) {
           
            query.append("delayTime", "arrival");
           
            for(IntWritable value : values) {
                sum += value.get();
            }   
            BSONObject time = (BSONObject)keyType.get("arrival");
            String date = (String)time.get("date");
            query.append("delayTime", new BasicDBObject("arrival",new BasicDBObject("date", date)));
        }
       
        BasicDBObject update = new BasicDBObject("$addToSet", new BasicDBObject("times",new Integer(sum)));
       
        context.write(null, new MongoUpdateWritable(query,update,true,false)); // throws NullPointerException!!
    }
}

=== mongo-default.xml
<?xml version="1.0"?>
<configuration>
  <property>
    <name>mongo.job.verbose</name>
    <value>true</value>
  </property>
  <property>
    <name>mongo.job.background</name>
    <value>false</value>
  </property>
  <property>
    <name>mongo.input.uri</name>
    <value><!--mongodb://127.0.0.1:27017/airCarrier.timeDelay--></value>
  </property>
  <property>
    <name>mongo.output.uri</name>
    <value><!--mongodb://127.0.0.1:27017/airCarrier.timeDelay--></value>
  </property>
  <property>
    <name>mongo.input.split.read_shard_chunks</name>
    <value>false</value>
  </property>
  <property>
    <name>mongo.input.split.create_input_splits</name>
    <value>true</value>
  </property>
  <property>
    <name>mongo.input.query</name>
    <!--<value>{"x": {"$regex": "^eliot", "$options": ""}}</value>-->
    <value></value>
  </property>
  <property>
      <name>mongo.input.key</name>
      <value>location</value>
  </property>
  <property>
    <name>mongo.input.fields</name>
    <value></value>
  </property>
  <property>
    <name>mongo.input.sort</name>
    <value></value>
  </property>
  <property>
    <name>mongo.input.limit</name>
    <value>0</value> <!-- 0 == no limit -->
  </property>
  <property>
    <name>mongo.input.skip</name>
    <value>0</value> <!-- 0 == no skip -->
  </property>
  <property>
         <name>mapred.input.dir</name>
         <value>file:///home/user01/input_tmp</value>
  </property>
  <property>
         <name>mapred.output.dir</name>
         <value>file:///home/user01/output_tmp</value>
  </property>
  <property>
    <name>mongo.job.mapper</name>
    <value>com.aaa.mongo.MapperCarrier</value>
  </property>
  <property>
    <name>mongo.job.reducer</name>
    <value>com.aaa.mongo.ReducerCarrier</value>
  </property>
  <property>
    <name>mongo.job.input.format</name> <!-- I have no idea of this property -->
    <value>org.apache.hadoop.mapreduce.lib.input.TextInputFormat</value>
    <!--<value>com.mongodb.hadoop.MongoInputFormat</value>-->
  </property>
  <property>
    <name>mongo.job.output.format</name> <!-- I have no idea of this property -->
    <!--<value>org.apache.hadoop.mapreduce.lib.output.TextOutputFormat</value>-->
    <value>com.mongodb.hadoop.MongoOutputFormat</value>
  </property>
    <property>
    <name>mongo.job.mapper.output.key</name> <!-- I have no idea of this property -->
    <value>com.mongodb.hadoop.io.BSONWritable</value>
  </property>
  <property>
    <name>mongo.job.mapper.output.value</name> <!-- I have no idea of this property -->
    <value>org.apache.hadoop.io.IntWritable</value>
  </property>
  <property>
    <name>mongo.job.output.key</name> <!-- I have no idea of this property -->
    <value>com.mongodb.hadoop.io.BSONWritable</value>
  </property>
  <property>
    <name>mongo.job.output.value</name> <!-- I have no idea of this property -->
    <value>org.apache.hadoop.io.IntWritable</value>
  </property>
  <property>
    <name>mongo.job.combiner</name>
    <value>com.aaa.mongo.ReducerCarrier</value>
  </property>
  <property>
    <name>mongo.job.partitioner</name>
    <value></value>
  </property>
  <property>
    <name>mongo.job.sort_comparator</name>
    <value></value>
  </property>
  <property>
    <name>mongo.input.split_size</name>
    <value>8</value>
  </property>

</configuration>

In the reducer, context.write(null, new MongoUpdateWritable(query, update, true, false)); throws a NullPointerException!

Any advice would be appreciated. Thanks!

mpobrien

Nov 25, 2013, 8:13:12 PM
to mongod...@googlegroups.com
Hi,
Can you please attach the full log output?
In particular, the stack trace of the null pointer exception.

황세규

Nov 25, 2013, 8:40:47 PM
to mongod...@googlegroups.com
Thanks for your quick response! I changed the reducer code as shown below,


public class ReducerCarrier extends Reducer<BSONWritable, IntWritable, NullWritable, MongoUpdateWritable> {
   
    @Override
    public void reduce(BSONWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{

        String code = (String)key.getDoc().get("carrierCode");
        BSONObject keyType = (BSONObject)key.getDoc().get("delayTime");
       
        BasicDBObject query = new BasicDBObject("carrierCode", code );
               
        int sum = 0;
           
        if(keyType.keySet().contains("departure")) {
           
            query.append("delayTime", "departure");
           
            for(IntWritable value : values) {
                sum += value.get();
            }
            BSONObject time = (BSONObject)keyType.get("departure");
            String date = (String)time.get("date");
            query.append("delayTime", new BasicDBObject("departure",new BasicDBObject("date", date)));
        } else if(keyType.keySet().contains("arrival")) {
           
            query.append("delayTime", "arrival");
           
            for(IntWritable value : values) {
                sum += value.get();
            }   
            BSONObject time = (BSONObject)keyType.get("arrival");
            String date = (String)time.get("date");
            query.append("delayTime", new BasicDBObject("arrival",new BasicDBObject("date", date)));
        }
       
        BasicDBObject update = new BasicDBObject("$addToSet", new BasicDBObject("times",new Integer(sum)));
       
        NullWritable nullKey = NullWritable.get();
        MongoUpdateWritable muw = new MongoUpdateWritable(query,update,true,false);
               
        context.write(nullKey, muw); // This line throws the exception!
    }
}

This is the full log, including the exception:

13/11/26 10:34:27 WARN mapred.LocalJobRunner: job_local_0001
java.io.IOException: Spill failed
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1029)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:691)
    at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
    at com.aaa.mongo.MapperCarrier.map(MapperCarrier.java:52)
    at com.aaa.mongo.MapperCarrier.map(MapperCarrier.java:1)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:214)
Caused by: java.io.IOException: wrong key class: class org.apache.hadoop.io.NullWritable is not class com.mongodb.hadoop.io.BSONWritable
    at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:164)
    at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:1168)
    at org.apache.hadoop.mapred.Task$NewCombinerRunner$OutputConverter.write(Task.java:1492)
    at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
    at com.aaa.mongo.ReducerCarrier.reduce(ReducerCarrier.java:60)
    at com.aaa.mongo.ReducerCarrier.reduce(ReducerCarrier.java:1)
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
    at org.apache.hadoop.mapred.Task$NewCombinerRunner.combine(Task.java:1513)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1436)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.access$1800(MapTask.java:853)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer$SpillThread.run(MapTask.java:1344)
13/11/26 10:34:27 INFO mapred.JobClient: Job complete: job_local_0001


황세규

Nov 26, 2013, 7:58:40 PM
to mongod...@googlegroups.com
I changed the reducer output key and value to the same types as the reducer input key and value, like below,

public class ReducerCarrier extends Reducer<BSONWritable, IntWritable,BSONWritable, IntWritable> {
...

Then my MapReduce code ran without any exceptions, but the output values were definitely not what I want.
How can I use the MongoUpdateWritable class as the reducer output value? Or should I file a JIRA ticket about this issue?

I need your help. Thanks

Siyuan Zhou

Dec 30, 2013, 6:58:40 PM
to mongod...@googlegroups.com
In the last error message you posted, Hadoop complains about the mismatch between the type of the output key your reducer writes and the type you declared in the configuration.

Your code looks correct. The reducer writes (NullWritable, MongoUpdateWritable) pairs, which is what you want. But in the XML you posted before, (BSONWritable, IntWritable) is expected.

  <property>
    <name>mongo.job.output.key</name> <!-- I have no idea of this property -->
    <value>com.mongodb.hadoop.io.BSONWritable</value>
  </property>
  <property>
    <name>mongo.job.output.value</name> <!-- I have no idea of this property -->
    <value>org.apache.hadoop.io.IntWritable</value>
  </property>

So change the values in the XML configuration to NullWritable and MongoUpdateWritable to make the types consistent, like this:

  <property>
    <name>mongo.job.output.key</name>
    <value>org.apache.hadoop.io.NullWritable</value>
  </property>
  <property>
    <name>mongo.job.output.value</name>
    <value>com.mongodb.hadoop.io.MongoUpdateWritable</value>
  </property>
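
For reference, here is a minimal sketch of the same setup done programmatically instead of through mongo-default.xml. The driver class name (CarrierDelayJob) is hypothetical, and it reuses the input path and the commented-out mongo.output.uri from your configuration; setMapOutputKeyClass/setMapOutputValueClass correspond to mongo.job.mapper.output.key/value, and setOutputKeyClass/setOutputValueClass to mongo.job.output.key/value:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import com.mongodb.hadoop.MongoOutputFormat;
import com.mongodb.hadoop.io.BSONWritable;
import com.mongodb.hadoop.io.MongoUpdateWritable;

public class CarrierDelayJob {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Same as mongo.output.uri in mongo-default.xml (currently commented out there).
        conf.set("mongo.output.uri", "mongodb://127.0.0.1:27017/airCarrier.timeDelay");

        Job job = new Job(conf, "carrier delay");
        job.setJarByClass(CarrierDelayJob.class);

        job.setMapperClass(MapperCarrier.class);
        job.setReducerClass(ReducerCarrier.class);
        // No combiner here: a combiner must emit the map output types
        // (BSONWritable, IntWritable), so a reducer that now emits
        // (NullWritable, MongoUpdateWritable) cannot double as one.

        // Intermediate (map output) types -- mongo.job.mapper.output.key/value.
        job.setMapOutputKeyClass(BSONWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Final (reduce output) types -- mongo.job.output.key/value.
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(MongoUpdateWritable.class);

        // Read the CSV from the local path, write updates back to MongoDB.
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(MongoOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path("file:///home/user01/input_tmp"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

The same point applies to the mongo.job.combiner property in your XML: the stack trace fails inside the combiner, and a combiner must produce (BSONWritable, IntWritable) pairs, so ReducerCarrier cannot be used there once it emits MongoUpdateWritable.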

I believe you have read the Hadoop MapReduce tutorial, which explains intermediate key-value pairs and output key-value pairs very well.

Thanks,
Siyuan