Parallelize stream processing and heap memory impact...

david crosson

Oct 4, 2013, 5:49:15 PM
to scala...@googlegroups.com
Hi,

I'm trying to find a way to achieve faster stream processing using parallelism, so I wonder why this kind of expression uses so much JVM heap memory. Here is a simplified example of my problem:

$ scala -J-Xmx128m
Welcome to Scala version 2.10.2 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_40).
Type in expressions to have them evaluated.
Type :help for more information.

scala> Stream.from(1).filter(_ % 2 == 0).drop(1000000).take(10).toList
res0: List[Int] = List(2000002, 2000004, 2000006, 2000008, 2000010, 2000012, 2000014, 2000016, 2000018, 2000020)

scala> Stream.from(1).grouped(1000).map(_.par).flatMap(_.filter(_ % 2 == 0)).toStream.drop(1000000).take(10).toList
java.lang.OutOfMemoryError: GC overhead limit exceeded
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:53)
at scala.collection.parallel.ParIterableLike$Filter.tryLeaf(ParIterableLike.scala:1108)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Is a solution possible? Or do I have to switch to (for example) an actor implementation?

regards,
David.

Seth Tisue

Oct 15, 2013, 2:02:41 PM
to scala-user
(only partially addresses your question, but:)

It still gives an OutOfMemoryError with ".map(_.par)" removed, so the
issue isn't specific to parallel collections.
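That is, my reading of your pipeline minus the ".map(_.par)":

Stream.from(1).grouped(1000).flatMap(_.filter(_ % 2 == 0)).toStream.drop(1000000).take(10).toList  // still OOMs under -Xmx128m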

Changing "Stream.from(1)" to "Iterator.from(1)" or
"Stream.from(1).iterator" makes the problem go away — and you can keep
the ".map(_.par)".

david crosson

Oct 17, 2013, 1:45:25 PM
to scala...@googlegroups.com
Yes :) you did it! That's an interesting workaround. I gave it a try: the memory footprint remains low even with a very high drop value, and almost all of my server's CPUs are now used by this processing.

Stream.from(1).iterator.grouped(1000).map(_.par).flatMap(_.filter(_ % 2 == 0)).toStream.drop(100000000).take(10).toList

Now I still have to understand why, without .iterator, it doesn't work as a parallel stream. Maybe there's a relationship with this bug:

thanks.