easy parallel mapping an Iterator


Sam Halliday

Oct 17, 2013, 4:57:20 AM
to scala...@googlegroups.com
Dear all,

I have been coding in Scala professionally for over a year. In that time, there is
one pattern in particular that keeps popping up and I would like to know if anybody
else has seen it and how they deal with it:

1. there is some very cheap rule for generating a starting state
2. we have to perform some costly mapping to each state
3. there is a filtering or reduction step

This is trivial: it's just a variation of MapReduce with a generation step, e.g.

  (1 until 100).map(_ * 2).reduce(_ + _)

and we can perform our "expensive" map in parallel:

  (1 until 100).par.map(_ * 2).reduce(_ + _)


However, throw in this spanner and it gets interesting:

4. there are far too many states to store them all

  (1 until Int.MaxValue).map(_ * 2).reduce(_ + _)
  --> Human sacrifice, dogs and cats living together... mass hysteria!
      (and, most certainly, OutOfMemoryError)


Of course, the way to get around the OOM is to implement a custom
Iterator/TraversableOnce: it is memory-less and will allow the
garbage collector to clean up older elements as we go.
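A minimal sketch of such a memory-less Iterator, assuming a simple counter stands in for the "cheap rule for generating a starting state". The names `CountingIterator` and `serialSum` are hypothetical. The pipeline is still serial, but only one state is alive at a time, so memory use does not grow with the number of states.

```scala
// Hypothetical generator: each state is produced on demand and never stored,
// so earlier states become garbage as soon as they have been consumed.
final class CountingIterator(limit: Int) extends Iterator[Int] {
  private var i = 0

  def hasNext: Boolean = i < limit

  def next(): Int = {
    if (!hasNext) throw new NoSuchElementException("CountingIterator exhausted")
    i += 1
    i
  }
}

// Serial map + reduce over the lazy iterator. Iterator.map is lazy, so the
// pipeline streams one element at a time; Long avoids Int overflow in the sum.
def serialSum(limit: Int): Long =
  new CountingIterator(limit).map(_ * 2L).reduce(_ + _)
```

Calling `serialSum(Int.MaxValue - 1)` corresponds to the `1 until Int.MaxValue` example: it takes a while, but it runs in constant memory. What it does not give you is parallelism, which is the actual problem.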

However, there is no simple way (that I know of) to wrap an Iterator as a
GenIterable/ParIterable, which would allow the map to occur in parallel.


Arguably, the correct solution is to use Akka: one actor to generate states and gather results
(ensuring only X states are being processed at any time), a heavily distributed actor
that does the expensive piece, and another that does the reduction
(with failure handling so that nothing gets lost, etc.).

But using Akka incurs a lot of code and setup, which is only really feasible if the project
is already making heavy use of Akka (and perhaps overkill in any case).


What I'd really like to have would be a way to get the same benefits that .par gives
me for strict (fully materialized) Collection implementations, e.g.

  val myIterator = new MyLazyForgetfulIterator
  parallelIterable(myIterator).map(_ * 2).reduce(_ + _)


I can imagine how this might work: an invocation of `map` would grab N states from
the underlying Iterator (N being the buffer size, which caps memory usage), start a
Future mapping each of them, and put those Futures into a buffer.
When `next` is called, the head of the buffer would be popped and a Future for the
next state would be pushed onto the end. `next` would then block until the popped
Future finishes and return its result (without the buffer the map step would block
anyway, but this way it gets a head start). As long as `next` is called often
enough, up to N Futures can be in flight at once.

But this sounds like a recipe for a bug-ridden piece of code that probably doesn't
handle all the other things that Iterable is supposed to handle, so I've not been inclined
to write it yet.


I'd really appreciate this group's thoughts (both on this suggested implementation, and
other alternatives I've not covered).

Best regards,
Sam

Michael Schmitz

Oct 17, 2013, 2:16:37 PM
to Sam Halliday, scala-user
I'd love to have a parallel iterator that uses a worker thread to
process work, even if it only had foreach.
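A foreach-only version can be sketched with plain java.util.concurrent pieces: a fixed thread pool plus a `Semaphore` that stops the caller from pulling more than `maxInFlight` elements ahead of the workers. The name `parForeach` and its parameters are hypothetical; this is an illustration under those assumptions, not an existing library API.

```scala
import java.util.concurrent.{Executors, Semaphore, TimeUnit}

// Hypothetical helper: apply `f` to every element on `threads` worker threads,
// with at most `maxInFlight` elements taken from `it` but not yet finished.
def parForeach[A](it: Iterator[A], threads: Int, maxInFlight: Int)(f: A => Unit): Unit = {
  val pool = Executors.newFixedThreadPool(threads)
  val permits = new Semaphore(maxInFlight)
  try {
    it.foreach { a =>
      permits.acquire() // blocks the caller once maxInFlight elements are pending
      pool.execute(new Runnable {
        // Release the permit even if f throws; the exception itself only
        // reaches the worker thread's uncaught-exception handler.
        def run(): Unit = try f(a) finally permits.release()
      })
    }
  } finally {
    // Let already-submitted work finish, then wait for the workers to exit
    pool.shutdown()
    pool.awaitTermination(Long.MaxValue, TimeUnit.NANOSECONDS)
  }
}
```

Unlike batching, no worker waits for the rest of a group: a new element is pulled as soon as any permit frees up. The order in which `f` runs is not preserved, which is usually acceptable for a side-effecting `foreach`.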

I usually process iterators in parallel with `iterator.grouped(batchSize).par`.
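A sketch of the same batch-at-a-time idea, written with Futures instead of `.par` because parallel collections live in a separate module from Scala 2.13 onwards. The helper name `batchedParMap` is hypothetical. Each batch is mapped concurrently and awaited before the next batch is pulled, so at most `batchSize` elements are held at once.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// Hypothetical helper mirroring `iterator.grouped(batchSize).par`: lazily pull
// one batch, map its elements concurrently, and wait for the whole batch.
def batchedParMap[A, B](it: Iterator[A], batchSize: Int)(f: A => B)
                       (implicit ec: ExecutionContext): Iterator[B] =
  it.grouped(batchSize).flatMap { batch =>
    val futures = batch.map(a => Future(f(a)))
    // The next batch is only pulled from `it` after this one completes
    Await.result(Future.sequence(futures), Duration.Inf)
  }
```

The per-batch `Await` is where cores go idle: a batch only finishes when its slowest element does, so threads that finish early have nothing to do until the next batch starts.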

Peace. Michael

Seth Tisue

Oct 17, 2013, 2:41:50 PM
to scala-user
Y'all have seen this other thread:
https://groups.google.com/d/msg/scala-user/22PQy5dFr7s/3Ho7Ri8FhJ8J

right? I only just now figured out these are two different threads and
Sam and David are two different people :-)

Samuel Halliday

Oct 17, 2013, 5:14:48 PM
to Seth Tisue, scala-user
Thanks. However, Stream keeps a reference to all previous results, so it is not a substitute for Iterator in this context.

--
Sam

Samuel Halliday

Oct 17, 2013, 5:17:30 PM
to Michael Schmitz, scala-user
That's not a bad workaround, but it isn't as performant as an approach that is always processing: near the end of every group, not all the CPU is being used, because threads that have finished wait for the slowest element of the group. Still, better than serial!

--
Sam

Samuel Halliday

Oct 17, 2013, 6:13:57 PM
to Michael Schmitz, scala-user
I decided to write this bloody thing :-)

It was a lot easier than I expected, but it would need more work to support other parallel operations (foreach, etc.) and to implement the right trait marking it as a parallel Iterator.


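import scala.collection.mutable
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

class ParMapIterator[T](it: Iterator[T], buffered: Int = 100)
                       (implicit context: ExecutionContext) extends Iterator[T] {

  def hasNext: Boolean = it.hasNext

  def next(): T = it.next()

  // Keeps up to `buffered` elements being mapped concurrently;
  // results are still returned in the original order.
  override def map[B](f: T => B): Iterator[B] = {
    val buffer = new mutable.Queue[Future[B]]()

    // Take the next element (if any) and start mapping it in the background
    def push(): Unit = if (it.hasNext) {
      val n = it.next()
      buffer.enqueue(Future { f(n) })
    }

    for (_ <- 0 until buffered) push()

    new Iterator[B] {
      def hasNext: Boolean = buffer.nonEmpty

      def next(): B = {
        val n = buffer.dequeue()
        push() // refill, so up to `buffered` Futures stay in flight
        Await.result(n, Duration.Inf)
      }
    }
  }
}

object ParMapIterator {
  implicit class ParPimpedIterator[T](it: Iterator[T]) {
    def par(buffered: Int = 100)(implicit context: ExecutionContext): ParMapIterator[T] =
      new ParMapIterator[T](it, buffered)
    def par(implicit context: ExecutionContext): ParMapIterator[T] =
      new ParMapIterator[T](it)
  }
}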


and use it like this:

import scala.concurrent.ExecutionContext

val myIterator = new Iterator[Int]() {
  var i = 0
  def hasNext = i < 99
  def next() = {
    i += 1
    i
  }
}

import ExecutionContext.Implicits.global
import ParMapIterator._

val out = myIterator.par.map{_ * 2}.reduce(_ + _)


--
Sam

On 17 Oct 2013, at 19:16, Michael Schmitz <mic...@schmitztech.com> wrote:

Hua Jiang

Oct 18, 2013, 4:22:10 AM
to Samuel Halliday, Michael Schmitz, scala-user
Here is another solution, in which a custom ExecutionContext with a bounded blocking queue keeps the memory footprint small. The initial iterator is transformed into an iterator of Futures, whose elements can be accessed (triggering the expensive calculation) without waiting for the calculations on previous elements to complete. Any method on the transformed iterator can be used, but keep in mind that we are now handling Futures. Also, do not use Await.ready or Await.result inside the callbacks of map, foreach, etc., since that will cause the calculations to wait for one another.


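import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import java.util.concurrent.ArrayBlockingQueue
import scala.concurrent._
import scala.concurrent.duration.Duration

val numThread = 5

// Fixed-size pool whose work queue blocks the submitting thread when it is
// full, so the producer can never get more than a few tasks ahead
implicit val context: ExecutionContextExecutorService = ExecutionContext.fromExecutorService(
  new ThreadPoolExecutor(
    numThread, numThread,
    0L, TimeUnit.SECONDS,
    new ArrayBlockingQueue[Runnable](5) {
      override def offer(e: Runnable): Boolean = {
        put(e) // Waiting for empty room
        true
      }
    }
  )
)

// Stand-in for the expensive computation
def timeKiller(i: Int): Long = {
  (1 to 10000).foreach(_ * 2)
  i.toLong
}

// Cheap combining step. Its callbacks run on the global pool: if they were
// queued on `context`, a worker could block in put() while the queue is full,
// and with every worker blocked that way nothing would drain the queue.
def reduce(a: Future[Long], b: Future[Long]): Future[Long] = {
  val cheap = ExecutionContext.global
  a.flatMap(v => b.map(v + _)(cheap))(cheap)
}

val sum = (1 to 100000).iterator.map(i => Future(timeKiller(i))).reduce(reduce)

println(Await.result(sum, Duration.Inf))

context.shutdown()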

Samuel Halliday

Oct 18, 2013, 4:59:24 AM
to Hua Jiang, Michael Schmitz, scala-user
Thanks Hua,

The blocking queue is working at the level of the execution of Futures, right?

If that is the case, then I'm afraid this won't help me. Try going to Int.MaxValue instead of 100000, which more closely approximates what I see. I suspect you'll get an OOM because it looks like you're constructing the whole problem as a series of futures up front.


--
Sam

Hua Jiang

Oct 18, 2013, 5:28:17 AM
to Samuel Halliday, Michael Schmitz, scala-user
> I suspect you'll get an OOM because it looks like you're constructing the whole problem as a series of futures up front.

No OOM will occur, because the queue used here is a little different: notice how the offer method is overridden. When the queue is full, it blocks the producer thread until there is room.
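The blocking behaviour can be checked in isolation. This is a sketch with a deliberately tiny pool and queue; `blockingPool` and `maxBacklog` are hypothetical names. It submits many slow tasks and records the largest number of tasks ever submitted but not yet started. Because `execute` stalls in `put` whenever the queue is full, that number stays at most capacity + threads + 1 (queued tasks, tasks handed to workers that have not started them, and the one being submitted), no matter how many tasks are submitted.

```scala
import java.util.concurrent.{ArrayBlockingQueue, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// The same trick as above: offer() blocks (via put) instead of returning
// false, so ThreadPoolExecutor.execute stalls the submitting thread.
def blockingPool(threads: Int, capacity: Int): ThreadPoolExecutor =
  new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.SECONDS,
    new ArrayBlockingQueue[Runnable](capacity) {
      override def offer(e: Runnable): Boolean = { put(e); true }
    })

// Hypothetical helper: submits `n` slow tasks and returns the peak number of
// tasks that had been submitted but not yet started.
def maxBacklog(threads: Int, capacity: Int, n: Int): Int = {
  val pool = blockingPool(threads, capacity)
  val pending = new AtomicInteger(0)
  var maxSeen = 0 // only touched by this (the submitting) thread
  for (_ <- 1 to n) {
    // pending only grows here, so sampling right after the increment sees the peak
    maxSeen = math.max(maxSeen, pending.incrementAndGet())
    pool.execute(new Runnable {
      def run(): Unit = {
        pending.decrementAndGet()
        Thread.sleep(1) // stand-in for expensive work
      }
    })
  }
  pool.shutdown()
  pool.awaitTermination(1, TimeUnit.MINUTES)
  maxSeen
}
```

With an ordinary unbounded queue the backlog would instead grow towards `n`, which is the memory growth Sam was worried about.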

Sam Halliday

Oct 18, 2013, 5:33:02 AM
to Hua Jiang, Michael Schmitz, scala-user
You're absolutely right, I read the executor part of your code too fast! :-)

I am not fussed about the map/reduce approach here but I'll be borrowing your executor for a rainy day :-D

Kind regards,
Sam Halliday
