Incremental Sum of values in Scalding Job


Deepika Khera

unread,
Feb 27, 2015, 8:36:55 PM2/27/15
to cascadi...@googlegroups.com
Hi,

I am writing a Scalding job. I have data of the form:


Category1    Category2    Account    Amount
c1           c2           a1         100
c1           c2           a2         200
c1           c2           a3         300
c1           c3           a1         150
c1           c3           a5         250


I want to group by (Category1, Category2) and then, for each group, incrementally sum the amount to get a cumulative amount (and similarly a cumulative count).


Category1    Category2    Account    Amount    Cumulative_Amount
c1           c2           a1         100       100
c1           c2           a2         200       300   (100+200)
c1           c2           a3         300       600   (100+200+300)

c1           c3           a1         150       150
c1           c3           a5         250       400   (150+250)

Please let me know the best way to solve this problem.


-----------------------------------------------------------------------------------------

I have tried different ways to achieve this, but can't come up with a complete solution.


Option 1: Using map after groupBy - This approach probably will not work, because the "grouped.map" is not guaranteed to run on a single machine, even for values with the same key. Is that right?

def groupByCategory1AndCategory2(pipe: TypedPipe[Tuple11]): TypedPipe[Tuple13] = {
    val grouped = pipe.groupBy { data: Tuple11 => (data._6, data._11) }
    // Mutable running state; this assumes the map sees each key's values sequentially on one machine
    var sum: Double = 0
    var counter: Double = 0
    var oldKey: String = ""
    var newKey: String = ""
    val agg: TypedPipe[((String, String), (Tuple11, Aggregates))] = grouped.map { t: ((String, String), Tuple11) =>
      if (sum == 0)
        oldKey = t._2._6 + ":" + t._2._11
      newKey = t._2._6 + ":" + t._2._11
      if (oldKey != newKey) {
        sum = 0
        counter = 0
        oldKey = newKey
      }
      sum = sum + t._2._4
      counter = counter + 1
      ((t._2._6, t._2._11), (t._2, Aggregates(sum, counter)))
    }
    val f = agg.values.map { case (data, aggr) =>
      (data._1, data._11, data._6, data._2, data._11, data._10, data._9, data._3, data._5, data._4,
        aggr.sum, aggr.count, data._4 / aggr.sum)
    }
    f
  }
 

Option 2: Using mapGroup after groupBy - In this case the iterator comes up empty.

def groupByCategory1AndCategory2(pipe: TypedPipe[Tuple11]): TypedPipe[Tuple11] = {
    val grouped = pipe.groupBy { data: Tuple11 => (data._6, data._11) }

    val agg = grouped.mapGroup { case (k, iter) =>
      print("Starting it " + iter.length + " hasnext: " + iter.hasNext + " empty: " + iter.isEmpty)


Option 3: In reduce

If I were writing a MapReduce job, I would just write out the incremental sum while iterating over the values in the reducer. I am not sure how I can do that in Scalding. Here is similar code using reduce; I don't know how to break out of the reduce loop and write out the temporary sum.

def groupByCategory1AndCategory2(pipe: TypedPipe[Tuple11]): TypedPipe[Tuple13] = {
    val grouped = pipe.groupBy { data: Tuple11 => (data._6, data._11) }
    val agg: TypedPipe[((String, String), Aggregates)] = pipe.map { t: Tuple11 => ((t._6, t._11), Aggregates(t._4, 1.0)) }
      .group
      .reduce { (partialAgg, current) =>
        val Aggregates(lsum, lsize) = partialAgg
        val Aggregates(rsum, rsize) = current
        Aggregates(lsum + rsum, lsize + 1)
      }
    val result = grouped.join(agg).toTypedPipe
    val f = result.values.map { case (data, aggr) =>
      (data._1, data._11, data._6, data._2, data._11, data._10, data._9, data._3, data._5, data._4,
        aggr.sum, aggr.count, data._4 / aggr.sum)
    }
    f
  }


Best Regards,
Deepika



          

Sam Ritchie

unread,
Feb 27, 2015, 8:57:58 PM2/27/15
to cascadi...@googlegroups.com, cascadi...@googlegroups.com
Check out scanLeft. (Would link code, but I'm on the phone.) Hope that helps!
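
A minimal sketch of the scanLeft approach on a grouped typed pipe (the four-field record shape, the sort by account, and all names here are illustrative assumptions, not something from this thread):

import com.twitter.scalding._

// Illustrative record: (cat1, cat2, account, amount)
def cumulativeByCategory(rows: TypedPipe[(String, String, String, Double)])
  : TypedPipe[((String, String), (String, Double, Double))] =
  rows
    .groupBy { case (c1, c2, _, _) => (c1, c2) }
    // make the order within each group deterministic before accumulating
    .sortBy { case (_, _, account, _) => account }
    // scanLeft threads a running total through each group's values,
    // emitting one value per input plus the initial seed
    .scanLeft(Option.empty[(String, Double, Double)]) {
      case (acc, (_, _, account, amount)) =>
        val runningTotal = acc.map(_._3).getOrElse(0.0) + amount
        Some((account, amount, runningTotal))
    }
    .toTypedPipe
    .collect { case (key, Some(row)) => (key, row) } // drop the None seed emitted by scanLeft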


Sent from Mailbox



Oscar Boykin

unread,
Feb 27, 2015, 9:34:49 PM2/27/15
to cascadi...@googlegroups.com, Steven H. Noble
Check out:

import com.twitter.scalding.typed.CumulativeSum._

Then, on a TypedPipe[(K, (U, V))], you can do .cumulativeSum.

In your case: K = (cat1, cat2), U = account, V = amount.

Thanks to Steven Noble, who wrote this code!
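
For the data in the original post, a minimal sketch of that reshaping (the record layout and names are illustrative assumptions):

import com.twitter.scalding._
import com.twitter.scalding.typed.CumulativeSum._

// Illustrative record: (cat1, cat2, account, amount)
def cumulativeAmounts(rows: TypedPipe[(String, String, String, Double)]) =
  rows
    // reshape to (K, (U, V)) with K = (cat1, cat2), U = account, V = amount
    .map { case (c1, c2, account, amount) => ((c1, c2), (account, amount)) }
    // within each key, values are ordered by U (account) and V becomes the running sum
    .cumulativeSum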




--
Oscar Boykin :: @posco :: http://twitter.com/posco

Deepika Khera

unread,
Mar 2, 2015, 2:50:13 PM3/2/15
to cascadi...@googlegroups.com, steven...@gmail.com
Thanks so much Oscar. This is so awesome. Thanks a ton to Steven Noble!

Though I understand what I should do, I am having a bit of trouble executing it. I am probably missing something.

Here is the code I have:

 
import com.twitter.scalding.FieldConversions
import com.twitter.scalding.TypedPipe
import com.twitter.scalding._
import com.twitter.scalding.typed.CumulativeSum.CumulativeSumExtension
import com.twitter.algebird._

object MyOperations extends FieldConversions {
  def cumulativeTest(pipe: TypedPipe[(String, Double)]) = {
    val g = pipe
      .map {
        case (gender, height) =>
          (gender, (height, 1L))
      }
      .cumulativeSum { h => (h / 100).floor.toLong }
      .map {
        case (gender, (height, rank)) =>
          (gender, height, rank)
      }
  }
}

The import

import com.twitter.scalding.typed.CumulativeSum.CumulativeSumExtension

compiles fine, but when I compile the function above I get:

value cumulativeSum is not a member of com.twitter.scalding.typed.TypedPipe[(String, (Double, Long))]

[ERROR] possible cause: maybe a semicolon is missing before `value cumulativeSum'?

I'd appreciate any help.

Thanks,

Deepika

Steven H. Noble

unread,
Mar 2, 2015, 2:54:08 PM3/2/15
to Deepika Khera, cascadi...@googlegroups.com
This looks right as far as the use case goes. Have you tried:

import com.twitter.scalding.typed.CumulativeSum._

Deepika Khera

unread,
Mar 2, 2015, 3:24:26 PM3/2/15
to cascadi...@googlegroups.com, deepik...@gmail.com
My bad, works now.

Thanks a ton for writing this up Steven!

Deepika Khera

unread,
Mar 12, 2015, 1:37:22 PM3/12/15
to cascadi...@googlegroups.com, deepik...@gmail.com
Hi,

If I were to do a cumulative sum and also a count (or a regular sum) in the group by, would I be able to extend the existing code and do both in the same group by?
I am running into some issues trying to do .cumulativeSum.size on my group by.

Thanks,
Deepika

Oscar Boykin

unread,
Mar 12, 2015, 1:55:46 PM3/12/15
to cascadi...@googlegroups.com, deepik...@gmail.com
What do you want the output of your .cumulativeSum.size to be? The way .size works, it would consume all the values and give you only the size. If that is what you want, you don't need the cumulativeSum part, because it does not change the number of values.

If instead you are trying to attach the total count to each record, that requires two passes over the data, which you cannot do (without buffering to memory or disk) in Cascading or Scalding. To attach the total to each record, you will need to do a join, which unfortunately cannot be combined with the cumulativeSum, because cumulativeSum uses sorting and currently we cannot automatically compose reductions and joins when sorting is used (this is something we are looking at improving).
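
A hedged sketch of that two-step version, assuming the same illustrative record layout as earlier in the thread (running amount via cumulativeSum, then a separate per-key count joined back on):

import com.twitter.scalding._
import com.twitter.scalding.typed.CumulativeSum._

// Illustrative record: (cat1, cat2, account, amount)
def withRunningTotalAndCount(rows: TypedPipe[(String, String, String, Double)]) = {
  val keyed = rows.map { case (c1, c2, account, amount) => ((c1, c2), (account, amount)) }

  // step 1: running amount per (cat1, cat2), ordered by account
  val running = keyed.cumulativeSum.toTypedPipe

  // step 2: total number of records per key
  val counts = keyed.group.size.toTypedPipe

  // a second map/reduce step: attach the per-key count to every record
  running.group.join(counts.group).toTypedPipe
  // => TypedPipe[((cat1, cat2), ((account, runningAmount), count))]
}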



Deepika Khera

unread,
Mar 12, 2015, 4:36:15 PM3/12/15
to cascadi...@googlegroups.com, deepik...@gmail.com
Hi Oscar,

Sorry for not being clear. I was trying to achieve something like this:

myPipe.groupBy(MY_KEY) { _.sum[Long]('myAmount -> 'amount).size('count)}

Except that I want cumulativeSum (not a plain sum) and size over the grouped key.

That should probably be doable in the same pass?

Thanks,

Deepika
