Juha Heljoranta
unread,Nov 30, 2012, 5:34:34 PM11/30/12Sign in to reply to author
Sign in to forward
You do not have permission to delete messages in this group
Either email addresses are anonymous for this group or you need the view member email addresses permission to view the original message
to scala...@googlegroups.com, Som Snytt, Nils Kilden-Pedersen
Whoa, Future.traverse rocks! :)
val a = Future.traverse(iterator)(x => future(f(x)))
println(Await.result(a, Duration.Inf).size)
But unfortunately it accumulates results before returning which means that I will get oome... Yes, I have a bad habit of making iterators which are infinite or larger than heap...
Anyway, thanks for replies! I ended up writing my map method which is lazy enough to handle infinite Iterators.
import scala.concurrent.{ ExecutionContext, future, Future }
import scala.concurrent.Await.result
import scala.concurrent.duration.Duration.Inf
import java.util.concurrent.{ BlockingQueue, Executors, LinkedBlockingQueue }
object ParItr {
def map[A, B](i: Iterator[A])(f: A => B)(implicit execctx: ExecutionContext): Iterator[B] = {
val cpus = Runtime.getRuntime().availableProcessors() + 1
val queue: BlockingQueue[Option[Future[B]]] = new LinkedBlockingQueue(cpus * cpus)
future {
try i.foreach(x => queue.put(Some(future { f(x) })))
finally queue.put(None) // poison
}
new Iterator[B] {
private[this] var fopt: Option[Future[B]] = None
private[this] var alive = true
override def next() =
if (hasNext) { val v = result(fopt.get, Inf); fopt = None; v }
else Iterator.empty.next()
override def hasNext: Boolean = alive && take().isDefined
private def take(): Option[Future[B]] = {
if (fopt.isEmpty) {
fopt = queue.take() match {
case None => { alive = false; None }
case some => some
}
}
fopt
}
}
}
}