Hey guys,
I've been running into a problem when calling the top method (def top(num: Int)(implicit ord: Ordering[T]): Array[T] from org.apache.spark.rdd.RDD.scala) on a VertexRDD. I'm not sure whether this has already been resolved, or whether there is a preferred workaround for it. Here is how to reproduce it:
> val graph = GraphLoader.edgeListFile(sc, "somegraph.bsv", canonicalOrientation = false, minEdgePartitions = 2).cache()
graph: org.apache.spark.graph.Graph[Int,Int] = org.apache.spark.graph.impl.GraphImpl@486e758c
> graph.outDegrees // works fine
res: org.apache.spark.graph.VertexRDD[Int] = VertexRDD[229] at RDD at VertexRDD.scala:53
> graph.outDegrees.top(3) // nope
...
java.lang.ClassCastException: scala.collection.immutable.$colon$colon cannot be cast to org.apache.spark.util.BoundedPriorityQueue
at org.apache.spark.rdd.RDD$$anonfun$top$2.apply(RDD.scala:853)
at org.apache.spark.rdd.RDD$$anonfun$6.apply(RDD.scala:664)
at org.apache.spark.rdd.RDD$$anonfun$6.apply(RDD.scala:661)
at org.apache.spark.scheduler.JobWaiter.taskSucceeded(JobWaiter.scala:55)
at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:663)
at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:455)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$3$$anon$1$$anonfun$receive$1.apply(DAGScheduler.scala:134)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$3$$anon$1$$anonfun$receive$1.apply(DAGScheduler.scala:130)
at akka.actor.Actor$class.apply(Actor.scala:318)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$3$$anon$1.apply(DAGScheduler.scala:116)
at akka.actor.ActorCell.invoke(ActorCell.scala:626)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:197)
at akka.dispatch.Mailbox.run(Mailbox.scala:179)
at akka.dispatch.ForkJoinExecutorConfigurator$MailboxExecutionTask.exec(AbstractDispatcher.scala:516)
at akka.jsr166y.ForkJoinTask.doExec(ForkJoinTask.java:259)
at akka.jsr166y.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:975)
at akka.jsr166y.ForkJoinPool.runWorker(ForkJoinPool.java:1479)
at akka.jsr166y.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)
Thanks,
Andres