Proposal: New function Task.await_many

58 views
Skip to first unread message

Ian Young

unread,
Jan 13, 2020, 1:40:21 PM1/13/20
to elixir-l...@googlegroups.com

Background

The Task module currently contains three functions that synchronously retrieve results from asynchronous tasks:

  • Task.await: Blocks waiting for a reply from a single task. Accepts a timeout value. If the task is successful, returns reply. If the task dies or the timeout is reached, exits with the corresponding reason.
  • Task.yield: Blocks waiting for a reply from a single task. Accepts a timeout value. If the task is successful, returns {:ok, reply}. If the task dies or the timeout is reached, returns {:exit, reason} or nil.
  • Task.yield_many: Blocks waiting for replies from a list of tasks. Accepts a timeout value. When all tasks are complete or the timeout is reached, returns a list of result tuples: {:ok, reply} for successful tasks, {:exit, reason} for dead tasks, and nil for timed-out.

Additionally, the Task module contains one function that handles both creating asynchronous tasks and retrieving the results:

  • Task.async_stream: Asynchronously applies a given function to each element in a given enumerable. Accepts a timeout value that is applied to each task separately. Returns an enumerable that emits results, blocking as needed. If a task dies, exits with the reason. When tasks complete, emits {:ok, reply}. When tasks reach the timeout, either exits or emits {:exit, :timeout}, depending on configuration options.

The discussion that eventually became Task.async_stream included an alternative suggestion of Task.async_many and Task.await_many. In the end, async_stream was chosen because it provides the ability to bound the maximum concurrency and stream results, making it the most robust way to handle intensive processing over an enumerable.

Proposal

I propose this addition to retrieve results from multiple asynchronous tasks while adhering to await behavior:

Task.await_many(tasks, timeout \\ 5000)

Blocks waiting for replies from a list of tasks. If the tasks complete, returns a list of replies. If the timeout is reached, exits with :timeout. If a task dies, exits with the reason given by the task.

Task.await (together with Task.async) provides a simple solution that can be used as a drop-in replacement for synchronous code. The addition of Task.await_many will provide the building blocks for many common asynchronous flows, with unsurprising default behavior. There are other ways to accomplish the same thing, but Task.await_many provides a similar value proposition to Task.await: it is simple to use, is the right amount of tooling for simple use cases, and doesn't require extra code or reasoning about complicated async workflow concerns.

It fits well with the existing feature set, since it essentially fills a gap in the collection of related functions (yield_many is to yield as await_many is to await). It should be very easy for people to use and understand, provided they are familiar with the other Task functions.

As a toy example, consider baking a cake, a construction of heterogeneous sometimes-parallelizable tasks:

oven_prep = Task.async(fn -> preheat_oven(350) end),

{pan, bowl} = wash_dishes()

frosting_prep = Task.async(fn -> make_frosting(bowl, :pink) end)

[_, greased_pan, batter] = Task.await_many([
  oven_prep,
  Task.async(fn -> grease_pan(pan) end),
  Task.async(fn -> mix_batter() end),
], 600_000)

baking = Task.async(fn ->
  baked_cake = bake(batter, greased_pan, 30)
  cool(baked_cake, 10)
end)

eat_dinner()

[cooled_cake, frosting] = Task.await_many([baking, frosting_prep])

cooled_cake
|> frost(frosting)
|> eat()

Alternatives

Why not Task.await?

A common pattern suggested online [1][2][3] is to enumerate calls to Task.await:

Enum.map(tasks, &Task.await(&1, timeout))

Because the await calls happen sequentially, the timeout is reset for each element of the list. This can lead to unexpected and likely unwanted behavior in which this call may block much longer than the specified timeout.

Why not Task.yield_many?

Task.yield_many works fine for this situation, but it adheres to the semantics of Task.yield rather than Task.await. It returns a tuple instead of the bare reply, and on failure it does not exit or kill still-running tasks. To achieve the behavior of await, you must write something like this to handle the various possible results:

Task.yield_many(tasks)
|> Enum.map(fn {task, result} ->
  case result do
    nil ->
      # Maybe unnecessary since we are exiting?
      Task.shutdown(task, :brutal_kill)
      exit(:timeout)
    {:exit, reason} ->
      exit(reason)
    {:ok, result} ->
      result
  end
end)

Rather than expecting every developer to write this boilerplate (and not make any mistakes in doing so), I think it would be better to provide a construct in the standard library.

Why not Task.async_stream?

Task.async_stream is great for enumerating the same expensive operation across a list of inputs, and it absolutely should be used for that. However, it is not well-suited to situations where the collection of tasks is less uniform. Consider the cake example:

prep = [
  Task.async(fn -> preheat_oven(350) end),
  Task.async(fn -> grease_pan(pan) end),
  Task.async(fn -> mix_batter() end),
]

This would be a very awkward fit for async_stream. It is a specialized tool that should not be applied in a generalized way. In addition, async_stream has its own return values and exit behavior that does not match that of await.

One potential harm of adding Task.await_many is that people might be tempted to use it when they would be better off using Task.async_stream. I believe this can be mitigated with proper documentation.

Why not GenStage?

GenStage  provides powerful and flexible abstractions for handling asynchronous flows. For applications that have complicated needs, this is a great tool. Often, though, we have much simpler needs and applying GenStage to the problem would be massive overkill.

Task.await is easy to use and easy to reason about. The goal of Task.await_many is the same. It’s okay if it doesn’t cover every possible use case, as long as it covers the ones we most commonly encounter in a way that doesn’t encourage us to make mistakes.


José Valim

unread,
Jan 13, 2020, 2:14:02 PM1/13/20
to elixir-l...@googlegroups.com
Hi Ian, thanks for the proposal.

Quick question, what is the issue with the approach below:
prep = [
  fn -> preheat_oven(350) end,
  fn -> grease_pan(pan) end,
  fn -> mix_batter() end,
]
[{:ok, oven}, {:ok, pan}, {:ok, mix}] =
prep |> Task.async_stream(& &1.()) |> Enum.to_list()
Also, can you provide a concrete example of where async_many is necessary? It will help guide the discussion.


José Valim
Founder and Director of R&D


--
You received this message because you are subscribed to the Google Groups "elixir-lang-core" group.
To unsubscribe from this group and stop receiving emails from it, send an email to elixir-lang-co...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/elixir-lang-core/16e0e909-6ce5-44e6-9a96-738dfc352015%40www.fastmail.com.

Ian Young

unread,
Jan 13, 2020, 3:13:50 PM1/13/20
to elixir-l...@googlegroups.com
Sure! Will send two messages for ease of reading.

That approach was suggested on the forum topic as well, and I answered in a bit more detail there. In short, I think that approach is a bit unintuitive, since async_stream is designed around mapping transformations onto homogenous lists. If I had to reinvent this myself as a non-expert, there's room for me to make errors, and there are edge case behaviors that are very hard for me to accurately predict.

Perhaps the most practical argument against that solution is that no one seems to use it. All the top results I found when searching for this problem ([1][2][3]) are people suggesting the buggy solution of mapping to `Task.await`. If people are gravitating towards an incorrect solution to a common problem, it's a good sign that the language could serve them better.

Ian Young

unread,
Jan 13, 2020, 3:16:18 PM1/13/20
to elixir-l...@googlegroups.com
The task that led me down this path in the first place is performing a bunch of slow queries to populate database tables. A few of the tables depend on others and thus need to wait until their sources have been populated, while others are free to be run fully parallelized.

Imagine this setup:

Tables A, B: no deps
Tables C, D: depend on A, B
Table E: no deps

Here's how I would like to write it:

e_task = Task.async(fn -> populate("E") end)

[a, b] = Task.await_many([
  Task.async(fn -> populate("A") end),
  Task.async(fn -> populate("B") end)
], 10_000)

[c, d, e] = Task.await_many([
  Task.async(fn -> populate("C", a, b) end),
  Task.async(fn -> populate("D", a, b) end),
  e_task
], 10_000)

do_stuff(a, b, c, d, e)
  

On Mon, Jan 13, 2020, at 1:13 PM, José Valim wrote:

José Valim

unread,
Jan 13, 2020, 3:22:08 PM1/13/20
to elixir-l...@googlegroups.com
Sounds good to me! Please do send a PR!
--

Ian Young

unread,
Jan 16, 2020, 12:35:16 AM1/16/20
to elixir-l...@googlegroups.com
--
You received this message because you are subscribed to the Google Groups "elixir-lang-core" group.
To unsubscribe from this group and stop receiving emails from it, send an email to elixir-lang-co...@googlegroups.com.
Reply all
Reply to author
Forward
0 new messages