Stackless recursion thru deeply nested structures

49 views
Skip to first unread message

Chris Marshall

unread,
Jun 1, 2012, 3:25:34 AM6/1/12
to sca...@googlegroups.com
I have just taken a library we have in Java that looks like this:

interface ListenableFuture<T> extends j.u.c.Future<T> {
  public void addListener(FutureListener<T> l);
  public void removeListener(FutureListener<T> l);
}

Where 

interface FutureListener<T> {
  void onCancelled();
  void onThrowable(Throwable t);
  void onResult(T result);
}

And I have wrapped this with zip/map/flatMap/point etc; ultimately generating a "sequence" method which looks like this:

  type CanBuildSelf[CC[_], A, B] = collection.generic.CanBuildFrom[CC[A], B, CC[B]]
  def sequence[T, X, CC[X] <: Traversable[X]](futures: CC[ListenableFuture[T]])(implicit cbf: CanBuildSelf[CC, T, T]): ListenableFuture[CC[T]] = {
    @annotation.tailrec def seq(fs: Traversable[ListenableFuture[T]], acc: ListenableFuture[collection.mutable.Builder[T, CC[T]]]): ListenableFuture[collection.mutable.Builder[T, CC[T]]] =
      if (fs.isEmpty)
        acc
      else {
        seq(fs.tail, fs.head zip acc map { case (t, b) => b synchronized { b += t }})
      }

    val builder = cbf()
    builder.sizeHint(futures.size)
    seq(futures, point(builder)) map (_.result())
  }

If I can sequence on a large List of futures, it generates a huge nested structure of futures which delegate their calls addListener/removeListener. As such, adding a listener to the top-level future blows the stack. This is where I get confused; presumably I can use Runar's trampoline mechanism for doing this, but I don't really understand it well enough. Can anyone get me started?

Chris

Here's my implementation of map:

 def map[B](f: T ⇒ B): ListenableFuture[B] = new DelegatingListenableFuture[B] {
    @volatile private var listeners = Map.empty[FutureListener[B], FutureListener[T]]
    def addFutureListener(listener: FutureListener[B]) {
      val l = new FutureListener[T] {
        def onResult(result: T) {
          listener.onResult(f(result))
        }

        def onThrowable(t: Throwable) {
          listener.onThrowable(t)
        }

        def onCancelled() {
          listener.onCancelled()
        }
      }
      listeners +=  listener → l
      futr[T].addFutureListener(l)  //BLOWS STACK!
    }

And here is zip:

  def zip[B](that: ListenableFuture[B]): DelegatingListenableFuture[(T, B)] = new DelegatingListenableFuture[(T, B)] {
    @volatile private var listeners = Map.empty[FutureListener[(T, B)], (FutureListener[T], FutureListener[B])]
    def addFutureListener(listener: FutureListener[(T, B)]) {
      val firstArrival = new java.util.concurrent.atomic.AtomicReference[Either[T, B]]
      val lT = new FutureListener[T] {
        def onResult(result: T) {
          if (!firstArrival.compareAndSet(null, Left(result)))
            listener.onResult(result → (firstArrival.get().right.toOption getOrElse sys.error("Unexpected absent R value: " + firstArrival.get())))
        }

        def onThrowable(t: Throwable) {
          listener.onThrowable(t)
        }

        def onCancelled() {
          listener.onCancelled()
        }
      }
      val lB = new FutureListener[B] {
        def onResult(result: B) {
          if (!firstArrival.compareAndSet(null, Right(result)))
            listener.onResult((firstArrival.get().left.toOption getOrElse sys.error("Unexpected absent L value: " + firstArrival.get())) → result)
        }

        def onThrowable(t: Throwable) {
          listener.onThrowable(t)
        }

        def onCancelled() {
          listener.onCancelled()
        }
      }
      futr[T].addFutureListener(lT) //BLOWS STACK!
      that.addFutureListener(lB)
      listeners += (listener → (lT → lB))
    }

DelegatingListenableFuture simply overrides isCancelled/isDone/cancel to pass thru to the underlying future (again, all of these will blow the stack). 

oxbow_lakes

unread,
Jun 1, 2012, 5:05:29 AM6/1/12
to sca...@googlegroups.com
I should say that I have read  http://apocalisp.wordpress.com/2011/10/26/tail-call-elimination-in-scala-monads/  and am able to implement the isCancelled etc methods on my delegate as follows:

    def isCancelled = isCancelled(futr[Any]).run
    private[scala] def isCancelled(f: ListenableFuture[Any]): Trampoline[Boolean] = f match {
      case w: DelegatingListenableFuture[_] => More( () => isCancelled(w.underlying))
      case u: ListenableFuture[_]           => Done(u.isCancelled)
    }

It's really the complexity of the addListener methods which have totally confused me (both in the adding of listeners and the invocation of callbacks)

Chris
Reply all
Reply to author
Forward
0 new messages