I have just taken a library we have in Java that looks like this:
interface ListenableFuture<T> extends j.u.c.Future<T> {
public void addListener(FutureListener<T> l);
public void removeListener(FutureListener<T> l);
}
Where
interface FutureListener<T> {
void onCancelled();
void onThrowable(Throwable t);
void onResult(T result);
}
And I have wrapped this with zip/map/flatMap/point etc; ultimately generating a "sequence" method which looks like this:
type CanBuildSelf[CC[_], A, B] = collection.generic.CanBuildFrom[CC[A], B, CC[B]]
def sequence[T, X, CC[X] <: Traversable[X]](futures: CC[ListenableFuture[T]])(implicit cbf: CanBuildSelf[CC, T, T]): ListenableFuture[CC[T]] = {
@annotation.tailrec def seq(fs: Traversable[ListenableFuture[T]], acc: ListenableFuture[collection.mutable.Builder[T, CC[T]]]): ListenableFuture[collection.mutable.Builder[T, CC[T]]] =
if (fs.isEmpty)
acc
else {
seq(fs.tail, fs.head zip acc map { case (t, b) => b synchronized { b += t }})
}
val builder = cbf()
builder.sizeHint(futures.size)
seq(futures, point(builder)) map (_.result())
}
If I can sequence on a large List of futures, it generates a huge nested structure of futures which delegate their calls addListener/removeListener. As such, adding a listener to the top-level future blows the stack. This is where I get confused; presumably I can use Runar's trampoline mechanism for doing this, but I don't really understand it well enough. Can anyone get me started?
Chris
Here's my implementation of map:
def map[B](f: T ⇒ B): ListenableFuture[B] = new DelegatingListenableFuture[B] {
@volatile private var listeners = Map.empty[FutureListener[B], FutureListener[T]]
def addFutureListener(listener: FutureListener[B]) {
val l = new FutureListener[T] {
def onResult(result: T) {
listener.onResult(f(result))
}
def onThrowable(t: Throwable) {
listener.onThrowable(t)
}
def onCancelled() {
listener.onCancelled()
}
}
listeners += listener → l
futr[T].addFutureListener(l) //BLOWS STACK!
}
And here is zip:
def zip[B](that: ListenableFuture[B]): DelegatingListenableFuture[(T, B)] = new DelegatingListenableFuture[(T, B)] {
@volatile private var listeners = Map.empty[FutureListener[(T, B)], (FutureListener[T], FutureListener[B])]
def addFutureListener(listener: FutureListener[(T, B)]) {
val firstArrival = new java.util.concurrent.atomic.AtomicReference[Either[T, B]]
val lT = new FutureListener[T] {
def onResult(result: T) {
if (!firstArrival.compareAndSet(null, Left(result)))
listener.onResult(result → (firstArrival.get().right.toOption getOrElse sys.error("Unexpected absent R value: " + firstArrival.get())))
}
def onThrowable(t: Throwable) {
listener.onThrowable(t)
}
def onCancelled() {
listener.onCancelled()
}
}
val lB = new FutureListener[B] {
def onResult(result: B) {
if (!firstArrival.compareAndSet(null, Right(result)))
listener.onResult((firstArrival.get().left.toOption getOrElse sys.error("Unexpected absent L value: " + firstArrival.get())) → result)
}
def onThrowable(t: Throwable) {
listener.onThrowable(t)
}
def onCancelled() {
listener.onCancelled()
}
}
futr[T].addFutureListener(lT) //BLOWS STACK!
that.addFutureListener(lB)
listeners += (listener → (lT → lB))
}
DelegatingListenableFuture simply overrides isCancelled/isDone/cancel to pass thru to the underlying future (again, all of these will blow the stack).