Need Help Writing A Combiner


Raghu

Apr 18, 2014, 4:50:59 PM
to scoobi...@googlegroups.com
Hi,

I have code that does a group by on a DList[AvroObject]. The keys are some of the fields of this object. In the reducer I sum up the remaining fields and count the records. How do I write a combiner for this? Can anyone please help me with it? Below is the grouping code.

// rows: the input DList[AvroObject] (name used here for illustration)
val summary = rows groupBy { row => (row.getField1, row.getField2, row.getField3) } map {
  case (key, values) =>
    var sum1 = 0
    var sum2 = 0
    var count = 0

    // Sum field4 and field5 and count the records for this key
    values foreach { row =>
      sum1 = sum1 + row.getField4
      sum2 = sum2 + row.getField5
      count = count + 1
    }
    // And then I create a summary object after this.
}
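For reference, the reducer logic above can be checked locally with plain Scala collections. This is a minimal sketch, not Scoobi code: `Row` is a hypothetical case class standing in for `AvroObject`, and `field4`/`field5` are assumed to be `Long`. Writing the per-key work as one fold makes it visible that every quantity, including the count, is a plain sum, which is what allows the work to be split into partial results.

```scala
// Hypothetical stand-in for AvroObject: three key fields and two numeric fields.
case class Row(field1: String, field2: String, field3: String, field4: Long, field5: Long)

// Same logic as the reducer above: per key, sum field4 and field5 and count the rows.
def summarize(rows: Seq[Row]): Map[(String, String, String), (Long, Long, Int)] =
  rows
    .groupBy(r => (r.field1, r.field2, r.field3))
    .map { case (key, values) =>
      val (sum1, sum2, count) = values.foldLeft((0L, 0L, 0)) {
        case ((s1, s2, c), r) => (s1 + r.field4, s2 + r.field5, c + 1)
      }
      (key, (sum1, sum2, count))
    }

val sample = Seq(
  Row("a", "x", "1", 10, 1),
  Row("a", "x", "1", 20, 2),
  Row("b", "y", "2", 5, 3)
)

val totals = summarize(sample)
```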

Thanks,
Raghu

Raghu

Apr 21, 2014, 2:20:16 PM
to scoobi...@googlegroups.com
Hi,

This is what I have written, but I don't see the reduction being used when I run my code: the mappers' input and output record counts are the same. Please let me know what's wrong. Thanks in advance.

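// Sums field4 and field5 of two records into row1 and returns it (mutates row1)
val red: Reduction[AvroObject] = Reduction((row1: AvroObject, row2: AvroObject) => {
  val field4Sum = row1.getField4 + row2.getField4
  val field5Sum = row1.getField5 + row2.getField5
  //val count = 2
  row1.setField4(field4Sum)
  row1.setField5(field5Sum)

  row1
})

// rows: the input DList[AvroObject]. Dot syntax keeps .combine applied to the
// result of groupBy rather than to the key function.
val summary = rows.groupBy(row => (row.getField1, row.getField2, row.getField3)).combine(red)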
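A combiner only works if the binary function gives the same answer when it is applied to partial results in any grouping. The plain-Scala sketch below simulates that data flow for the field4/field5 sums; it does not use Scoobi. `Row` is a hypothetical stand-in for `AvroObject`, `combineLocally` is a hypothetical helper that mimics one combine pass (one output record per key), and the step returns a copy instead of mutating `row1`.

```scala
// Hypothetical stand-in for AvroObject; the real class is Avro-generated.
case class Row(field1: String, field2: String, field3: String, field4: Long, field5: Long)

type Key = (String, String, String)
def keyOf(r: Row): Key = (r.field1, r.field2, r.field3)

// Same binary step as the Reduction above, but it returns a copy instead of
// mutating row1. Addition is associative and commutative, so the result does
// not depend on how the rows are grouped.
def reduceRows(row1: Row, row2: Row): Row =
  row1.copy(field4 = row1.field4 + row2.field4, field5 = row1.field5 + row2.field5)

// Hypothetical helper mimicking one combine pass: one output record per key.
def combineLocally(rows: Seq[Row]): Seq[Row] =
  rows.groupBy(keyOf).values.map(group => group.reduce((a, b) => reduceRows(a, b))).toSeq

val mapper1 = Seq(Row("a", "x", "1", 10, 1), Row("a", "x", "1", 20, 2), Row("b", "y", "2", 5, 3))
val mapper2 = Seq(Row("a", "x", "1", 7, 4), Row("b", "y", "2", 1, 1))

// Each simulated mapper combines its own output first...
val combined = combineLocally(mapper1) ++ combineLocally(mapper2)
// ...then the "reducer" applies the same step to the partial results.
val reduced: Map[Key, Row] = combineLocally(combined).map(r => keyOf(r) -> r).toMap
```

In this simulation the five input rows become four partial records after the per-mapper pass and two final records after the reducer pass, and the totals match a single pass over all rows.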

Thanks,
Raghu

Raghu

May 13, 2014, 2:46:12 PM
to scoobi...@googlegroups.com
Here is the solution. For counts, add a count column to the AvroObject and set it to 1 on every record before applying the reduction; the reduction then sums it like the other fields.

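// Sums field4, field5 and the count column of two records into row1 and returns it.
// countField must be set to 1 on every input record before this is applied.
def red: Reduction[AvroObject] = Reduction((row1: AvroObject, row2: AvroObject) => {
  val field4Sum = row1.getField4 + row2.getField4
  val field5Sum = row1.getField5 + row2.getField5
  val count = row1.getCountField + row2.getCountField
  row1.setField4(field4Sum)
  row1.setField5(field5Sum)
  row1.setCountField(count)

  row1
})

// rows: the input DList[AvroObject], with countField already set to 1
val summary = rows.groupBy(row => (row.getField1, row.getField2, row.getField3)).combine(red)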
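Why the count needs its own column: the binary step may receive rows that have already been combined, so it cannot know how many original records each one represents (a fixed `count = 2` would be wrong), and counting values after partial combining sees one value per partial result rather than one per original row. The plain-Scala sketch below simulates this for a single key. `Row` and its fields are hypothetical stand-ins for `AvroObject` and its getters/setters, and the split into two "mappers" is arbitrary.

```scala
// Hypothetical stand-in for AvroObject, reduced to one key field plus the
// summed fields and the extra count column.
case class Row(key: String, field4: Long, field5: Long, countField: Long)

// The final reduction from the post, written without mutation.
def reduceRows(row1: Row, row2: Row): Row =
  Row(
    row1.key,
    row1.field4 + row2.field4,
    row1.field5 + row2.field5,
    row1.countField + row2.countField
  )

// Every input record gets countField = 1 before the reduction is applied.
val raw = Seq((10L, 1L), (20L, 2L), (7L, 4L), (5L, 3L), (1L, 1L))
val rows = raw.map { case (f4, f5) => Row("a", f4, f5, countField = 1) }

// Two simulated mappers combine their own rows; the reducer combines the partials.
val (part1, part2) = rows.splitAt(3)
val partials = Seq(part1, part2).map(_.reduce((a, b) => reduceRows(a, b)))
val total = partials.reduce((a, b) => reduceRows(a, b))

// Counting values at the reducer only sees the partial results.
val naiveCount = partials.size
```

Here `total.countField` comes out as 5, matching the five original rows, while `naiveCount` is only 2.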

Thanks,
Raghu
