[2.0.4 - Java] Multiple Async operations


CB

Jan 2, 2013, 6:24:15 AM
to play-fr...@googlegroups.com
hi all,

I need to perform multiple intensive queries to my back end in parallel and return the result to the client once all the queries come back.

I do not need to merge the results from each query - just return each result to the client.

I read the documentation - specifically the Handling asynchronous results article, which is very nice, BUT I need an example of how to perform multiple intensive async workloads, and the article only shows a single intensive call.

Can someone please post a Java example of a controller that does multiple async operations? OR direct me to an article that shows how to do that in Java.

Thanks!!

James Roper

Jan 2, 2013, 7:50:49 AM
to play-framework
I'm not sure what you mean by not wanting to merge the results but wanting to return each result.  You can only send one result, so if you want to return each result, you have to somehow merge them into one result.  Could you explain a bit more here, maybe give an example of what you're trying to do?

Anyway, depending on how you want to merge them, you have two options.  The first step for each is to create all of your promises.  For example:

Promise<Integer> promise1 = Akka.future(new Callable<Integer>() {
  public Integer call() {
    return intensiveComputation1();
  }
});
Promise<Integer> promise2 = Akka.future(new Callable<Integer>() {
  public Integer call() {
    return intensiveComputation2();
  }
});

Then your options are:

1) Use Promise.waitAll:

Promise<List<Integer>> combinedPromise = Promise.waitAll(promise1, promise2);

This will merge all your results into a single promise for a list of the results.  If all your intensive operations return the same type, this might be the right thing for you, and is the simplest way to do it.

2) Use flatMap and map.  This gives you a bit more flexibility with how things are merged, but is a bit cumbersome.  It looks like this:

Promise<Foo> combinedPromise = promise1.flatMap(new Function<Integer, Promise<Foo>>() {
  public Promise<Foo> apply(final Integer result1) {
    return promise2.map(new Function<Integer, Foo>() {
      public Foo apply(Integer result2) {
        return new Foo(result1, result2);
      }
    });
  }
});

As you can see, this solution gets very cumbersome very quickly once you start having many promises.  If you do have these requirements, then I might suggest that you're not using the best language to solve the problem.  Here would be the equivalent Scala code to the above:

for {
  result1 <- promise1
  result2 <- promise2
} yield new Foo(result1, result2)

Each new promise that you added would simply add one line of code.  Scala is really designed for this sort of stuff.  Java isn't, we make it possible to do it in Play as pain free as possible, but the language really gets in the way.
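For readers on Java 8 or later, both options above can be sketched with the JDK's own CompletableFuture. This is not the Play 2.0 Promise API: allOf stands in for Promise.waitAll, thenCompose for flatMap, and thenApply for map. The class name CombineSketch, the helper names, and the Foo type are hypothetical and only there for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class CombineSketch {
    // Hypothetical result type holding both values (mirrors Foo in the post).
    static final class Foo {
        final int first;
        final int second;
        Foo(int first, int second) { this.first = first; this.second = second; }
    }

    // Option 1: one future for a list of all results, in argument order.
    static <T> CompletableFuture<List<T>> waitAll(List<CompletableFuture<T>> futures) {
        CompletableFuture<Void> allDone =
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        return allDone.thenApply(ignored -> {
            List<T> results = new ArrayList<>();
            for (CompletableFuture<T> f : futures) {
                results.add(f.join()); // every future is complete here, so join() does not block
            }
            return results;
        });
    }

    // Option 2: thenCompose plays the role of flatMap, thenApply the role of map.
    static CompletableFuture<Foo> combine(CompletableFuture<Integer> f1,
                                          CompletableFuture<Integer> f2) {
        return f1.thenCompose(r1 -> f2.thenApply(r2 -> new Foo(r1, r2)));
    }

    public static void main(String[] args) {
        // Both computations are started before they are combined, so they run in parallel.
        CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> 1);
        CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> 2);
        System.out.println(waitAll(Arrays.asList(f1, f2)).join());
        Foo foo = combine(f1, f2).join();
        System.out.println(foo.first + ", " + foo.second);
    }
}
```

With lambdas the nesting in option 2 shrinks to one line, though each extra future still adds another level of nesting.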


--
James Roper
Software Engineer

Typesafe - The software stack for applications that scale
Twitter: @jroper

CB

Jan 2, 2013, 8:07:59 AM
to play-fr...@googlegroups.com
thanks James.

What I mean by merge :

Each work is a fetch of X rows from a DB Query

Merge would be (for example) - filtering out duplicated rows or union or intersecting etc.

Currently I do not need merge operation so your first suggestion seems to do the job (for now)

I will try it and let you know if I have difficulties.

Quick Question though - Is there anything special that I need to configure in the Play config file to support X parallel intensive works?

If I understand correctly - the Play Engine is single threaded (event loop) - so I don't understand how would everything run in parallel without me having to define some extra threads...

Thanks!!!

Nilanjan Raychaudhuri

Jan 2, 2013, 5:56:21 PM
to play-fr...@googlegroups.com
Play doesn't have any single threaded event loop. Take a look at the default conf

https://github.com/playframework/Play20/blob/2.0.4/framework/src/play/src/main/resources/reference.conf#L31

As you can see, the parallelism (number of threads) starts at ceil(available processors * factor), and the maximum is set to 24 threads.

You can always change these settings by overriding them in your application.conf file.
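As a worked example of that formula, here is a small sketch that computes the thread count from a processor count, a factor, and a maximum. The factor of 1.0 used in main is an assumption (the maximum of 24 comes from the post), and the real dispatcher may apply further bounds that are not modelled here. ParallelismSketch and parallelism are hypothetical names.

```java
class ParallelismSketch {
    // ceil(available processors * factor), capped at max.
    static int parallelism(int availableProcessors, double factor, int max) {
        int scaled = (int) Math.ceil(availableProcessors * factor);
        return Math.min(scaled, max);
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // Assumed values: factor 1.0, max 24.
        System.out.println(parallelism(cores, 1.0, 24));
    }
}
```

So under these assumptions an 8-core machine gets 8 threads, and a 64-core machine is capped at 24.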

Thanks
Nilanjan, Developer & Consultant
Typesafe Inc.

James Roper

Jan 2, 2013, 9:00:31 PM
to play-framework
Quick Question though - Is there anything special that I need to configure in the Play config file to support X parallel intensive works?

If I understand correctly - the Play Engine is single threaded (event loop) - so I don't understand how would everything run in parallel without me having to define some extra threads...

This assumption is false.  Play uses a thread pool, and by default configures it to have as many threads as you have processors.  You can configure it to have more.  Here's the docs:

CB

Jan 3, 2013, 7:34:45 AM
to play-fr...@googlegroups.com
hi James,

Thank you so much for the help.

So - I think that for my needs going forward with your below option #1 is the right choice since all of my async works return the same type.

What's missing from your below example code are 2 things - 1. what to do with the combinedPromise and 2. how to handle exceptions that could potentially get thrown from within each of the promises?

I'm sorry if this has a straightforward answer, but I'm new to the Akka framework and not sure how to proceed.

Your help is really appreciated!!


On Wednesday, January 2, 2013 2:50:49 PM UTC+2, James Roper wrote:

James Roper

Jan 3, 2013, 10:42:28 PM
to play-framework
1. what to do with the combinedPromise 

Map it to a result and pass the resulting promise to async().  Eg:

public static Result myAction() {
  // Do something here to create promises
  ...

  Promise<List<Integer>> combinedPromise = Promise.waitAll(promise1, promise2);
  return async(combinedPromise.map(new Function<List<Integer>, Result>() {
    public Result apply(List<Integer> results) {
      // What do you want to do with your combined results?  Let's serialise them out as JSON
      return ok(Json.toJson(results));
    }
  }));
}  
 
and 2. how to handle exceptions that could potentially get thrown from within each of the promises?

Well, how do you want to handle the exceptions?  Do you want to ignore them?  Do you want to send an error if just one of them fails?  You gotta decide that first.

So first things first, handle the exceptions.  You have two ways of doing that:

1) Don't throw exceptions.  Wrap your computationally expensive code in a try/catch so it doesn't throw any exceptions, and always returns a result.
2) Use Promise.recover to map any exceptions that were encountered to a valid result.

Now if you just want to ignore the exceptions, then you're now done, because when an exception is thrown you just have to return some sort of default result (an empty list, null, whatever).  However, if you want to do something if one of the computations failed, then you need to include the failure in the result, and then when you do your map function above, check each result to see if it was a failure.  If there was a failure, return an error, otherwise return the results.
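The "ignore the failure and return a default" case can be sketched as follows. This uses the JDK's CompletableFuture.exceptionally as a stand-in for Promise.recover, so it illustrates the idea rather than the Play API itself. RecoverSketch and withDefault are hypothetical names, and the empty list is just one possible default.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

class RecoverSketch {
    // Runs the query asynchronously; any exception is mapped to an empty list.
    static CompletableFuture<List<Integer>> withDefault(Supplier<List<Integer>> query) {
        return CompletableFuture.supplyAsync(query)
            .exceptionally(error -> Collections.<Integer>emptyList());
    }

    public static void main(String[] args) {
        // A query that succeeds keeps its rows.
        System.out.println(withDefault(() -> Arrays.asList(1, 2)).join());
        // A query that throws yields the default instead of failing the future.
        System.out.println(withDefault(() -> {
            throw new IllegalStateException("query failed");
        }).join());
    }
}
```

The caller can no longer tell a failed query from one that returned no rows, which is why the failure has to be carried in the result if it matters.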

Typically in functional programming, we use a type called Either to represent this result.  Either basically represents a single value that can either be one type, or another, but not both.  We call one type the left type, and the other type the right type, and by convention, if one of the types represents an error, it's the left type.

So let's say we want to represent an error using a String (you could use more complex types if you want), then our initial promises might look like this:

Promise<Either<String, Integer>> promise1 = Akka.future(new Callable<Either<String, Integer>>() {
  public Either<String, Integer> call() {
    try {
      return Either.Right(intensiveComputation1());
    } catch (Exception e) {
      return Either.Left(e.getMessage());
    }
  }
});
 
So in our map function we'd do something like this:

combinedPromises.map(new Function<List<Either<String, Integer>>, Result>() {
  public Result apply(List<Either<String, Integer>> results) {
    List<Integer> successResults = new ArrayList<Integer>();
    for (Either<String, Integer> result: results) {
      // Fail the whole request on the first error encountered
      if (result.left.isDefined()) {
        return internalServerError(result.left.get());
      }
      successResults.add(result.right.get());
    }
    return ok(Json.toJson(successResults));
  }
});

And there you have it.  At this point, Java's verbosity makes things really not that nice.  The above code could be replaced with this Scala code:

combinedPromises.map { results => 
  results.collectFirst { 
    case Left(msg) => InternalServerError(msg)
  }.getOrElse(Ok(Json.toJson(results.map(_.right.get))))
}

Which makes it much easier to see the actual logic of your application, instead of having to ignore so much Java boilerplate.  If you're going to be doing a lot of this stuff, I'd recommend using Scala.

CB

Jan 6, 2013, 9:25:04 AM
to play-fr...@googlegroups.com
James,

Thank you for the below - I followed your instructions and it works for me. As a side note - the below can be very helpful for others and could be easily added to the wiki / docs.

I am now facing a strange behaviour where sometimes the parallel processing runs just fine and sometimes fails (usually when consecutive calls are performed)

I cannot debug the error - my breakpoints will not get activated.

The only thing I see on the console is:

[error] play - Waiting for a promise, but got an error: For input string: ".101E.2101E2"
java.lang.NumberFormatException: For input string: ".101E.2101E2"
at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1222) ~[na:1.6.0_37]
at java.lang.Double.parseDouble(Double.java:510) ~[na:1.6.0_37]
at java.text.DigitList.getDouble(DigitList.java:151) ~[na:1.6.0_37]
at java.text.DecimalFormat.parse(DecimalFormat.java:1302) ~[na:1.6.0_37]
at java.text.SimpleDateFormat.subParse(SimpleDateFormat.java:1589) ~[na:1.6.0_37]
at java.text.SimpleDateFormat.parse(SimpleDateFormat.java:1311) ~[na:1.6.0_37]


I'm not sure where this is coming from.

Is there a minimum timeout for each worker thread? Can it be increased? Maybe I should also handle timeouts?

thanks!!!

CB

James Roper

Jan 6, 2013, 8:28:34 PM
to play-framework
There is no timeout for the worker thread, but Play provides some nice features for implementing a timeout yourself.  There is a Promise.timeout static method for creating a promise that is redeemed after a certain time period, and then there is a method called "or" on promise, and you can combine this with the expensive computation promise.  It returns an Either, which will be redeemed with the result of whichever promise completed first.  So if the timeout is reached first, then the either will be the timeout result, and you can generate an appropriate error message from that.
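A minimal sketch of that race, written against the JDK's CompletableFuture rather than Play's Promise.timeout and or. Here a scheduled timer completes a "timeout" future, and applyToEither picks whichever future finishes first. As a simplification, both outcomes are flattened to a String instead of an Either. TimeoutSketch, timeoutAfter, and race are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

class TimeoutSketch {
    // A future that is completed with `message` once `delayMillis` has elapsed.
    static CompletableFuture<String> timeoutAfter(ScheduledExecutorService timer,
                                                  String message, long delayMillis) {
        CompletableFuture<String> f = new CompletableFuture<>();
        timer.schedule(() -> { f.complete(message); }, delayMillis, TimeUnit.MILLISECONDS);
        return f;
    }

    // Races the work against the timer and returns whichever finishes first.
    static String race(Supplier<Integer> work, long timeoutMillis) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            CompletableFuture<String> timeout = timeoutAfter(timer, "Timed out", timeoutMillis);
            CompletableFuture<String> result =
                CompletableFuture.supplyAsync(work).thenApply(n -> "Result: " + n);
            return result.applyToEither(timeout, winner -> winner).join();
        } finally {
            timer.shutdownNow(); // cancel the pending timer so the JVM can exit
        }
    }

    public static void main(String[] args) {
        System.out.println(race(() -> 42, 1000));
    }
}
```

Note that losing the race does not cancel the slow computation; it keeps running in the background and its result is simply discarded.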

As for the error above, check the logs/application.log file, this will have the full stack trace of the exception in it.   Some of your code somewhere is trying to parse a date from a string that is not a valid date.

CB

Jan 7, 2013, 7:00:32 AM
to play-fr...@googlegroups.com
thanks James. I reviewed the application log per your suggestion and immediately found the problem. Works like a charm :)

2 followup questions -

1. what happens if my code is trying to dispatch async workers that exceed the max allowed threads? how will the system handle this?

2. Can you please enhance your below example (the one with the exception handling) to include support for predefined timeouts ?

again James - thanks for all the info you provide :)

CB

James Roper

Jan 7, 2013, 8:37:21 PM
to play-framework
1. what happens if my code is trying to dispatch async workers that exceed the max allowed threads? how will the system handle this?

Everything just gets queued in the execution context.  You can configure the number of threads as outlined here:


James Ward wrote a good writeup about tuning it here:

 
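The queueing behaviour for question 1 can be illustrated with a plain JDK fixed thread pool. This is an analogy only, not Play's execution context: jobs beyond the pool size wait in the executor's queue and still complete, but never more than the pool size run at once. QueueingSketch and run are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

class QueueingSketch {
    // Submits `tasks` jobs to a pool of `threads` workers.
    // Returns {number of completed jobs, peak number of jobs running at once}.
    static int[] run(int threads, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<Future<Integer>> futures = new ArrayList<>();
        try {
            for (int i = 0; i < tasks; i++) {
                final int id = i;
                Callable<Integer> job = () -> {
                    int now = running.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    Thread.sleep(20); // simulate an intensive query
                    running.decrementAndGet();
                    return id;
                };
                futures.add(pool.submit(job)); // excess jobs are queued, not rejected
            }
            int completed = 0;
            for (Future<Integer> f : futures) {
                f.get();
                completed++;
            }
            return new int[] { completed, peak.get() };
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] r = run(2, 6);
        System.out.println("completed=" + r[0] + ", peak concurrency=" + r[1]);
    }
}
```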
2. Can you please enhance your below example (the one with the exception handling) to include support for predefined timeouts ?

Perhaps you might be interested in a Typesafe subscription:


I'm giving you a lot of free help here ;)

Promise<Either<String, Integer>> promise1 = ...
// Or it with a timeout promise of 10 seconds
Promise<Either<String, Either<String, Integer>>> withTimeout1 = Promise.timeout("Timed out", 10000L).or(promise1);
// Flatten to combine the two errors
promise1 = withTimeout1.map(new Function<Either<String, Either<String, Integer>>, Either<String, Integer>>() {
  public Either<String, Integer> apply(Either<String, Either<String, Integer>> result) {
    if (result.left.isDefined()) {
      // The timeout won: turn its message into the error side
      return Either.<String, Integer>Left(result.left.get());
    } else {
      // The computation won: pass its own Either through unchanged
      return result.right.get();
    }
  }
});