I'm looking for a good way to compute something like the "top 3" or "last 3" events per key in a windowing function with the Spark API. I was reminded that PriorityQueueMonoid might help, as it's used for something similar in Scalding. I tried the following in Spark, which seems to work, but I don't think I'm building in much flexibility, and it looks kind of messy.
import java.util.PriorityQueue
import com.twitter.algebird.mutable.PriorityQueueMonoid
import scala.collection.JavaConverters._
// Load the raw event log as an RDD with one String element per input line.
// `sc` is not defined in this snippet; presumably it is the SparkContext that spark-shell provides.
val file = sc.textFile("./employeeStatus.txt")
// Sample contents of the input file: one event per line in the order <date> <name> <action>.
// The parsing code further down splits each line on "\t", so the fields are presumably
// tab-separated in the real file (TODO confirm).
// $ cat employeeStatus.txt
// 20150901 Jeff hired
// 20150902 Jeff late
// 20150905 Jeff absent
// 20150907 Jeff late
// 20150920 Jeff fired
// 20150909 Jeff late
// 20150831 Fred hired
// 20150904 Fred promoted
/**
 * One employee status event, as parsed from a single line of the input file.
 *
 * @param date   event date encoded as an integer (e.g. 20150901 in the sample data)
 * @param name   name of the employee the event refers to
 * @param action what happened (e.g. "hired", "late", "fired" in the sample data)
 */
case class Employee(date: Int, name: String, action: String)
/**
 * Wraps an `(Int, payload)` pair so that it can be ordered by its `Int` component.
 *
 * The ordering is reversed with respect to the natural `Int` ordering: an instance holding a
 * larger key compares as "less than" an instance holding a smaller key. Only the key takes part
 * in `compare`; the payload is ignored there (case-class equality still considers both).
 *
 * @param i pair of (sort key, arbitrary payload)
 */
case class SortableTupleWithInt(i: (Int, Any)) extends Ordered[SortableTupleWithInt] {

  /**
   * Compares two instances by key in descending order.
   *
   * @param that the instance to compare against
   * @return a negative value if this key is greater than `that`'s key, zero if the keys are
   *         equal, a positive value if this key is smaller
   */
  def compare(that: SortableTupleWithInt): Int =
    // Integer.compare rather than `that.i._1 - this.i._1`: the subtraction overflows when the
    // keys are far apart (e.g. Int.MaxValue - Int.MinValue == -1) and then reports the wrong sign.
    java.lang.Integer.compare(that.i._1, this.i._1)
}
// Size limit handed to the priority-queue monoid (the "3" in "top 3" / "last 3").
val k = 3

// Monoid used to build and merge the per-key priority queues.
// NOTE(review): @transient + lazy presumably keeps the monoid out of the serialized closure so
// that it is re-created where the tasks run — confirm against the Spark closure rules.
@transient lazy val monoider = new PriorityQueueMonoid[SortableTupleWithInt](k)

// Parse every tab-separated line into an Employee and emit (name, single-element queue),
// keyed by employee name and sortable by event date.
val fields = file.map { line =>
  val cols = line.split("\t")
  val employee = Employee(cols(0).toInt, cols(1), cols(2))
  val entry = SortableTupleWithInt((employee.date, employee))
  employee.name -> monoider.build(entry)
}

// Merge the queues of each employee pairwise with the monoid's `plus`.
val red = fields.reduceByKey(monoider.plus(_, _))

// Drop the keys and unwrap the payload of every entry that is left in the queues.
// Elements come out in the queue's iteration order, and the payload is typed Any.
val result = red.flatMap { case (_, queue) =>
  queue.asScala.toList.map(_.i._2)
}

// NOTE(review): RDD.foreach runs on the executors, so the output is only visible on the
// driver console when running in local mode.
result.foreach(item => println(item))