/**
 * Pairs every row with the running aggregates of its (`_6`, `_11`) group.
 *
 * Rows are grouped by the key `(_6, _11)`. Each group is then streamed once, in whatever order
 * the group delivers its rows (no sort is requested here), and every row is emitted together
 * with the sum of `_4` and the number of rows seen so far in that group, both including the
 * row itself.
 *
 * The accumulators are created inside the per-group stream function, so they start from zero
 * for every key and are never shared between keys or captured from the enclosing method.
 *
 * @param pipe input rows
 * @return one 13-field tuple per input row: the selected row fields followed by the running
 *         sum, the running count and `_4 / runningSum`
 */
def groupByCategory1AndCategory2(pipe: TypedPipe[Tuple11]): TypedPipe[Tuple13] = {
  val grouped = pipe.groupBy { data: Tuple11 => (data._6, data._11) }

  val withRunningAggregates = grouped.mapValueStream { rows: Iterator[Tuple11] =>
    // Local, per-group state: a fresh pair of accumulators for each key's value stream.
    // NOTE(review): the count is kept as a Double on the assumption that Aggregates takes
    // (Double, Double) -- confirm against the Aggregates definition.
    var runningSum = 0.0
    var runningCount = 0.0
    rows.map { row =>
      runningSum += row._4
      runningCount += 1
      (row, Aggregates(runningSum, runningCount))
    }
  }

  withRunningAggregates.toTypedPipe.values.map { case (data, aggr) =>
    (data._1, data._11, data._6, data._2, data._11, data._10, data._9, data._3, data._5, data._4,
      aggr.sum, aggr.count, data._4 / aggr.sum)
  }
}
Option 2: Using mapGroup after groupBy. In this case the iterator appears to be empty.
// Option 2 (debugging attempt): group the rows by (_6, _11) and inspect each group's iterator.
// NOTE(review): this snippet is incomplete as posted -- the method's closing brace is missing.
def groupByCategory1AndCategory2(pipe: TypedPipe[Tuple11]): TypedPipe[Tuple11] = {
val grouped = pipe.groupBy { data: Tuple11 => (data._6, data._11) }
val agg = grouped.mapGroup { case (k, iter) =>
// `iter.length` walks the iterator to its end in order to count the elements, so the
// `hasNext` / `isEmpty` calls evaluated after it in the same expression see an exhausted
// iterator. That is why the group looks empty in the printed output.
// NOTE(review): the body ends with `print`, so it yields Unit; presumably mapGroup expects
// an Iterator of output values here -- confirm against the Scalding version in use.
print("Starting it " +iter.length + " hasnext: " + iter.hasNext + " empty: " + iter.isEmpty)
}
/**
 * Attaches per-group totals to every row.
 *
 * For each key `(_6, _11)` the sum of `_4` and the number of rows are computed with a reduce,
 * and the result is joined back onto the rows of the same key.
 *
 * @param pipe input rows
 * @return one 13-field tuple per joined row: the selected row fields followed by the group
 *         sum, the group row count and `_4 / groupSum`
 */
def groupByCategory1AndCategory2(pipe: TypedPipe[Tuple11]): TypedPipe[Tuple13] = {
  val grouped = pipe.groupBy { data: Tuple11 => (data._6, data._11) }

  val agg: TypedPipe[((String, String), Aggregates)] = pipe
    .map { t: Tuple11 => ((t._6, t._11), Aggregates(t._4, 1.0)) }
    .group
    .reduce { (left, right) =>
      val Aggregates(lsum, lsize) = left
      val Aggregates(rsum, rsize) = right
      // Both sides may already be partial aggregates covering several rows, so the counts
      // have to be added together; adding 1 would undercount whenever partials are merged.
      Aggregates(lsum + rsum, lsize + rsize)
    }

  val result = grouped.join(agg).toTypedPipe

  result.values.map { case (data, aggr) =>
    (data._1, data._11, data._6, data._2, data._11, data._10, data._9, data._3, data._5, data._4,
      aggr.sum, aggr.count, data._4 / aggr.sum)
  }
}
--
You received this message because you are subscribed to the Google Groups "cascading-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cascading-use...@googlegroups.com.
To post to this group, send email to cascadi...@googlegroups.com.
Visit this group at http://groups.google.com/group/cascading-user.
To view this discussion on the web visit https://groups.google.com/d/msgid/cascading-user/86ace00b-300e-43f2-a1f1-76672996e28a%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
--
You received this message because you are subscribed to the Google Groups "cascading-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cascading-use...@googlegroups.com.
To post to this group, send email to cascadi...@googlegroups.com.
Visit this group at http://groups.google.com/group/cascading-user.
To view this discussion on the web visit https://groups.google.com/d/msgid/cascading-user/86ace00b-300e-43f2-a1f1-76672996e28a%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
import com.twitter.scalding.FieldConversions
import com.twitter.scalding.TypedPipe
import com.twitter.scalding._
import com.twitter.scalding.typed.CumulativeSum.CumulativeSumExtension
import com.twitter.algebird._
object MyOperations extends FieldConversions {

  /**
   * Ranks the heights within each gender using Scalding's cumulative sum.
   *
   * Every `(gender, height)` pair is tagged with the value `1L`; `cumulativeSum` then replaces
   * that value with its running total per gender, which is emitted as the third tuple field.
   * The function passed to `cumulativeSum` buckets heights by `floor(height / 100)`.
   *
   * @param pipe `(gender, height)` pairs
   * @return `(gender, height, rank)` triples
   */
  def cumulativeTest(pipe: TypedPipe[(String, Double)]): TypedPipe[(String, Double, Long)] = {
    // `cumulativeSum` is added to TypedPipe by an implicit conversion that lives in the
    // CumulativeSum object. Importing only the CumulativeSumExtension class does not bring that
    // conversion into scope, which produces "value cumulativeSum is not a member of TypedPipe";
    // the wildcard import does.
    import com.twitter.scalding.typed.CumulativeSum._

    // Return the pipe itself: binding it to a local val as the last statement would make the
    // method evaluate to Unit and discard the result.
    pipe
      .map { case (gender, height) => (gender, (height, 1L)) }
      .cumulativeSum { h => (h / 100).floor.toLong }
      .map { case (gender, (height, rank)) => (gender, height, rank) }
  }
}
The following import compiles without errors:
import com.twitter.scalding.typed.CumulativeSum.CumulativeSumExtension
But when I compile the function above, I get this error:
value cumulativeSum is not a member of com.twitter.scalding.typed.TypedPipe[(String, (Double, Long))]
[ERROR] possible cause: maybe a semicolon is missing before `value cumulativeSum'?
I'd appreciate any help.
Thanks,
Deepika
To view this discussion on the web visit https://groups.google.com/d/msgid/cascading-user/0c74b999-d950-4e7d-92a2-f31d266a168d%40googlegroups.com.
// Fields-API aggregation: for each MY_KEY group, sum the 'myAmount field (as Long) into
// 'amount and write the number of rows in the group into 'count.
myPipe.groupBy(MY_KEY) { _.sum[Long]('myAmount -> 'amount).size('count)}
Except that I want a cumulative sum (rather than a plain sum) together with the size, both computed per group key.
That should probably be doable in the same pass?
Thanks,
Deepika