Using priority queue to do something like top N in a window


Jeff Tarn

Sep 29, 2015, 3:41:55 AM
to algebird
I'm looking for a good way to do something like "top 3" or "last 3" events in a windowing function with the Spark API. I was reminded that PriorityQueueMonoid might help, as it's used for something similar in Scalding. I tried the following in Spark, which seems to work, but I don't think I'm building in much flexibility, and it looks kind of messy.


import java.util.PriorityQueue
import com.twitter.algebird.mutable.PriorityQueueMonoid
import scala.collection.JavaConverters._

val file  = sc.textFile("./employeeStatus.txt")
// $ cat employeeStatus.txt
// 20150901    Jeff    hired
// 20150902    Jeff    late
// 20150905    Jeff    absent
// 20150907    Jeff    late
// 20150920    Jeff    fired
// 20150909    Jeff    late
// 20150831    Fred    hired
// 20150904    Fred    promoted

case class Employee(date: Int, name: String, action: String)

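// Wraps a (date, payload) pair. The comparison is reversed, so larger (later) dates sort first.
// Integer.compare avoids the overflow that plain subtraction can hit on extreme values.
case class SortableTupleWithInt(i: (Int, Any)) extends Ordered[SortableTupleWithInt] {
  def compare(that: SortableTupleWithInt): Int = Integer.compare(that.i._1, this.i._1)
}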

val k = 3

@transient lazy val monoider = new PriorityQueueMonoid[SortableTupleWithInt](k)

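// Parse each tab-separated line and wrap it in a single-element priority queue, keyed by name.
val fields = file.map { l =>
  val v = l.split("\t")
  val emp = Employee(v(0).toInt, v(1), v(2))
  // Pass the tuple explicitly instead of relying on argument auto-tupling.
  (emp.name, monoider.build(SortableTupleWithInt((emp.date, emp))))
}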


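// Merge the queues for each employee; the monoid caps each merged queue at k entries.
val red = fields.reduceByKey((a, b) => monoider.plus(a, b))

// Unpack each queue back into its Employee payloads.
val result = red.values.flatMap(_.asScala.map(_.i._2))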

result.foreach(println)
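
For comparison, here is a plain-Scala sketch of the same bounded-queue idea with no Spark or Algebird dependency, which may make the moving parts easier to see. It is not Algebird's implementation. The names BoundedTopK, Event, build, plus, toSortedList and newestFirst are all made up for illustration. The main difference from the code above is that the ordering is supplied as an implicit Ordering (here built with Ordering.by) instead of being baked into a wrapper class, so the same helpers could be reused with a different sort key.

```scala
import java.util.{PriorityQueue => JPriorityQueue}
import scala.collection.mutable.ListBuffer

object BoundedTopK {
  // Hypothetical record type mirroring the Employee case class above.
  case class Event(date: Int, name: String, action: String)

  // Newest first: under this ordering the "smallest" events are the latest ones.
  implicit val newestFirst: Ordering[Event] = Ordering.by[Event, Int](_.date).reverse

  // One-element queue. Its comparator is the reverse of `ord`, so the head is
  // the largest element under `ord`, i.e. the first candidate for eviction.
  def build[T](item: T)(implicit ord: Ordering[T]): JPriorityQueue[T] = {
    val q = new JPriorityQueue[T](1, ord.reverse)
    q.add(item)
    q
  }

  // Merge two queues into a fresh one, then evict from the head until at most k remain.
  def plus[T](k: Int, a: JPriorityQueue[T], b: JPriorityQueue[T])(
      implicit ord: Ordering[T]): JPriorityQueue[T] = {
    val merged = new JPriorityQueue[T](math.max(1, a.size + b.size), ord.reverse)
    merged.addAll(a)
    merged.addAll(b)
    while (merged.size > k) merged.poll()
    merged
  }

  // Copy the queue contents into a list sorted by `ord`.
  def toSortedList[T](q: JPriorityQueue[T])(implicit ord: Ordering[T]): List[T] = {
    val buf = ListBuffer.empty[T]
    val it = q.iterator()
    while (it.hasNext) buf += it.next()
    buf.toList.sorted
  }

  // Same rows as employeeStatus.txt above.
  val events = Seq(
    Event(20150901, "Jeff", "hired"),
    Event(20150902, "Jeff", "late"),
    Event(20150905, "Jeff", "absent"),
    Event(20150907, "Jeff", "late"),
    Event(20150920, "Jeff", "fired"),
    Event(20150909, "Jeff", "late"),
    Event(20150831, "Fred", "hired"),
    Event(20150904, "Fred", "promoted")
  )

  val k = 3

  // Local stand-in for reduceByKey: group by name, then merge the per-event queues.
  val lastK: Map[String, List[Event]] =
    events.groupBy(_.name).map { case (name, es) =>
      name -> toSortedList(es.map(e => build(e)).reduce((a, b) => plus(k, a, b)))
    }
}
```

With the sample rows, BoundedTopK.lastK("Jeff") holds the events dated 20150920, 20150909 and 20150907, in that order. Because plus returns a new queue and does not depend on the order in which queues are merged, the same pair of functions should slot into a reduceByKey call, assuming the element type and its Ordering are serializable.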
