[2.10] running futures in a for loop

l.a.derks

Mar 28, 2013, 1:49:09 PM
to scala...@googlegroups.com
Hello

I have a list of items and for each item I want to execute a future. But I only want to continue to the next item if the current future has completed.

In pseudo code it would be something like this:

for(item <- itemList) {
  val future = methodThatReturnsFuture
   
  if future has completed then continue with next item.

}

How can I do this in Scala?

regards,
Leon

√iktor Ҡlang

Mar 28, 2013, 2:09:40 PM
to l.a.derks, scala-user
import scala.concurrent.{Future, ExecutionContext}
import ExecutionContext.Implicits.global // OR whatever ExecutionContext you're going to use

implicit class ForeachAsync[T](iterable: Iterable[T]) {
  def foreachAsync[U](f: T => U)(implicit ec: ExecutionContext): Unit = {
    def next(i: Iterator[T]): Unit = if (i.hasNext) Future(f(i.next)) onComplete { case _ => next(i) }
    next(iterable.iterator)
  }
}

scala> List(1,2,3,4) foreachAsync println
1

scala> 2
3
4


Something like that?

Cheers,



--
You received this message because you are subscribed to the Google Groups "scala-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to scala-user+...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.
 
 



--
Viktor Klang
Director of Engineering

Twitter: @viktorklang

Derek Williams

Mar 28, 2013, 4:28:06 PM
to l.a.derks, scala-user
This could be done using folds as well:

List(1,2,3,4).foldLeft(Future successful ())((future, n) => future flatMap (_ => println(n)))

Or using Scalaz 7:

// Scalaz imports
import scalaz.std.list._
import scalaz.std.anyVal._
import scalaz.syntax.foldable._
import scalaz.Monoid

// Monoid instance for Future[A: Monoid]
implicit def futureMonoid[A: Monoid](implicit executionContext: ExecutionContext) = new Monoid[Future[A]] {
  def append(f1: Future[A], f2: => Future[A]): Future[A] = for (a1 <- f1; a2 <- f2) yield (Monoid[A].append(a1,a2))
  def zero: Future[A] = Future successful Monoid[A].zero
}

// and the payoff
List(1,2,3,4) foldMap (n => Future(println(n)))

Not as efficient or performant as Viktor's, but it does return the final folded result (in this case Future[Unit]). Viktor's code could of course be modified to also return some result.




--
Derek Williams

ericacm

Mar 28, 2013, 5:31:31 PM
to scala...@googlegroups.com
scala> for {
     | i <- List(1,2,3,4)
     | f <- Future(println(i))
     | } ()
1
2
3
4

Derek Williams

Mar 28, 2013, 5:57:44 PM
to ericacm, scala-user
On Thu, Mar 28, 2013 at 3:31 PM, ericacm <eri...@gmail.com> wrote:
scala> for {
     | i <- List(1,2,3,4)
     | f <- Future(println(i))
     | } ()
1
2
3
4

That causes all the Futures to be run in parallel, which the OP didn't want. It's only a combination of luck and the operation being simple (or you are using an ExecutionContext with only a single thread) that caused it to print in the correct order.

--
Derek Williams

√iktor Ҡlang

Mar 28, 2013, 5:59:39 PM
to Derek Williams, ericacm, scala-user
And println is synchronized.


Eric Pederson

Mar 28, 2013, 6:15:03 PM
to √iktor Ҡlang, Derek Williams, scala-user
Right - that must be it. 

I forgot, why are we using futures when we are sequentially waiting for results?


√iktor Ҡlang

Mar 28, 2013, 6:18:32 PM
to Eric Pederson, Derek Williams, scala-user
On Thu, Mar 28, 2013 at 11:15 PM, Eric Pederson <eri...@gmail.com> wrote:
Right - that must be it. 

I forgot, why are we using futures when we are sequentially waiting for results?

Well, technically there is no "waiting", just an establishing of order. Oh, and there's a subtle difference in semantics between your proposals and mine: mine moves on to the next element whether or not the function fails.

Cheers,
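
The difference in failure semantics described above can be made concrete with a small sketch. Everything named here (`FailureSemantics`, `Log`, `task`, `viaOnComplete`, `viaFlatMap`) is hypothetical and only for illustration; the `Promise` in `viaOnComplete` is an addition so that a caller can tell when the chain has finished. Under those assumptions, the onComplete-style chain visits every item even though one step throws, while the flatMap-style fold stops at the first failure and returns a failed Future.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

object FailureSemantics {
  // Thread-safe record of the items that were actually processed.
  final class Log {
    private var items = Vector.empty[Int]
    def add(n: Int): Unit = synchronized { items = items :+ n }
    def snapshot: Vector[Int] = synchronized { items }
  }

  // Hypothetical task: records the item, then fails on 2.
  def task(log: Log)(n: Int): Unit = {
    log.add(n)
    if (n == 2) throw new RuntimeException(s"failed on $n")
  }

  // onComplete-style chain: the next item runs whether the previous step
  // succeeded or failed. The Promise only signals the end of the chain.
  def viaOnComplete(items: List[Int])(f: Int => Unit)(implicit ec: ExecutionContext): Future[Unit] = {
    val done = Promise[Unit]()
    def next(i: Iterator[Int]): Unit =
      if (i.hasNext) Future(f(i.next())).onComplete(_ => next(i))
      else { done.success(()); () }
    next(items.iterator)
    done.future
  }

  // flatMap-style fold: a failed step short-circuits the remaining items.
  def viaFlatMap(items: List[Int])(f: Int => Unit)(implicit ec: ExecutionContext): Future[Unit] =
    items.foldLeft(Future.successful(())) { (acc, n) => acc.flatMap(_ => Future(f(n))) }
}
```

With `List(1, 2, 3)`, the first variant should record all three items and the second only the first two, with the failure visible in the returned Future.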

ericacm

Mar 28, 2013, 8:02:59 PM
to scala...@googlegroups.com, Eric Pederson, Derek Williams
Ok - here's a "waiting" approach:

scala> List(1,2,3,4).foreach { i => val f = Future(i); println(Await.result(f, 1.second)) }
1
2
3
4

Here's Derek's initial foldLeft (with a small correction).  This is what I was aiming for originally (don't send emails on the way out the door :)  Non-waiting.

scala>   List(1,2,3,4).foldLeft(Future successful ())((fs, n) => fs flatMap (_ => Future(println(n))))
res12: scala.concurrent.Future[Unit] = scala.concurrent.impl.Promise$DefaultPromise@2f1b03b7

scala> 1
2
3
4

And a variation collecting the results and then printing them out afterwards:

scala>   val fv = List(1,2,3,4).foldLeft(Future successful Vector.empty[Int])((fs, n) => fs flatMap (v => Future(v :+ n)))
fv: scala.concurrent.Future[scala.collection.immutable.Vector[Int]] = scala.concurrent.impl.Promise$DefaultPromise@251d737b

scala> Await.result(fv, 1.second) foreach println
1
2
3
4

Derek Williams

Mar 28, 2013, 8:54:13 PM
to ericacm, scala-user
On Thu, Mar 28, 2013 at 6:02 PM, ericacm <eri...@gmail.com> wrote:
Here's Derek's initial foldLeft (with a small correction).  This is what I was aiming for originally (don't send emails on the way out the door :)  Non-waiting.

scala>   List(1,2,3,4).foldLeft(Future successful ())((fs, n) => fs flatMap (_ => Future(println(n))))
res12: scala.concurrent.Future[Unit] = scala.concurrent.impl.Promise$DefaultPromise@2f1b03b7
 
My favorite part about this one is that by swapping the accumulator Future and the task Future ('fs' and 'Future(println(n))' in this example) you change the behaviour from sequential to parallel, while still returning a Future that completes when all the tasks are done.


--
Derek Williams
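
The swap described above can be sketched as two folds that differ only in which Future is on the left of flatMap. The names `FoldOrder`, `sequential`, `parallel` and `timedOut` are hypothetical, and the latch-based probe is just one assumed way to observe overlap: every task checks in and then waits up to 500 ms for all the others to have started, so a task that "misses" was not running alongside the rest.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object FoldOrder {
  // Sequential: task(n) is only created after the accumulator has completed.
  def sequential[A](items: List[A], task: A => Future[Unit])(implicit ec: ExecutionContext): Future[Unit] =
    items.foldLeft(Future.successful(()))((fs, n) => fs.flatMap(_ => task(n)))

  // Parallel: task(n) is created eagerly while folding; flatMap only joins
  // it with the accumulator, so the result completes when all tasks are done.
  def parallel[A](items: List[A], task: A => Future[Unit])(implicit ec: ExecutionContext): Future[Unit] =
    items.foldLeft(Future.successful(()))((fs, n) => task(n).flatMap(_ => fs))

  // Hypothetical probe: runs n tasks through `run` and returns how many of
  // them gave up waiting for all n tasks to be in flight at the same time.
  def timedOut(n: Int)(run: (List[Int], Int => Future[Unit]) => Future[Unit])(implicit ec: ExecutionContext): Int = {
    val latch  = new CountDownLatch(n)
    val misses = new AtomicInteger(0)
    val task: Int => Future[Unit] = _ => Future {
      latch.countDown()
      if (!latch.await(500, TimeUnit.MILLISECONDS)) { misses.incrementAndGet(); () }
    }
    Await.result(run(List.range(0, n), task), 10.seconds)
    misses.get
  }
}
```

On an ExecutionContext with at least as many threads as tasks, `timedOut(3)((xs, t) => parallel(xs, t))` would be expected to report no misses, whereas the sequential fold cannot avoid them because each task starts only after the previous one has finished.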

Leon Derks

Mar 29, 2013, 10:03:40 AM
to scala...@googlegroups.com
Thanks Derek and Viktor for the examples!
Great!

Leon

Ganta Murali Krishna

Feb 3, 2016, 3:31:26 AM
to scala-user, l.a....@gmail.com
Thanks Viktor, that really helped. Here is a slightly improved version:

import scala.concurrent.{ExecutionContext, Future}

/**
 * Created by Muralig on 2/3/2016.
 */
object IterableExtensions {

  implicit class ForeachAsync[T](iterable: Iterable[T]) {

    /**
     * Inspired by https://groups.google.com/forum/#!topic/scala-user/W9ykW8j3Ybg
     *
     * Usage example =>
     *
     * val list = List("Hour","Day","Week","Month")
     *
     * list.foreachAsync(a => processKpi(a),receiver)
     *
     * def receiver = PartialFunction[Any,Unit]{
     *   case Success(res) => println(res)
     *   case Failure(ex) => println(ex)
     * }
     *
     * def processKpi(v:String) = {
     *   Future {//your future result }
     * }
     *
     * @param futureFunc Future to be created for each element
     * @param callBackFn partial function to receive the result
     * @param onCompleteFn get notified when all messages are processed
     * @param ec
     * @tparam F
     */
    def foreachAsync[F <: Future[_]](futureFunc: T => F, callBackFn: PartialFunction[Any, Unit], onCompleteFn: Option[() => Unit] = None)(implicit ec: ExecutionContext): Unit = {

      def next(i: Iterator[T]): Unit = {
        if (i.hasNext) {
          futureFunc(i.next) onComplete {
            case msg => if (callBackFn.isDefinedAt(msg)) {
              callBackFn(msg)
              next(i)
            } else throw new IllegalArgumentException(s"Provided pattern argument [$callBackFn] is incorrect and does not match with the received message: $msg ")
          }
        } else
          if (onCompleteFn.isDefined) onCompleteFn.get()
      }
      next(iterable.iterator)
    }
  }
}

And the usage is here:


import com.reactore.extensions.IterableExtensions._
import org.joda.time.DateTime
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Failure, Success}

object FutureApp extends App {

  List("Hour","Day","Week","Month").foreachAsync(a => processKpi(a),receiver,Some(onComplete))

  def receiver = PartialFunction[Any,Unit]{
    case Success(res) => println(res)
    case Failure(ex) => println(ex)
  }

  def processKpi(v:String) = {
    Future {
      Thread.sleep(3000)
      if(v == "Week") throw new Exception(
        """==============================================
          |
          |Here we go, failed to process for week
          |
          |===============================================
          |""".stripMargin)
      s"Value $v processed at ${DateTime.now}"
    }
  }

  def onComplete() = {
    println("all items are processed")
  }

  Thread.sleep(20000)
}

Viktor Klang

Feb 3, 2016, 4:54:33 AM
to Ganta Murali Krishna, scala-user, Leon Derks
Hi Ganta,

how about:


implicit class ForeachAsync[T](iterable: Iterable[T]) {
  def foreachAsync[U](f: T => Future[U])(implicit ec: ExecutionContext): Future[Unit] = {
    def next(i: Iterator[Future[U]]): Future[Unit] =
      if (i.hasNext) i.next().flatMap(_ => next(i)) else Future.successful(())
    Future(iterable.iterator.map(f)).flatMap(next)
  }
}

and then you can do:

scala> 1 to 10 foreachAsync { x => Future(println(x * x)) } foreach { _ => println("Done") }

scala> 1
4
9
16
25
36
49
64
81
100
Done



--
Cheers,

Ganta Murali Krishna

Feb 3, 2016, 5:41:36 AM
to scala-user, gan...@gmail.com, l.a....@gmail.com
Hello Viktor,

That looks so much simpler, thank you so much. I should not have called mine improved :). But it does not satisfy my requirements, which are listed below.

1) Collect all the results of the futures. I agree that we can collect them within the method where the future (e.g. Future(println(x * x))) is created.
2) If any of the futures throws an exception, it should not affect the other futures. Your example stops running the remaining futures if an exception occurs in the middle. It also suppresses the exceptions.
3) Notify when everything is done.


Please see attached test results

Regards
Murali

Viktor Klang

Feb 3, 2016, 7:24:35 AM
to Ganta Murali Krishna, scala-user, Leon Derks
scala> implicit class ForeachAsync[T](val iterable: Iterable[T]) extends AnyVal {
     |   def foreachAsync[U](f: T => Future[U])(implicit ec: ExecutionContext): Future[Iterable[Try[U]]] =
     |     Future.sequence(iterable.map(t => Future.successful(t).flatMap(f).map(Success[U](_)) recover { case t => Failure[U](t) }))
     | }
defined class ForeachAsync

scala> 1 to 10 foreachAsync { i => if (i == 5) throw null else Future(i * i) } foreach println

scala> Vector(Success(1), Success(4), Success(9), Success(16), Failure(java.lang.NullPointerException), Success(36), Success(49), Success(64), Success(81), Success(100))
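
One thing to keep in mind with the Future.sequence version above: `iterable.map` builds every Future up front, so the tasks may run concurrently rather than one after another. If strict one-at-a-time ordering is still wanted together with per-item results, a fold along the lines of the earlier foldLeft examples is one option. The sketch below is only an assumption about how that could look; `SequentialTry` and `traverseSequentially` are made-up names, not part of the thread or of the standard library.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

object SequentialTry {
  // Hypothetical helper: runs f on each item strictly one after another and
  // collects every outcome as a Try, so one failure does not stop the rest.
  def traverseSequentially[T, U](items: Iterable[T])(f: T => Future[U])(
      implicit ec: ExecutionContext): Future[Vector[Try[U]]] =
    items.foldLeft(Future.successful(Vector.empty[Try[U]])) { (acc, t) =>
      acc.flatMap { results =>
        // Try(f(t)) also captures an exception thrown while creating the Future.
        val step: Future[U] = Try(f(t)) match {
          case Success(future) => future
          case Failure(e)      => Future.failed(e)
        }
        step
          .map[Try[U]](Success(_))
          .recover { case e => Failure(e) }
          .map(outcome => results :+ outcome)
      }
    }
}
```

The returned Future completes once every item has been processed, which also covers the "notify when everything is done" requirement: attach `foreach` or `onComplete` to it.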
