[Proposal] exit_task in Task.async_stream to expose the element it timed out on


Juan Peri

Feb 18, 2022, 4:38:13 AM
to elixir-l...@googlegroups.com
Over the past few years I've found myself needing to run several async tasks in scripts (mostly to fetch remote resources), and wanting to do something with the ones that fail.
With the current implementation of async_stream, there is no way to know which ones failed, as the output looks like the following:
```
[1, 2, 3, 4]
|> Task.async_stream(
  fn entry ->
    if entry == 3, do: :timer.sleep(2000)
    entry * entry
  end,
  timeout: 1000,
  on_timeout: :kill_task
)
|> Enum.to_list()
#=> [ok: 1, ok: 4, exit: :timeout, ok: 16]
```
That forces me to get creative with Stream.zip, or to hand-roll my own solution.

@vtm in the Elixir Slack pointed me to the commit https://github.com/elixir-lang/elixir/commit/c94327cc4 in which such functionality was removed for backwards-compatibility reasons.

Would you accept a PR that exposes this functionality again, but behind a new opt-in parameter that is false by default, in order to keep backwards compatibility?

Using the previous example, it would look something like:
```
[1, 2, 3, 4]
|> Task.async_stream(
  ...,
  timeout: 1000,
  on_timeout: :exit_task_with_value
)
|> Enum.to_list()
#=> [{:ok, 1}, {:ok, 4}, {:exit, :timeout, 3}, {:ok, 16}]
```
I'm not sold on the flag name :exit_task_with_value; it could be a separate parameter as well, as long as it's opt-in.

Thanks
______________________________________________________________
Juan - 
No todo el oro reluce.....Ni todo errante anda perdido
¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯

José Valim

Feb 18, 2022, 4:48:12 AM
to elixir-lang-core
Hi Juan,

In your example a simple Enum.zip would suffice:

list = [1, 2, 3, 4]

list
|> Task.async_stream(
  fn entry ->
    if entry == 3, do: :timer.sleep(2000)
    entry * entry
  end,
  timeout: 1000,
  on_timeout: :kill_task
)
|> Enum.zip(list)

Can you provide an example where zipping would not be possible or too cumbersome?
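
To make the zip suggestion concrete, here is a minimal sketch of recovering the inputs that timed out from the zipped pairs. It assumes the results come back in input order (the `ordered: true` default) and uses `:kill_task`, the `on_timeout` value that emits `{:exit, :timeout}` for a timed-out task. The variable names `results` and `timed_out` are only illustrative.

```elixir
list = [1, 2, 3, 4]

results =
  list
  |> Task.async_stream(
    fn entry ->
      if entry == 3, do: :timer.sleep(2000)
      entry * entry
    end,
    timeout: 1000,
    on_timeout: :kill_task
  )
  # Pair each result with the input at the same position.
  |> Enum.zip(list)

# Keep only the inputs whose task exited (here, on timeout);
# pairs that do not match the pattern are skipped by the comprehension.
timed_out = for {{:exit, _reason}, input} <- results, do: input
```

Each element of `results` is a `{result, input}` pair, so the failed inputs can be picked out with a single pattern match.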

--
You received this message because you are subscribed to the Google Groups "elixir-lang-core" group.
To unsubscribe from this group and stop receiving emails from it, send an email to elixir-lang-co...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/elixir-lang-core/CAPx9ufN-kdNYuAJri_z9pxQ-0yopvSdNUVBLpZ1G6fTHPk1BDw%40mail.gmail.com.

vtm

Feb 18, 2022, 5:03:58 AM
to elixir-l...@googlegroups.com
Hi Jose and Juan.
I think there is a problem: you need `ordered: true` to get a correct result from Enum.zip.
Also, you might have a big list and just want to use `big_list |> Task.async_stream(params) |> Stream.filter(&is_error/1) |> Stream.map(&handle_error/1) |> Stream.run()`
just to capture the errors with their values and do some work.
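
A rough sketch of that pipeline, combined with the zip workaround so that each error carries its input. The inputs, the sleep and timeout values, and the anonymous functions `is_error` and `handle_error` are placeholders made up for illustration; pairing by position is only valid while results are emitted in input order, which is why `ordered: true` is spelled out.

```elixir
inputs = 1..6

# Hypothetical helpers, named after the ones in the message above.
is_error = fn {result, _input} -> match?({:exit, _}, result) end
handle_error = fn {{:exit, reason}, input} -> {input, reason} end

failed =
  inputs
  |> Task.async_stream(
    fn n ->
      # Multiples of 3 simulate a slow remote call.
      if rem(n, 3) == 0, do: Process.sleep(500)
      n * n
    end,
    timeout: 100,
    on_timeout: :kill_task,
    ordered: true
  )
  # Stream.zip/2 keeps the pipeline lazy and pairs each result
  # with the input at the same position.
  |> Stream.zip(inputs)
  |> Stream.filter(is_error)
  |> Stream.map(handle_error)
  |> Enum.to_list()
```

With `ordered: false` the results arrive in completion order, so the same zip would silently pair errors with the wrong inputs.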


On Fri, Feb 18, 2022 at 12:48, José Valim <jose....@dashbit.co> wrote:

José Valim

Feb 18, 2022, 5:08:25 AM
to elixir-lang-core
Maybe we should have a zip_inputs: true or similar flag and we attach the input to all "ok" and "exit" tuples.

eternop...@gmail.com

Feb 18, 2022, 5:20:14 AM
to elixir-lang-core
Something like `zip_inputs: true` might work.
I usually update a container in the task, and return it, so it contains the input + the output.
But if we had something like zip_inputs, then the task itself could just return the calculated value, and it's up to the pipeline what to do with them. I actually like it.
What would the output look like?
[{:ok, _input_, output}]
[{:error, _input_, :timeout}]
or something along the lines of
[{_input_, {:ok, output}}]
[{_input_, {:error, :timeout}}]
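
The container approach mentioned above can be sketched as follows, assuming a plain `{input, output}` tuple stands in for the container (the sleep and timeout values are arbitrary). It covers the `:ok` results, but a task that is killed on timeout never gets to return its input, which is the gap a flag like `zip_inputs` would close.

```elixir
results =
  [1, 2, 3, 4]
  |> Task.async_stream(
    fn input ->
      if input == 3, do: Process.sleep(500)
      # Return the input alongside the computed value.
      {input, input * input}
    end,
    timeout: 100,
    on_timeout: :kill_task
  )
  |> Enum.to_list()
```

Here the successful entries come back as `{:ok, {input, output}}`, while the timed-out one is a bare `{:exit, :timeout}` with no trace of its input.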

vtm

Feb 18, 2022, 5:30:42 AM
to elixir-l...@googlegroups.com
yeah zip_inputs sounds awesome.
if you agree I can create a PR

On Fri, Feb 18, 2022 at 13:20, eternop...@gmail.com <eternop...@gmail.com> wrote:

vtm

Feb 21, 2022, 2:40:16 AM
to elixir-l...@googlegroups.com
Hi, I really want to add this zip_inputs option.
What do you think?
If yes, I'll do the PR, but what kind of zip do you want?
```
[{:ok, _input_, output}]
[{:error, _input_, :timeout}]
# or something along the lines of
[{_input_, {:ok, output}}]
[{_input_, {:error, :timeout}}]
```

On Fri, Feb 18, 2022 at 13:08, José Valim <jose....@dashbit.co> wrote:

José Valim

Feb 21, 2022, 2:54:34 AM
to elixir-l...@googlegroups.com
{:ok, {input, output}} and so on. A PR is welcome!

Juan Peri

Feb 21, 2022, 4:24:24 AM
to elixir-l...@googlegroups.com
hey Jose, what would be the output if there is a timeout? Something like {:error, {input, :timeout}} ?

Wouldn't something like
{input, {:ok, output}}
or
{input, {:error, :timeout}}

be more in line with zipping? It would conceptually be something like

inputs = [1, 2]
work = fn x -> {:ok, x * x} end
Enum.zip(inputs, Enum.map(inputs, work))

[{1, {:ok, 1}}, {2, {:ok, 4}}]

The thing that does not convince me about this shape is that the :ok or :error is not in the first position, but if we are explicitly saying that we want to zip the results, maybe that's expected?
______________________________________________________________
Juan - 
No todo el oro reluce.....Ni todo errante anda perdido
¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯

José Valim

Feb 21, 2022, 4:32:23 AM
to elixir-lang-core
Yes, it would be {:exit, {input, :timeout}}.

Generally speaking, options should not drastically change the output type of a function. Moving the ok/error tuples inside would be a drastic change. But perhaps we should name it "zip_input_with_exit_reason" and only change the exit tuples. That would be clearer and would not affect the return types of ok tuples.

eternop...@gmail.com

Feb 22, 2022, 3:03:28 AM
to elixir-lang-core
If that's the case (and I agree with you about changing the shape of the data), I would rather have the input zipped with both :ok and :exit tuples, for the sake of consistency
{:ok, {input, output}}
{:exit, {input, :timeout}}

is fine with me :)

Thank you for the discussion!

José Valim

Feb 22, 2022, 4:18:11 AM
to elixir-lang-core
After some thought, zipping only the exits is better, because you have control over the ok results anyway, so you can add the input to them yourself if necessary. The exit, on the other hand, always carries a reason that you have no control over, especially in certain error cases.
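
As a sketch of what "only the exits" could look like from the caller's side, the behaviour can be emulated today with a small wrapper. `ZipExits.async_stream/3` is a hypothetical helper written for this illustration, not part of Elixir; it assumes `inputs` can be enumerated twice (a list or range, not a one-shot stream) and forces ordered results so that positional pairing is valid.

```elixir
defmodule ZipExits do
  # Hypothetical helper: runs `fun` over `inputs` with Task.async_stream/3
  # and rewrites only the exit tuples so that they carry their input.
  def async_stream(inputs, fun, opts \\ []) do
    # Pairing by position needs ordered results.
    opts = Keyword.put(opts, :ordered, true)

    inputs
    |> Task.async_stream(fun, opts)
    |> Stream.zip(inputs)
    |> Stream.map(fn
      {{:exit, reason}, input} -> {:exit, {input, reason}}
      # ok tuples are passed through unchanged.
      {{:ok, _value} = ok, _input} -> ok
    end)
  end
end

results =
  [1, 2, 3, 4]
  |> ZipExits.async_stream(
    fn entry ->
      if entry == 3, do: Process.sleep(500)
      entry * entry
    end,
    timeout: 100,
    on_timeout: :kill_task
  )
  |> Enum.to_list()
```

The ok tuples keep their existing shape, so code that only matches on `{:ok, value}` is unaffected, while each exit becomes `{:exit, {input, reason}}`.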

vtm

Feb 26, 2022, 10:06:20 AM
to elixir-lang-core
Hey!
Waiting for your comments.
Thx
On Tuesday, February 22, 2022 at 12:18:11 UTC+3, José Valim wrote: