scala 2.10 : how to get a future on the first completed future and the remaining futures from a sequence of futures ?

3,120 views
Skip to first unread message

david crosson

unread,
Jan 2, 2013, 5:04:37 PM1/2/13
to scala...@googlegroups.com
Hi all,

how do I get the same result using scala 2.10 as the one given by the "select" method of twitter future implementation ?

/**
* "Select" off the first future to be satisfied. Return this as a
* result, with the remainder of the Futures as a sequence.
*
* @param fs a scala.collection.Seq
*/

scala.concurrent.firstCompletedOf looks interesting, but it only does half the job, as it doesn't give us remaining futures :( so can't remove the found completed from the given futures collection.

David.

Rodrigo Cano

unread,
Jan 2, 2013, 5:41:25 PM1/2/13
to david crosson, scala...@googlegroups.com
firstCompletedOf would have to return a Future[(T, Seq[Future[T])] which looks really ugly, in order to work as you expect. Remember that firstCompletedOf does NOT return the first completed Future (since this would have to block) but instead, it returns a new one, which will complete as soon as one of the passed ones completes.

√iktor Ҡlang

unread,
Jan 2, 2013, 5:51:50 PM1/2/13
to david crosson, scala-user
Why not just port the Twitter select to scala.concurrent.Future?

Cheers,
--
Viktor Klang
Director of Engineering

Typesafe - The software stack for applications that scale
Twitter: @viktorklang

david crosson

unread,
Jan 8, 2013, 6:11:03 PM1/8/13
to scala...@googlegroups.com, david crosson
Hi,

This is what I'm trying to do, a quick hack inspired with current firstCompletedOf implementation :
  import scala.concurrent._
  import scala.util._
 
  type Futures[T] = TraversableOnce[Future[T]]
  
  def select[T](futures: Futures[T]) (implicit executor: ExecutionContext)
    : Future[Tuple2[T, Futures[T]]] = {
    val p = Promise[Tuple2[T, Futures[T]]]()
    def completeFirst(remains: =>Futures[T])(t:Try[T]) {
      //p tryComplete t.map(c => c->remains)
      p tryComplete (t match {
        case Success(c)=> Success(c->remains)
        case Failure(ex) => ex.printStackTrace() ; Failure(ex)
       }
      )
    }
    futures.foreach(f => f onComplete completeFirst(futures.filter(_!=f))_ )
    p.future
  }

To test it I'm using the following :
    import concurrent._
    import duration._
    implicit val customEC = ExecutionContext.fromExecutorService(
      java.util.concurrent.Executors.newCachedThreadPool()
      //java.util.concurrent.Executors.newFixedThreadPool(10)
      )

    def sleep(inS: Int) = { Thread.sleep(inS * 1000); inS }
    val workToDo = List(4, 10, 2, 8, 1)
    val workToDoFutures = workToDo.map { t => future { sleep(t) } }
       
    def getResults[T](futures:TraversableOnce[Future[T]]) {
      val nextOne = select(futures)
      nextOne.onComplete { result =>
        result match {
          case Success((result, remains)) =>
            println(s"just for visual debug purposes ${result}")
            getResults(remains)
          case Failure(ex) =>
            ex.printStackTrace()
        }
      }
    }
    getResults(workToDoFutures)




Here what I do not understand is the reason why all possible results are not printed, I only got some of them either one or two first ones (sleep(1) and sleep(2) results):
just for visual debug purposes 1
just for visual debug purposes 2
And no more messages printed.... even with a final sleep to avoid any premature exit.

I'm using java 1.6.0_37, scala 2.10.0, any idea about what's going on ? I think I miss something...

any help would be very appreciated

regards,

√iktor Ҡlang

unread,
Jan 8, 2013, 6:16:35 PM1/8/13
to david crosson, scala-user
Here's a quick stab at it: https://gist.github.com/4488970

Cheers,

david crosson

unread,
Jan 8, 2013, 6:34:42 PM1/8/13
to scala...@googlegroups.com, david crosson
OK thank you very much !!!
It works perfectly :)

this is something that may be added to the standard API ?

Cheers,
David.

√iktor Ҡlang

unread,
Jan 8, 2013, 6:46:28 PM1/8/13
to david crosson, scala-user
Hi David,

you're most welcome! I updated the Gist with a cleaned up version.

I'm not sure it will go into the stdlib as it is very memory inefficient, but perhaps something that solves the same kinds of problems?

Cheers,

Som Snytt

unread,
Jan 9, 2013, 3:04:00 AM1/9/13
to √iktor Ҡlang, david crosson, scala-user
On Tue, Jan 8, 2013 at 3:16 PM, √iktor Ҡlang <viktor...@gmail.com> wrote:
Here's a quick stab at it: https://gist.github.com/4488970



As a cautionary tale, if you stick it in a value class, you'll encounter
https://issues.scala-lang.org/browse/SI-6891

which apparently was discovered and filed by one of the elves who, with too much time on his hands after the high season for elven work, spends nights and weekends (and weekend nights) in the workshop pounding on the compiler.

It's also possible that Typesafe offers a special deal that includes the "extra value class", the one that comes with fries.

This is the first time I've managed namespaces this way:

implicit class Klangable(val co: Future.type) extends AnyVal

to enable Future select fs.  The implicit serves to attach the meaningful function name to a familiar locus.

Som Snytt

unread,
Jan 10, 2013, 1:31:50 PM1/10/13
to david crosson, scala...@googlegroups.com
On Tue, Jan 8, 2013 at 3:11 PM, david crosson <crosso...@gmail.com> wrote:

I think I miss something...

I did the same thing and missed the same thing.

You know how they say that parallel computations are hard to reason about?  This isn't that.

Instead, we used TraversableOnce and guess what, we can only traverse it once.

https://gist.github.com/4504281

(The corrected version in the gist uses a CanBuildFrom for your TravOnce.  The only other thing of interest is when the test for Viktor's code goes: Klang, klang, klang, went the trolley.)

Using your colonoscopy tool, you might think you're using $colon$colon Lists, but your TravOnce.filter is returning a disposable iterator.  By disposable, I mean like a tissue that is one-use and toss.

scala> List(1,2,3) filter (_ < 3)
res0: List[Int] = List(1, 2)

scala> .isTraversableAgain
res1: Boolean = true

scala> val t: TraversableOnce[Int] = List(1,2,3)
t: TraversableOnce[Int] = List(1, 2, 3)

scala> t.isTraversableAgain
res2: Boolean = true

scala> val f = t filter (_ < 3)
f: scala.collection.TraversableOnce[Int] = non-empty iterator

scala> f.isTraversableAgain
res3: Boolean = false

scala> f.size
res4: Int = 2

scala> f.nonEmpty
res5: Boolean = false


Why happened to, like, dynamic dispatch?  Is it something subtle in the method signature?

Actually, like the scaladoc says, it's using

scala/collection/TraversableOnce$MonadOps.filter

Just wanted to show off the javap syntax:

scala> class F { def f(vs: TraversableOnce[Int]) = vs filter (_ < 3) }
defined class F

scala> :javap F#f
  public scala.collection.TraversableOnce<java.lang.Object> f(scala.collection.TraversableOnce<java.lang.Object>);
    flags: ACC_PUBLIC
    Code:
      stack=4, locals=2, args_size=2
         0: getstatic     #13                 // Field scala/collection/TraversableOnce$.MODULE$:Lscala/collection/TraversableOnce$;
         3: aload_1      
         4: invokevirtual #17                 // Method scala/collection/TraversableOnce$.MonadOps:(Lscala/collection/TraversableOnce;)Lscala/collection/TraversableOnce$MonadOps;
         7: new           #19                 // class F$$anonfun$f$1
        10: dup          
        11: aload_0      
        12: invokespecial #23                 // Method F$$anonfun$f$1."<init>":(LF;)V
        15: invokevirtual #29                 // Method scala/collection/TraversableOnce$MonadOps.filter:(Lscala/Function1;)Lscala/collection/TraversableOnce;
        18: areturn      
      LocalVariableTable:
        Start  Length  Slot  Name   Signature
               0      19     0  this   LF;
               0      19     1    vs   Lscala/collection/TraversableOnce;
      LineNumberTable:
        line 7: 0
    Signature: #51                          // (Lscala/collection/TraversableOnce<Ljava/lang/Object;>;)Lscala/collection/TraversableOnce<Ljava/lang/Object;>;

david crosson

unread,
Jan 17, 2013, 4:45:51 PM1/17/13
to scala...@googlegroups.com, david crosson
Hi,

I've just created a Gist with a scala script usage example : 

The first thing it outputs is of course ''END of the script reached" :)

I'll next investigate overall cost... 

The script outputs : 
$ ./futselect 
END of the script reached
OK  - slept 1s
OK  - slept 2s
OK  - slept 4s
OK  - slept 8s
NOK - Error raised
NOK - Error raised
OK  - slept 15s

regards,
Reply all
Reply to author
Forward
0 new messages