Future and nondeterministic dataflow

157 views
Skip to first unread message

Andrew Gaydenko

unread,
May 20, 2013, 5:02:31 PM5/20/13
to scala...@googlegroups.com
From Future#isCompleted scaladoc:

   Note: using this method yields nondeterministic dataflow programs.

What does it exactly - in terms of a code  - mean?

√iktor Ҡlang

unread,
May 20, 2013, 5:15:40 PM5/20/13
to Andrew Gaydenko, scala-user
Hi Andrew,

import scala.concurrent.{ Future, ExecutionContext }
import ExecutionContext.Implicits.global
val f = Future(1)
if (f.isCompleted) launchTorpedoes() else lobsterForEverybody()


Is the program above deterministic?

Cheers,



--
You received this message because you are subscribed to the Google Groups "scala-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to scala-user+...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.
 
 



--
Viktor Klang
Director of Engineering

Twitter: @viktorklang

Andrew Gaydenko

unread,
May 20, 2013, 5:56:56 PM5/20/13
to scala...@googlegroups.com, Andrew Gaydenko
On Tuesday, May 21, 2013 1:15:40 AM UTC+4, √iktor Klang wrote:
Is the program above deterministic?

No, it isn't, thanks!

OK, I'll elaborate my use case. I have a function f returning Future. In f implementation at some case I complete returning result with Future.failed(myException). In another code point I check:

        val newState = f(...)
        if (newState.isCompleted) {
          Await.result(newState, 10.milliseconds) match { // completed future - any duration is suitable
            case ex: SomeThrowable => the way on error
            case _                 => let it continue
          }
        }
        else let it continue

The only thing I"d want to be sure - don't miss that my explicit completion. I'm not interested in other possible errors here. OTOH, "the way on error" suits for any other (rather explicit failing) error cases also would last ones occur.

So, with all these circumstances, is isCompleted use correct?

Som Snytt

unread,
May 20, 2013, 6:09:58 PM5/20/13
to √iktor Ҡlang, Andrew Gaydenko, scala-user
I think that after you launch the torpedoes, it blocks while everyone waits on deck for the lobsters to start floating to the surface.

But if their work wasn't completed on time, no torpedo is launched, and they also wait on deck for the lobster dinner they were promised.

Sadly, they sail the seas as a ghost ship forever awaiting lobsters that will never come.

That is, unless you can find the Promise that will release them.  Can you complete the promise before the roving crew of ghost lobstermen devastate the coast of Maine?

Roland Kuhn

unread,
May 21, 2013, 2:31:49 AM5/21/13
to Andrew Gaydenko, scala...@googlegroups.com
Hi Andrew,

if I decipher your intent correctly you want to distinguish between those failures which are immediate and the rest (what I mean is that the Future can still fail later). In order to do so you must use a different means of communication because your code does not (and cannot) reliably achieve this goal. If f() failed immediately, then it should not wrap that failure in a Future; it should either just throw it or return a Try[Future[...]] instead.

Another comment: Await.result() does not give you the failure as a value, it will throw it, so your match statement will never be executed.

Regards,

Roland

--
You received this message because you are subscribed to the Google Groups "scala-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to scala-user+...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.
 
 



Dr. Roland Kuhn
Akka Tech Lead
Typesafe – Empowering professional developers to build amazing apps.
twitter: @rolandkuhn

See you at Scala Days 2013 in NYC!
June 10th - June 12th
www.scaladays.org

Andrew Gaydenko

unread,
May 21, 2013, 6:21:14 AM5/21/13
to scala...@googlegroups.com, Andrew Gaydenko
Roland, thanks! - again you have caught my intention :) Thanks for the clarification, will try to wrap a Future with something appropriate.
Indeed that matching case is silly as far as it doesn't fit to 'result' signature.

√iktor Ҡlang

unread,
May 21, 2013, 6:47:32 AM5/21/13
to Andrew Gaydenko, scala-user
The rule is: Only ever use Await.<X> if you _must_ go from Future[T] => T.
99% of the times otherwise there is a cheaper, more performant and less buggy solution using the combinators.

Cheers,


On Tue, May 21, 2013 at 12:21 PM, Andrew Gaydenko <andrew....@gmail.com> wrote:
Roland, thanks! - again you have caught my intention :) Thanks for the clarification, will try to wrap a Future with something appropriate.
Indeed that matching case is silly as far as it doesn't fit to 'result' signature.

--
You received this message because you are subscribed to the Google Groups "scala-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to scala-user+...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.
 
 

Andrew Gaydenko

unread,
May 21, 2013, 7:08:48 AM5/21/13
to scala...@googlegroups.com, Andrew Gaydenko
On Tuesday, May 21, 2013 2:47:32 PM UTC+4, √iktor Klang wrote:
The rule is: Only ever use Await.<X> if you _must_ go from Future[T] => T.
99% of the times otherwise there is a cheaper, more performant and less buggy solution using the combinators.

Viktor, yes, this is a reason of the post.
With Future-wrapping we loose composition - this is the problem! :)

√iktor Ҡlang

unread,
May 21, 2013, 7:13:08 AM5/21/13
to Andrew Gaydenko, scala-user
Please share an example of that loss?
 

--
You received this message because you are subscribed to the Google Groups "scala-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to scala-user+...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.
 
 

Andrew Gaydenko

unread,
May 21, 2013, 7:50:40 AM5/21/13
to scala...@googlegroups.com, Andrew Gaydenko
On Tuesday, May 21, 2013 3:13:08 PM UTC+4, √iktor Klang wrote:
Please share an example of that loss?

The aim is to write async part parser in Play!/multipart request context. Initial code is:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext

import play.api.libs.iteratee. { Iteratee, Input, Cont, Done, Error }
import play.api.mvc.BodyParser
import play.api.mvc.BodyParsers.parse
import play.api.mvc.BodyParsers.parse.Multipart. { FileInfo, PartHandler }
import play.api.mvc.MultipartFormData.FilePart

object PartParser {
 
  def apply[A](init: FileInfo => Future[A], w: (A, Array[Byte]) => Future[A])(implicit ec: ExecutionContext) = BodyParser { request =>
    def handler: PartHandler[FilePart[Future[A]]] = parse.Multipart.handleFilePart {
      case fInfo : FileInfo => fold[Array[Byte], A](init(fInfo)) { (fuState, bytes) => fuState.flatMap { w(_, bytes) } }
    }
    parse.multipartFormData(handler).apply(request)
  }

  private def fold[E, A](initA: Future[A])(f: (Future[A], E) => Future[A]): Iteratee[E, Future[A]] = {
    def step(s: Future[A])(in: Input[E]): Iteratee[E, Future[A]] = in match {
      case Input.EOF   => Done(s, Input.EOF)
      case Input.Empty => Cont[E, Future[A]](i => step(s)(i))
      case Input.El(e) =>
        Cont[E, Future[A]](newIn => step(f(s, e))(newIn)) // on Ok in w function
        // Error(err, Input.EOF) // must be used on error in w function
    }
    (Cont[E, Future[A]](in => step(initA)(in)))
  }
}

You see, to use the parser we need to define init() and w() functions. The last one can result in error. This error must be intercepted in fold() function, Input.El(e) case.
Of course we have a freedom how to exptess an error in w(), but ok-case can be expressed with Future[A] only with possible furher wrapping. The obvious way is to use Either[String, Future[A]]. But I can not find a way how to modify apply() and fold() at this case.

√iktor Ҡlang

unread,
May 21, 2013, 8:00:50 AM5/21/13
to Andrew Gaydenko, scala-user
What does the iteratee do when the Future is failed?

Cheers,


--
You received this message because you are subscribed to the Google Groups "scala-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to scala-user+...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.
 
 

Andrew Gaydenko

unread,
May 21, 2013, 8:19:12 AM5/21/13
to scala...@googlegroups.com, Andrew Gaydenko
On Tuesday, May 21, 2013 4:00:50 PM UTC+4, √iktor Klang wrote:
What does the iteratee do when the Future is failed?

I don't know exaclty :) as far as don't know play internal mechanics. On exception in w() play just downloads all the file (be at any size) and then shows an error. At any case I must follow iteratee contract: on error a consumer must return Error(err, Input.EOF), and this is the aim. If at this case more or less immediate interruption will not take place, I'll ask James Roper about iteratee contract details :)

Andrew Gaydenko

unread,
May 21, 2013, 1:29:14 PM5/21/13
to scala...@googlegroups.com, Andrew Gaydenko
Say, this 'apply' implementation doesn't result in Failure case on throwing exception in w():

  def apply[A](init: FileInfo => Future[A], w: (A, Array[Byte]) => Future[A])(implicit ec: ExecutionContext) = BodyParser { request =>
    def handler: PartHandler[FilePart[Future[A]]] = parse.Multipart.handleFilePart {
      case fInfo : FileInfo => {
        def step(fuA: Future[A])(in: Input[Array[Byte]]): Iteratee[Array[Byte], Future[A]] = in match {
          case Input.EOF   => Done(fuA, Input.EOF)
          case Input.Empty => Cont[Array[Byte], Future[A]](i => step(fuA)(i))
          case Input.El(e) => Try { fuA.flatMap( w(_, e)) } match {
            case Success(fu)  => Cont[Array[Byte], Future[A]](newIn => step(fu)(newIn))
            case Failure(err) => Error(err.getMessage, Input.EOF)
          }
        }
        (Cont[Array[Byte], Future[A]](in => step(init(fInfo))(in)))
      }
    }
    parse.multipartFormData(handler).apply(request)
  }

So, if we wrap Future, we can not flatMap, and if we throw exception, we can't catch it in needed point.

√iktor Ҡlang

unread,
May 21, 2013, 1:34:57 PM5/21/13
to Andrew Gaydenko, scala-user
Try { fuA.flatMap( w(_, e)) } match {
            case Success(fu)  => Cont[Array[Byte], Future[A]](newIn => step(fu)(newIn))
            case Failure(err) => Error(err.getMessage, Input.EOF)
          }

Could you explain this piece of code?

Cheers,



--
You received this message because you are subscribed to the Google Groups "scala-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to scala-user+...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.
 
 

Andrew Gaydenko

unread,
May 21, 2013, 1:58:00 PM5/21/13
to scala...@googlegroups.com, Andrew Gaydenko
On Tuesday, May 21, 2013 9:34:57 PM UTC+4, √iktor Klang wrote:
Try { fuA.flatMap( w(_, e)) } match {
            case Success(fu)  => Cont[Array[Byte], Future[A]](newIn => step(fu)(newIn))
            case Failure(err) => Error(err.getMessage, Input.EOF)
          }

Could you explain this piece of code?

w() is that consuming function which takes a current state (the first parameter of type A) and the next chunk of bytes (the second parameter) and calculates next expected state in async manner returning next state in Future[A]. At some error I rise exception in w() implementation and want to return to feeder an error (iteratee builder Error()) instead of a law how to execute iteration step (iteratee builder Cont). Last one takes Future[A] as argument.

OTOH w() takes A, but we have Future[A] in hand (previous state). So, to call w() we use flatMap. But with flatMap we can not catch the exception thrown in w() with Try.

√iktor Ҡlang

unread,
May 21, 2013, 2:01:57 PM5/21/13
to Andrew Gaydenko, scala-user
On Tue, May 21, 2013 at 7:58 PM, Andrew Gaydenko <andrew....@gmail.com> wrote:
On Tuesday, May 21, 2013 9:34:57 PM UTC+4, √iktor Klang wrote:
Try { fuA.flatMap( w(_, e)) } match {
            case Success(fu)  => Cont[Array[Byte], Future[A]](newIn => step(fu)(newIn))

When is ``w(_, e)`` executed and on which thread?
 

            case Failure(err) => Error(err.getMessage, Input.EOF)

When can this branch be taken?
 

          }

Could you explain this piece of code?

w() is that consuming function which takes a current state (the first parameter of type A) and the next chunk of bytes (the second parameter) and calculates next expected state in async manner returning next state in Future[A]. At some error I rise exception in w() implementation and want to return to feeder an error (iteratee builder Error()) instead of a law how to execute iteration step (iteratee builder Cont). Last one takes Future[A] as argument.

OTOH w() takes A, but we have Future[A] in hand (previous state). So, to call w() we use flatMap. But with flatMap we can not catch the exception thrown in w() with Try.

Exactly, so the Failure branch will never be taken, since it will always be a ``Success(fu)``

Cheers,
 

--
You received this message because you are subscribed to the Google Groups "scala-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to scala-user+...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.
 
 

Andrew Gaydenko

unread,
May 21, 2013, 2:22:40 PM5/21/13
to scala...@googlegroups.com, Andrew Gaydenko
On Tuesday, May 21, 2013 10:01:57 PM UTC+4, √iktor Klang wrote:
Try { fuA.flatMap( w(_, e)) } match {
            case Success(fu)  => Cont[Array[Byte], Future[A]](newIn => step(fu)(newIn))

When is ``w(_, e)`` executed and on which thread?
 

            case Failure(err) => Error(err.getMessage, Input.EOF)

When can this branch be taken?

Unfortunately, I can not answer the questions as all these details are matter of Play implementation. From Play API client's perspective Play just supply default ExecutionContext, having last one in second parameters list as implicit parameter allover Play API. OTOH, I guess, API client must not depend on implementation.


Exactly, so the Failure branch will never be taken, since it will always be a ``Success(fu)``

So, we must wrap Future into some container similar to Either. It is trivial to reimplement w() to return anything we want, but I haven't found a way to supply next state (fu) to Cont.

Andrew Gaydenko

unread,
May 22, 2013, 9:19:57 AM5/22/13
to scala...@googlegroups.com
OK, as far as current Future and Iteratees APIs don't permit to resolve error catching with Future wrapping, I have tried this way to catch an error (this is a modified code of the first one shown in this topic):

  def catchFailedFuture[A,R](fu: Future[A], use: Future[A] => R, onError: Throwable => R)(implicit ec: ExecutionContext): R =
    if (fu.isCompleted) Try { Await.result(fu, 10.milliseconds) } match {
      case Success(a)  => use(Future(a))
      case Failure(er) => onError(er)
    }
    else use(fu)

I have tried to use it, and Failure branch is selected on Future.failed(ex). At not-failed case in my test almost all executions were in last string, and some of them in Success() branch.
As for the application, - acync MPR parser for Play, - Play happily get Error() iteratee and almost immediately closes a connection to malicious client :)
If this variant of code is still broken, please let me know why.

James Roper

unread,
May 27, 2013, 10:52:01 PM5/27/13
to scala...@googlegroups.com
Hi Andrew,

Firstly, there should be no need to ever have Iteratee[Array[Byte], Future[_]], since an iteratee is is a kind of future.  If you have a function, w: (Array[Byte], A) => Future[A], then use it like this:

Iteratee.flatten(w(e, a).map(a => Cont(step(a))))

Then you will end up with Iteratee[Array[Byte], A].  So then you can just use Future.recover to ensure the iteratee reaches the error state:

Iteratee.flatten(w(e, a).map(a => Cont(step(a))).recover {
  case NonFatal(t) => Error(t.getMessage, Input.El(e))
})

But, I'm not sure that this is even necessary, eventually, an error iteratee becomes an IOException that is held by the eventual future that you get back after applying an enumerator. So, you shouldn't need to do the recover.

But anyway, here's what your code might look like:

   def apply[A](init: FileInfo => Future[A], w: (A, Array[Byte]) => Future[A])(implicit ec: ExecutionContext) = BodyParser { request =>
    def handler: PartHandler[FilePart[A]] = parse.Multipart.handleFilePart {
      case fInfo : FileInfo => {
        def step(fuA: A)(in: Input[Array[Byte]]): Iteratee[Array[Byte], A] = in match {
          case in @ Input.EOF   => Done(fuA, in)
          case Input.Empty => Cont(step(fuA))
          case in @ Input.El(e) => Iteratee.flatten(w(a, e).map(a => Cont(step(a))).recover {
            case NonFatal(t) => Error(err.getMessage, in)
          })
        }
        Iteratee.flatten(init(fInfo).map(a => Cont(step(a))))
      } 
    }
    parse.multipartFormData(handler).apply(request)
  }

James Roper

unread,
May 27, 2013, 10:55:14 PM5/27/13
to scala...@googlegroups.com
Also, the iteratee that I've written there basically does exactly the same thing is Iteratee.fold1, so the whole implementation could be replaced with:

  def apply[A](init: FileInfo => Future[A], w: (A, Array[Byte]) => Future[A])(implicit ec: ExecutionContext) = BodyParser { request =>
    def handler: PartHandler[FilePart[A]] = parse.Multipart.handleFilePart {
      case fInfo : FileInfo => {
        Iteratee.fold1(init(fInfo))(w)
      } 
    }
    parse.multipartFormData(handler).apply(request)
  }
Message has been deleted

Andrew Gaydenko

unread,
May 29, 2013, 9:08:16 AM5/29/13
to scala...@googlegroups.com
James, heartily thanks! Have tried your suggestion with flatten and recover. Both work as expected, malicious uploading is interrupting almost immediately. fold1 also works at legal mainstream case, but doesn't suit for immediate interrupting.


√iktor Ҡlang

unread,
May 29, 2013, 9:20:09 AM5/29/13
to Andrew Gaydenko, scala-user
Excellent, thanks for clarifying James!

Cheers,


On Wed, May 29, 2013 at 3:08 PM, Andrew Gaydenko <andrew....@gmail.com> wrote:
James, heartily thanks! Have tried your suggestion with flatten and recover. Both work as expected, malicious uploading is interrupting almost immediately. fold1 also works at legal mainstream case, but doesn't suit for immediate interrupting.

--
You received this message because you are subscribed to the Google Groups "scala-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to scala-user+...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.
 
 
Reply all
Reply to author
Forward
0 new messages