Traversing iterator elements in parallel

Juha Heljoranta

Nov 29, 2012, 4:26:18 PM
to scala...@googlegroups.com
What would be the most convenient way to traverse iterator in parallel?

I need to do something along these lines:

val f: A => B = ... // CPU-intensive
def iterator = new Iterator[A] {
  override def next(): A = ... // might block
  override def hasNext: Boolean = ...
}
iterator.par.foreach(f) // wished-for API; Iterator has no par method

Iterator.par would be perfect, but unfortunately no such method exists.

Any suggestions?

Cheers,
Juha

Vlad Patryshev

Nov 29, 2012, 6:29:46 PM
to Juha Heljoranta, scala...@googlegroups.com
Iterator may not be the right trait for this. Traversable might be; I'm not sure, though, whether Scala's Traversable is the same thing as a "traversable functor".

Thanks,
-Vlad

Nils Kilden-Pedersen

Nov 29, 2012, 8:37:07 PM
to Juha Heljoranta, scala...@googlegroups.com
There's very likely a better, i.e. Scalaish, way, but this will work:

val pool = collection.parallel.ThreadPoolTasks.defaultThreadPool
val futures = iterator.map { a: A =>
  // f is the A => B function from the question
  pool submit new java.util.concurrent.Callable[B] { def call(): B = f(a) }
}

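which gives you an iterator of (Java) futures. Because Iterator.map is lazy, each task is submitted only when that iterator is advanced; call toList on it to submit everything up front.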
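A fuller sketch of this idea, assuming a plain java.util.concurrent fixed-size pool rather than the parallel-collections default pool. The names SubmitAll, run, and threads are hypothetical, not from the thread. It collects every result, so it only suits finite iterators that fit in memory.

```scala
import java.util.concurrent.{ Callable, Executors }

object SubmitAll {
  // Hypothetical helper: applies f to every element on a fixed-size pool
  // and returns the results in the original order.
  def run[A, B](it: Iterator[A], threads: Int)(f: A => B): List[B] = {
    val pool = Executors.newFixedThreadPool(threads)
    try {
      // toList forces the lazy iterator, so all tasks are submitted
      // before we start waiting on any of them.
      val futures = it.map { a =>
        pool.submit(new Callable[B] { def call(): B = f(a) })
      }.toList
      // get() blocks until the corresponding task has finished.
      futures.map(_.get())
    } finally pool.shutdown()
  }
}
```

Usage would look like SubmitAll.run(iterator, 4)(f), where 4 is an arbitrary pool size.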

Som Snytt

Nov 29, 2012, 11:35:22 PM
to Nils Kilden-Pedersen, Juha Heljoranta, scala...@googlegroups.com
On Thu, Nov 29, 2012 at 3:26 PM, Juha Heljoranta <juha.he...@iki.fi> wrote:
What would be the most convenient way to traverse iterator in parallel?


 Future traverse (iterator.toTraversable) has all the right words.

import scala.concurrent._
import scala.util._

object Test extends App {
  import ExecutionContext.Implicits.global
  implicit class Completer[A](val f: Future[A]) extends AnyVal {
    import scala.concurrent.duration._
    def completing(pf: PartialFunction[Try[A], Unit]) {
      f onComplete pf
      Await ready (f, Duration.Inf)
    }
  }
  val it = (1 to 10).iterator

  Future.traverse(it.toTraversable)(i => future(10*i)) completing {
    case Success(all) => println(all mkString ",")
    case Failure(e)   => e.printStackTrace
  }
}

Juha Heljoranta

Nov 30, 2012, 5:34:34 PM
to scala...@googlegroups.com, Som Snytt, Nils Kilden-Pedersen
Whoa, Future.traverse rocks! :)

val a = Future.traverse(iterator)(x => future(f(x)))
println(Await.result(a, Duration.Inf).size)

But unfortunately it accumulates all results before returning, which means that I will get an OutOfMemoryError... Yes, I have a bad habit of making iterators which are infinite or larger than the heap...

Anyway, thanks for the replies! I ended up writing my own map method, which is lazy enough to handle infinite Iterators.


import scala.concurrent.{ ExecutionContext, future, Future }
import scala.concurrent.Await.result
import scala.concurrent.duration.Duration.Inf
import java.util.concurrent.{ BlockingQueue, Executors, LinkedBlockingQueue }

object ParItr {

  def map[A, B](i: Iterator[A])(f: A => B)(implicit execctx: ExecutionContext): Iterator[B] = {
    val cpus = Runtime.getRuntime().availableProcessors() + 1
    val queue: BlockingQueue[Option[Future[B]]] = new LinkedBlockingQueue(cpus * cpus)
    future {
      try i.foreach(x => queue.put(Some(future { f(x) })))
      finally queue.put(None) // poison
    }
    new Iterator[B] {

      private[this] var fopt: Option[Future[B]] = None
      private[this] var alive = true

      override def next() =
        if (hasNext) { val v = result(fopt.get, Inf); fopt = None; v }
        else Iterator.empty.next()

      override def hasNext: Boolean = alive && take().isDefined

      private def take(): Option[Future[B]] = {
        if (fopt.isEmpty) {
          fopt = queue.take() match {
            case None => { alive = false; None }
            case some => some
          }
        }
        fopt
      }

    }
  }
}
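For comparison, a simpler way to bound memory is to process the iterator in fixed-size batches. This is a different technique from the queue-based map above, not a rewrite of it, and the names BatchedPar, parMap, and batchSize are hypothetical. The sketch uses Future.apply instead of the lowercase future method, on the assumption that it may be compiled with a newer Scala version.

```scala
import scala.concurrent.{ Await, ExecutionContext, Future }
import scala.concurrent.duration.Duration

object BatchedPar {
  // Hypothetical helper: maps f over the iterator one batch at a time.
  // Elements inside a batch run in parallel; batches run one after another,
  // so at most batchSize results are held in memory at once.
  def parMap[A, B](it: Iterator[A], batchSize: Int)(f: A => B)(
      implicit ec: ExecutionContext): Iterator[B] =
    it.grouped(batchSize).flatMap { chunk =>
      // Blocks the consuming thread until the whole batch is done.
      Await.result(Future.traverse(chunk)(a => Future(f(a))), Duration.Inf)
    }
}
```

The trade-off is that each batch waits for its slowest element, so the pool can sit partly idle at batch boundaries, which the queue-based version avoids.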

Roland Kuhn

Dec 2, 2012, 5:49:15 AM
to Juha Heljoranta, scala...@googlegroups.com, Som Snytt, Nils Kilden-Pedersen
Hi Juha,

you might want to wrap the "foreach(… queue.put …)" in "blocking{}" to avoid possible starvation if this map() is used several times in parallel; another possibility would be to use an actor and avoid blocking operations altogether (the actor would keep track of the outstanding work items, everything fully event-driven). A similar effect could of course be achieved by running on a dedicated and deliberately sized thread pool and spawning new futures for each iterator.next operation (that pool would have to be strictly FIFO, though). It's a matter of taste, I guess ;-)


Regards,

Dr. Roland Kuhn
Akka Tech Lead
Typesafe – The software stack for applications that scale.
twitter: @rolandkuhn
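
A minimal sketch of the first suggestion, wrapping the producer loop in blocking, assuming the same bounded-queue and poison-value design as ParItr.map. The names BlockingProducer and feed are hypothetical. Note that blocking is only a hint: the default global ExecutionContext may add a compensating thread, while other execution contexts may ignore it.

```scala
import java.util.concurrent.BlockingQueue
import scala.concurrent.{ blocking, ExecutionContext, Future }

object BlockingProducer {
  // Hypothetical helper: submits one future per element and puts it on
  // the queue, then signals the end of input with None.
  def feed[A, B](it: Iterator[A], queue: BlockingQueue[Option[Future[B]]])(f: A => B)(
      implicit ec: ExecutionContext): Future[Unit] =
    Future {
      // queue.put (and possibly it.next) can block, so mark the whole
      // loop as blocking to reduce the risk of starving the pool.
      blocking {
        try it.foreach(x => queue.put(Some(Future(f(x)))))
        finally queue.put(None) // poison
      }
    }
}
```

The consumer side would stay as in ParItr.map: take from the queue until None arrives and await each future in turn.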