Side-effects on Stream Exhaustion


Jay Rogov

Nov 4, 2022, 12:05:07 PM
to elixir-lang-core
A little bit of context.

The way I see it, there are three possible situations when dealing with a sequence of elements in code:
1. The sequence is reasonably small (millions of elements): use lists
2. A never-ending stream of events (e.g. queue processing): use streams
3. Something in between, i.e. the sequence is finite but may be big enough that you'd rather not build lists, to save memory: use streams
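To make the difference between cases 1 and 3 concrete, here is a minimal sketch. The pipeline (doubling, then keeping multiples of 3) is an arbitrary illustration, not something from the original post. The eager `Enum` version builds a full intermediate list at each step, while the `Stream` version composes the steps lazily and only materializes the result when `Enum.sum/1` forces it.

```elixir
# Eager: every Enum step builds a complete intermediate list.
eager =
  1..1_000
  |> Enum.map(&(&1 * 2))
  |> Enum.filter(&(rem(&1, 3) == 0))
  |> Enum.sum()

# Lazy: Stream steps are composed, and each element flows through them one
# at a time; only the final Enum.sum/1 triggers enumeration.
lazy =
  1..1_000
  |> Stream.map(&(&1 * 2))
  |> Stream.filter(&(rem(&1, 3) == 0))
  |> Enum.sum()

IO.inspect({eager, lazy})
```

Both produce the same value; the lazy pipeline simply avoids allocating the intermediate lists.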

Now suppose you want to do something at the end of a sequence, as in "after the last element was processed", while still returning the processed sequence itself.

For lists it's simple: call Kernel.tap/2, ignore the argument, and perform your dirty side effect.
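A minimal sketch of the list case, assuming Elixir 1.12 or later (where `Kernel.tap/2` is available). The doubling step is only an illustrative stand-in for "processing" the elements.

```elixir
side_effect = fn -> IO.puts("I've processed all elements!") end

result =
  1..10
  # Enum.map/2 is eager, so by the time tap/2 runs, every element is processed.
  |> Enum.map(&(&1 * 2))
  # tap/2 calls the function with the list, ignores its return value,
  # and passes the list through unchanged.
  |> tap(fn _list -> side_effect.() end)

IO.inspect(result)
```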

For streams, it's not so simple. One obvious solution is to wrap the stream in Stream.transform/4 and run the side effect in its after function, like this:
```elixir
> side_effect = fn -> IO.puts("I've processed all elements!") end
>
> 1..10
> |> Stream.transform(fn -> nil end, fn elem, _ -> {[elem], nil} end, fn _ -> side_effect.() end)
> |> Enum.to_list()

I've processed all elements!
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
```

The problem, though, is that for Stream.transform/4, stream exhaustion (emission of all of its elements) and stream halt are the same thing: the after function runs in both cases.

So if you put something like Enum.take/2 at the end...
```elixir
> side_effect = fn -> IO.puts("I've processed all elements!") end
>
> 1..10
> |> Stream.transform(fn -> nil end, fn elem, _ -> {[elem], nil} end, fn _ -> side_effect.() end)
> |> Enum.take(5)

I've processed all elements!
[1, 2, 3, 4, 5]
```
...it doesn't give you quite what you want: the side effect runs even though only 5 of the 10 elements were consumed.

The actual solution is trivial, though, and I think it's helpful enough to be considered for inclusion in the standard library.

```elixir
defmodule Stream do
  # ...
  @doc """
  Runs the zero-arity function `fun` only when `enum` is completely exhausted.

  This differs from calling `fun` in the after function of `Stream.transform/4`,
  which runs whenever enumeration stops, not only when `enum` is exhausted.
  For example, `1..10 |> Stream.transform(...) |> Enum.take(5)` would still run `fun`,
  even though the stream was not exhausted (only 5 of the 10 items were taken).
  """
  def on_exhaust(enum, fun) when is_function(fun, 0) do
    # Append an empty stream after `enum`. Stream.concat/2 only reaches it
    # if the consumer keeps enumerating after `enum` has emitted everything.
    Stream.concat(
      enum,
      # The unfold calls `fun` once and returns nil, which ends it
      # without emitting any element.
      Stream.unfold(nil, fn _ ->
        fun.()
        nil
      end)
    )
  end
  # ...
end
```
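The proposal can be exercised without redefining `Stream` by placing the same function in a standalone module. `StreamExt` is a hypothetical name used only for this sketch, and the side effect here sends a message to the current process (instead of printing) so the example can check whether `fun` actually ran.

```elixir
defmodule StreamExt do
  # Hypothetical standalone copy of the proposed Stream.on_exhaust/2.
  def on_exhaust(enum, fun) when is_function(fun, 0) do
    Stream.concat(
      enum,
      Stream.unfold(nil, fn _ ->
        fun.()
        nil
      end)
    )
  end
end

# The side effect: notify the current process that the stream was exhausted.
notify = fn -> send(self(), :exhausted) end

# Returns true if a :exhausted message is waiting in the mailbox.
check_ran = fn ->
  receive do
    :exhausted -> true
  after
    0 -> false
  end
end

# Consuming everything reaches the trailing unfold, so `notify` runs.
full = 1..10 |> StreamExt.on_exhaust(notify) |> Enum.to_list()
full_ran = check_ran.()

# Enum.take/2 halts after 5 elements; the unfold is never reached.
partial = 1..10 |> StreamExt.on_exhaust(notify) |> Enum.take(5)
partial_ran = check_ran.()

# Taking exactly 10 also halts inside the first enumerable, so the
# unfold is still never reached and `notify` does not run.
exact = 1..10 |> StreamExt.on_exhaust(notify) |> Enum.take(10)
exact_ran = check_ran.()

IO.inspect({full_ran, partial_ran, exact_ran})
```

Note the last case: because the side effect lives in an appended empty stream, it fires only if the consumer asks for one more element after the last real one, which `Enum.to_list/1` does and `Enum.take/2` does not.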

(The naming is up for discussion, though.)