--
You received this message because you are subscribed to the Google Groups "scala-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to scala-user+...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.
--
You received this message because you are subscribed to the Google Groups "scala-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to scala-user+...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.
scala> for {
     |   i <- List(1,2,3,4)
     |   f <- Future(println(i))
     | } ()
1
2
3
4
--
You received this message because you are subscribed to the Google Groups "scala-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to scala-user+...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.
Right - that must be it. I forgot: why are we using futures when we are sequentially waiting for results?
Here's Derek's initial foldLeft (with a small correction). This is what I was aiming for originally (don't send emails on the way out the door :) ). Non-waiting.
scala> List(1,2,3,4).foldLeft(Future successful ())((fs, n) => fs flatMap (_ => Future(println(n))))
res12: scala.concurrent.Future[Unit] = scala.concurrent.impl.Promise$DefaultPromise@2f1b03b7
import scala.concurrent.{ExecutionContext, Future}
/**
 * Extension methods for `Iterable` that process elements asynchronously, one at a time.
 *
 * Created by Muralig on 2/3/2016.
 */
object IterableExtensions {

  /** Adds `foreachAsync` to any `Iterable[T]`. */
  implicit class ForeachAsync[T](iterable: Iterable[T]) {

    /**
     * Runs `futureFunc` for every element strictly in sequence: the future for the next
     * element is only created after the previous future has completed and `callBackFn`
     * has returned for its result.
     *
     * Inspired by https://groups.google.com/forum/#!topic/scala-user/W9ykW8j3Ybg
     *
     * Usage example:
     * {{{
     * val list = List("Hour", "Day", "Week", "Month")
     *
     * val receiver: PartialFunction[Any, Unit] = {
     *   case Success(res) => println(res)
     *   case Failure(ex)  => println(ex)
     * }
     *
     * def processKpi(v: String): Future[String] = Future { "processed " + v }
     *
     * list.foreachAsync(a => processKpi(a), receiver)
     * }}}
     *
     * `futureFunc` is invoked for the first element on the calling thread; every later
     * invocation happens inside the completion callback of the previous future.
     *
     * The iteration stops early, without invoking `onCompleteFn`, when
     *  - `callBackFn` is not defined for a result: an `IllegalArgumentException` is thrown
     *    inside the completion callback, so it is not seen by the caller of this method;
     *  - `callBackFn` itself throws.
     *
     * @param futureFunc   creates the future to run for one element
     * @param callBackFn   receives the completion result (`Success` or `Failure`) of each
     *                     future; it should be defined for both
     * @param onCompleteFn optional hook invoked once after the last element has been
     *                     handled (immediately if the iterable is empty)
     * @param ec           execution context on which the completion callbacks run
     * @tparam F           concrete future type returned by `futureFunc`
     */
    def foreachAsync[F <: Future[_]](futureFunc: T => F, callBackFn: PartialFunction[Any, Unit], onCompleteFn: Option[() => Unit] = None)(implicit ec: ExecutionContext): Unit = {
      // Not stack-recursive in the usual case: each step is re-entered from a completion callback.
      def step(remaining: Iterator[T]): Unit =
        if (remaining.hasNext) {
          futureFunc(remaining.next()).onComplete { outcome =>
            // applyOrElse evaluates the pattern once instead of isDefinedAt followed by apply.
            callBackFn.applyOrElse[Any, Unit](
              outcome,
              _ => throw new IllegalArgumentException(s"Provided pattern argument [$callBackFn] is incorrect and does not match with the received message: $outcome ")
            )
            step(remaining)
          }
        } else onCompleteFn.foreach(_.apply())

      step(iterable.iterator)
    }
  }
}
import com.reactore.extensions.IterableExtensions._
import org.joda.time.DateTime
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Failure, Success}
/**
 * Demo for `foreachAsync`: processes four KPI periods one after another, prints each
 * result or failure, and exits once all of them have been handled.
 *
 * Uses an explicit `main` instead of `extends App`, and waits on a promise completed by
 * `onComplete()` instead of sleeping for a fixed 20 seconds.
 */
object FutureApp {
  import scala.concurrent.{Await, Promise}
  import scala.concurrent.duration._

  /** Longest time `main` waits for the whole run; same bound as the former fixed sleep. */
  private val MaxWait: FiniteDuration = 20.seconds

  /** Completed by `onComplete()` once every element has been handled. */
  private val allProcessed: Promise[Unit] = Promise[Unit]()

  def main(args: Array[String]): Unit = {
    List("Hour", "Day", "Week", "Month").foreachAsync(kpi => processKpi(kpi), receiver, Some(() => onComplete()))

    // Block only at the edge of the program so the JVM stays alive until the run finishes.
    try {
      Await.ready(allProcessed.future, MaxWait)
      ()
    } catch {
      case _: java.util.concurrent.TimeoutException =>
        Console.err.println(s"Gave up waiting for completion after $MaxWait")
    }
  }

  /**
   * Prints the outcome of one processed element.
   *
   * Written as a partial-function literal (not `PartialFunction.apply`, which is
   * deprecated) so that `isDefinedAt` reflects the listed cases.
   */
  def receiver: PartialFunction[Any, Unit] = {
    case Success(res) => println(res)
    case Failure(ex)  => println(ex)
  }

  /**
   * Simulates three seconds of work for `v`.
   *
   * @param v name of the period to process
   * @return a future holding a confirmation message, or a failed future when `v` is "Week"
   */
  def processKpi(v: String): Future[String] =
    Future {
      Thread.sleep(3000)
      if (v == "Week") throw new Exception(
        """==============================================
          |
          |Here we go, failed to process for week
          |
          |===============================================
          |""".stripMargin)
      s"Value $v processed at ${DateTime.now}"
    }

  /** Announces the end of the run and releases `main`; safe to call more than once. */
  def onComplete(): Unit = {
    println("all items are processed")
    allProcessed.trySuccess(())
    ()
  }
}