Iterable[Future[A]] => Stream[A]

Maarten Bynens

Mar 28, 2013, 6:40:45 AM
to scala...@googlegroups.com
Hi all,

I am looking for a function that turns a list of futures into a stream of results. Each future that completes adds its result to the stream. If you force more values from the stream than there are completed futures, the request blocks.
As I couldn't immediately find anything in the API, I implemented my own, but I'm not so sure it's good.

Any comments would be appreciated!

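import scala.annotation.tailrec
import scala.concurrent._
import scala.concurrent.duration._
import scala.util.Try
import ExecutionContext.Implicits.global

def toStream[A](futures: Iterable[Future[A]]): Stream[A] = {
  // One promise per future; promises are completed in order of arrival.
  val promises = Stream.fill(futures.size)(Promise[A]())

  // Hand the result to the first promise that has not been completed yet.
  @tailrec
  def addToStream(result: Try[A], open: Stream[Promise[A]]): Unit = open match {
    case promise #:: rest => if (!promise.tryComplete(result)) addToStream(result, rest)
    case _ => sys.error("more results than promises") // unreachable: one promise per future
  }

  futures.foreach(_ onComplete (addToStream(_, promises)))

  // Forcing an element blocks until that many futures have completed, or throws
  // after 3 seconds. Stream.map evaluates the head eagerly, so this call itself
  // waits for the first result.
  promises.map(promise => Await.result(promise.future, 3.seconds))
}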

Regards,
Maarten

√iktor Ҡlang

Mar 28, 2013, 8:25:26 AM
to Maarten Bynens, scala-user
Hi Maarten,

To me, this doesn't seem viable as this would entail blocking (as a Stream is lazy/on-demand which is not the case for Future).

What problem are you trying to solve?

Cheers,


--
Viktor Klang
Director of Engineering

Twitter: @viktorklang

Maarten Bynens

Mar 28, 2013, 11:43:55 AM
to √iktor Ҡlang, scala-user
Hi Viktor,

The main idea is to implement a trait that doesn't expect futures, just a collection of results. A stream isn't strictly necessary; an iterator, for example, would also be fine.
Are there better ways to bridge the concurrent with the sequential world besides blocking?

Maarten

√iktor Ҡlang

Mar 28, 2013, 11:54:41 AM
to Maarten Bynens, scala-user
You want to turn (has or hasn't happened) into (has happened), and the only way to do that is to wait until you know it has happened.
Question is what value is gained by using Futures if you are going to block. May I ask where you get the Futures from?

Cheers,
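
To illustrate the distinction Viktor draws, here is a minimal sketch that contrasts composing futures with blocking on them. The object and method names (`StayAsync`, `total`, `totalBlocking`) and the summing of `Int` results are assumptions made up for this example; nothing in the thread specifies them.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object StayAsync {
  // Non-blocking: the combined result is itself a Future ("has or hasn't happened").
  def total(parts: List[Future[Int]])(implicit ec: ExecutionContext): Future[Int] =
    Future.sequence(parts).map(_.sum)

  // Blocking: the calling thread waits until every part "has happened",
  // or until `atMost` elapses, in which case Await.result throws.
  def totalBlocking(parts: List[Future[Int]], atMost: Duration)
                   (implicit ec: ExecutionContext): Int =
    Await.result(total(parts), atMost)
}
```

If the consumer can accept a `Future[Int]`, `total` keeps everything asynchronous; `totalBlocking` is only needed when the consumer requires a plain value.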

Maarten Bynens

Mar 28, 2013, 12:06:57 PM
to √iktor Ҡlang, scala-user
I issue multiple requests to a datastore in parallel. Not all results are necessarily needed in the super-trait, so an iterator (or stream) of results is what should be returned. The overall behavior is basically similar to Future.find, but split over a trait and a class, where the trait does not expect futures.

Regards,
Maarten
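
For comparison, a minimal sketch of the `Future.find` behavior mentioned above. `Future.find` is part of `scala.concurrent`; the `FindSketch` object, the `firstLargeResult` name, and the `_ > 10` predicate are made up for illustration, and the `Int` values stand in for datastore results.

```scala
import scala.concurrent.{ExecutionContext, Future}

object FindSketch {
  // `lookups` stands in for the parallel datastore requests (hypothetical).
  // The result completes with Some(value) for a successful result matching
  // the predicate, or None if no successful result matches.
  def firstLargeResult(lookups: List[Future[Int]])
                      (implicit ec: ExecutionContext): Future[Option[Int]] =
    Future.find(lookups)(_ > 10)
}
```

Note that the answer is still a `Future`, so a trait that expects a plain value has to block on it or be changed to accept futures.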

√iktor Ҡlang

Mar 28, 2013, 12:19:30 PM
to Maarten Bynens, scala-user
Alright, then you don't have any other choice than blocking.

Cheers,

Jed Wesley-Smith

Mar 28, 2013, 9:26:46 PM
to √iktor Ҡlang, Maarten Bynens, scala-user
I presume you also want to handle results in the order they arrive?
And have some support for timeouts etc?

We wrote something with a similar purpose for our Java users:

https://bitbucket.org/atlassian/atlassian-util-concurrent/src/master/src/main/java/com/atlassian/util/concurrent/AsyncCompleter.java

It is fortunately much easier to implement in Scala. Unfortunately –
as √iktor points out – there is no way to turn an asynchronous call
into a synchronous one without blocking.

cheers,
jed
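
A rough Scala sketch of the idea Jed describes (results handed out in arrival order, with a timeout) could look like the following. This is not a port of `AsyncCompleter`; `CompletionOrder`, `completionIterator`, and the `timeout` parameter are hypothetical names chosen for this example, and the approach simply funnels completed results through a `java.util.concurrent.LinkedBlockingQueue`.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, TimeoutException}
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.FiniteDuration
import scala.util.Try

object CompletionOrder {
  // Hypothetical helper: yields outcomes in the order the futures complete.
  // Each call to next() blocks until a result arrives or `timeout` elapses.
  def completionIterator[A](futures: Iterable[Future[A]], timeout: FiniteDuration)
                           (implicit ec: ExecutionContext): Iterator[Try[A]] = {
    val fs = futures.toVector                       // traverse the input only once
    val done = new LinkedBlockingQueue[Try[A]]()
    fs.foreach(_.onComplete(r => done.put(r)))      // every future enqueues its own outcome
    Iterator.fill(fs.size) {
      // poll returns null when nothing arrives within the timeout
      Option(done.poll(timeout.toMillis, TimeUnit.MILLISECONDS))
        .getOrElse(throw new TimeoutException(s"no result within $timeout"))
    }
  }
}
```

Returning `Try[A]` rather than `A` leaves it to the caller to decide what to do with failed futures. The blocking is confined to `next()`, so a consumer that stops early never waits for the remaining requests.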

Maarten Bynens

Apr 2, 2013, 7:41:06 AM
to Jed Wesley-Smith, √iktor Ҡlang, scala-user
My apologies for the silence (Easter holidays).

On 29 Mar 2013, at 02:26, Jed Wesley-Smith wrote:

> I presume you also want to handle results in the order they arrive?
> And have some support for timeouts etc?
Exactly. Sounds very similar indeed.

With respect to my implementation: if we forget about the blocking and just want to order futures by their time of completion, is this a good way of doing it? For every completed future it starts at the front of a list of promises to find the first non-completed one...

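def orderFutures[A](futures: Iterable[Future[A]]): Seq[Future[A]] = {
  import scala.annotation.tailrec
  import scala.util.Try

  val promises = Seq.fill(futures.size)(Promise[A]())

  // Hand the result to the first promise that has not been completed yet.
  @tailrec
  def addToResults(result: Try[A], open: Seq[Promise[A]]): Unit = open match {
    case Seq(promise, rest @ _*) => if (!promise.tryComplete(result)) addToResults(result, rest)
    case _ => sys.error("more results than promises") // unreachable: one promise per future
  }

  futures.foreach(_ onComplete (addToResults(_, promises)))

  // The i-th returned future completes with the i-th result to arrive.
  promises.map(_.future)
}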

Thank you!

Maarten
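
On the question of scanning from the front of the list: one possible alternative, sketched under the assumption that only completion order matters, is to claim the next free slot with an atomic counter instead of probing the promises one by one. `CompletionOrderFutures` and `inCompletionOrder` are hypothetical names, not library functions.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{ExecutionContext, Future, Promise}

object CompletionOrderFutures {
  // Returns futures such that the i-th one completes with the i-th outcome to arrive.
  def inCompletionOrder[A](futures: Iterable[Future[A]])
                          (implicit ec: ExecutionContext): Vector[Future[A]] = {
    val fs = futures.toVector                      // traverse the input only once
    val slots = Vector.fill(fs.size)(Promise[A]())
    val next = new AtomicInteger(0)
    // getAndIncrement gives each callback a distinct index, so every slot is
    // completed exactly once and no scan over earlier promises is needed.
    fs.foreach(_.onComplete(result => slots(next.getAndIncrement()).complete(result)))
    slots.map(_.future)
  }
}
```

Each callback does constant work here, whereas the scanning version walks past every promise that is already completed.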