Hi,
This is what I'm trying to do: a quick hack inspired by the current firstCompletedOf implementation:
    import scala.concurrent._
    import scala.util._

    type Futures[T] = TraversableOnce[Future[T]]

    // Intended to return the first completed result together with the other futures.
    def select[T](futures: Futures[T])(implicit executor: ExecutionContext): Future[(T, Futures[T])] = {
      val p = Promise[(T, Futures[T])]()

      // `remains` is a by-name parameter.
      def completeFirst(remains: => Futures[T])(t: Try[T]): Unit = {
        // p tryComplete t.map(c => c -> remains)
        p tryComplete (t match {
          case Success(c)  => Success(c -> remains)
          case Failure(ex) => ex.printStackTrace(); Failure(ex)
        })
      }

      futures.foreach(f => f onComplete completeFirst(futures.filter(_ != f)) _)
      p.future
    }
To test it, I'm using the following:
    import scala.concurrent._
    import scala.concurrent.duration._
    import scala.util._

    implicit val customEC = ExecutionContext.fromExecutorService(
      java.util.concurrent.Executors.newCachedThreadPool()
      // java.util.concurrent.Executors.newFixedThreadPool(10)
    )

    // Sleeps for inS seconds, then returns inS.
    def sleep(inS: Int) = { Thread.sleep(inS * 1000); inS }

    val workToDo = List(4, 10, 2, 8, 1)
    val workToDoFutures = workToDo.map { t => future { sleep(t) } }

    // Prints the next available result, then recurses on the remaining futures.
    def getResults[T](futures: TraversableOnce[Future[T]]): Unit = {
      val nextOne = select(futures)
      nextOne.onComplete {
        case Success((result, remains)) =>
          println(s"just for visual debug purposes ${result}")
          getResults(remains)
        case Failure(ex) =>
          ex.printStackTrace()
      }
    }

    getResults(workToDoFutures)
What I do not understand is why not all of the results are printed. I only get the first one or two (the sleep(1) and sleep(2) results):
just for visual debug purposes 1
just for visual debug purposes 2
No further messages are printed, even with a final sleep to avoid a premature exit.
I'm using Java 1.6.0_37 and Scala 2.10.0. Any idea what's going on? I think I'm missing something...
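One thing I suspect but have not confirmed: because `futures` is typed as `TraversableOnce`, the `filter` call may hand back a single-use `Iterator` rather than a `List`. If so, on the second round the `foreach` would consume it and the following `filter` would see nothing. The sketch below only illustrates that single-use behaviour on a plain `Iterator`, without any futures; `OneShotIteratorDemo` and `remainingAfterTraversal` are hypothetical names made up for the illustration.

```scala
object OneShotIteratorDemo {
  // Hypothetical helper: traverses an iterator once, then tries to filter it.
  def remainingAfterTraversal(xs: List[Int]): List[Int] = {
    val it: Iterator[Int] = xs.iterator
    it.foreach(_ => ())       // first pass, analogous to futures.foreach(...)
    it.filter(_ != 1).toList  // second pass, analogous to futures.filter(_ != f)
  }

  def main(args: Array[String]): Unit =
    println(remainingAfterTraversal(List(4, 10, 2, 8, 1))) // prints List()
}
```

If this is really the cause, typing the parameter as a `Seq[Future[T]]` instead of `TraversableOnce` might be enough, but I have not verified that.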
Any help would be much appreciated.
Regards,