Proposal: Add Stream.start_link/1,2 and Stream.start/1,2


James Fish

Aug 4, 2015, 8:27:36 AM
to elixir-l...@googlegroups.com
Hi all,

Currently there is no built-in way to run a Stream as a long-running process. It is quite easy to do this with a Task:

Task.start_link(fn() -> Stream.run(stream) end)

However, this approach does not handle OTP system messages and leaves them unreceived in the task's mailbox. The message queue can then keep growing, and any selective receive carried out by the stream has to traverse these messages every time. It would be nice for a Stream process to handle system messages without requiring any extra code, so that it gains the OTP debugging capabilities. For example, `:sys.get_state/1` could fetch the continuation function of a suspended Stream for debugging in the shell, or `:sys.trace/2` could print the values returned by the stream to `:stdio`.
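To make the mailbox problem concrete, here is a minimal sketch. The module name `TaskMailboxDemo` and the sleeping stream are made up for illustration; the only assumption is that the Task never receives the `:sys` request, so the call times out and the request stays queued.

```elixir
defmodule TaskMailboxDemo do
  # Hypothetical demo module, not part of the proposal.
  def run do
    # An endless stream that never looks at the process mailbox.
    stream = Stream.repeatedly(fn -> Process.sleep(10) end)
    {:ok, pid} = Task.start(fn -> Stream.run(stream) end)

    # A plain Task has no system message handling, so this call exits
    # with a timeout instead of returning a state.
    result =
      try do
        :sys.get_state(pid, 100)
      catch
        :exit, reason -> {:exit, reason}
      end

    # The request is still sitting in the task's mailbox.
    {:message_queue_len, queued} = Process.info(pid, :message_queue_len)
    Process.exit(pid, :kill)
    {result, queued}
  end
end

IO.inspect(TaskMailboxDemo.run(), label: "get_state result and queue length")
```

Every further `:sys` call against such a process would add another unreceived message.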

From Elixir 1.2 (using a new system message in OTP 18.0) there could also be a `Stream.stop` that could stop a Stream process by halting the Stream cleanly.

Also, with a Task there is no initialisation phase before `start_link` returns, which means that `{:ok, pid}` is always returned even if the stream does not start. For example, a stream created by `File.stream!/3` could fail to open a file. In this situation it would be preferable to return `{:error, {%File.Error{}, stacktrace}}`, like a `GenServer` or an `Agent`.
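A small sketch of the behaviour described above, assuming a path that does not exist on the machine running it. `StartFailureDemo` is a hypothetical name. The caller gets `{:ok, pid}` first and only learns about the failure through the exit signal.

```elixir
defmodule StartFailureDemo do
  # Hypothetical demo module; the path below is assumed not to exist.
  def run do
    # Trap exits so the linked task's crash arrives as a message.
    # Note that this flag stays set on the calling process.
    Process.flag(:trap_exit, true)

    # Building the stream does not open the file; that happens on enumeration.
    stream = File.stream!("/no/such/file/for/this/demo")

    # start_link succeeds even though the stream cannot start.
    {:ok, pid} = Task.start_link(fn -> Stream.run(stream) end)

    receive do
      {:EXIT, ^pid, reason} -> {:crashed_after_ok, reason}
    after
      1_000 -> :still_running
    end
  end
end

IO.inspect(StartFailureDemo.run())
```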

I propose that the functions `Stream.start_link/1,2` and `Stream.start/1,2` be added that start a Stream in a process with the desired features described above. A version is available at https://hex.pm/packages/stream_runner/, which can start a Stream as a long running process with:

StreamRunner.start_link(stream)

StreamRunner spawns a process and runs a loop that flushes system messages and then fetches one value from a Stream at a time.
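The loop described above could look roughly like the sketch below. This is not StreamRunner's actual source: `RunnerSketch` is a hypothetical module that uses `:proc_lib`, `:sys.handle_system_msg/6` and the suspend feature of `Enumerable.reduce/3` to take one value per iteration. Debug tracing via `:sys.handle_debug` is left out to keep it short.

```elixir
defmodule RunnerSketch do
  # Hypothetical sketch; names and structure are assumptions.
  def start_link(stream) do
    :proc_lib.start_link(__MODULE__, :init, [self(), stream])
  end

  def init(parent, stream) do
    # Suspend after every element so control returns to the loop.
    reducer = fn value, _acc -> {:suspend, value} end
    {:suspended, nil, cont} = Enumerable.reduce(stream, {:suspend, nil}, reducer)
    :proc_lib.init_ack(parent, {:ok, self()})
    loop(parent, :sys.debug_options([]), cont)
  end

  defp loop(parent, debug, cont) do
    receive do
      # Handle a pending system message before touching the stream.
      {:system, from, request} ->
        :sys.handle_system_msg(request, from, parent, __MODULE__, debug, cont)
    after
      0 ->
        # No system message waiting: fetch exactly one value.
        case cont.({:cont, nil}) do
          {:suspended, _value, next} -> loop(parent, debug, next)
          {:done, _acc} -> exit(:normal)
          {:halted, _acc} -> exit(:normal)
        end
    end
  end

  # Callbacks required by :sys.handle_system_msg/6.
  def system_continue(parent, debug, cont), do: loop(parent, debug, cont)

  def system_terminate(reason, _parent, _debug, cont) do
    # Halt the stream so it can run its cleanup, then stop.
    cont.({:halt, nil})
    exit(reason)
  end

  def system_code_change(cont, _module, _old_vsn, _extra), do: {:ok, cont}
end

{:ok, pid} = RunnerSketch.start_link(Stream.interval(10))
# With no system_get_state/1 callback, :sys returns the loop state,
# which here is the suspended continuation.
IO.inspect(:sys.get_state(pid))
:sys.terminate(pid, :normal)
```

Because the mailbox is only checked between elements, a stream that blocks for a long time inside one element still delays system messages.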

The ability to change the continuation function as part of a code change has not been implemented. I am unsure of the best method, but a similar method to an `Agent` could be used where a module, function and arguments tuple is passed to alter the state. In this case the call could return a new stream, and the current stream halted.

Note that if `Stream.interval(a_long_time)` is used the StreamRunner process will block for a long time without receiving the system messages because control is with the Stream for each interval. Therefore a StreamRunner process might be unresponsive to system messages for the length of an interval, leading to timeouts.

If you have any thoughts, ideas for improvements or a different method of running Streams as long running processes please let me know.

Thank you to Ben Wilson for bringing the issue forward and for his feedback.

Kind Regards,

James

Peter Hamilton

Aug 4, 2015, 9:53:52 AM
to elixir-l...@googlegroups.com

Stream.interval could be moved to using :timer.send_after

The process created by Stream.start_link would need to special case an interval Stream.

That's a specific case, but the more general case is "How do we avoid blocking in our Stream?"

I think a new Stream behavior might be interesting. It would facilitate code change as well as a more asynchronous pattern for receiving messages.

I'm not sure about the details, which is where it might get messy, but the idea is worth exploring in my opinion.


--
You received this message because you are subscribed to the Google Groups "elixir-lang-core" group.
To unsubscribe from this group and stop receiving emails from it, send an email to elixir-lang-co...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/elixir-lang-core/CA%2BibZ99vokaNHcNxT4tJGapdKE7FW5sFXennVu9i3L0_Hcb3UQ%40mail.gmail.com.
For more options, visit https://groups.google.com/d/optout.

Booker Bense

Aug 4, 2015, 10:11:45 AM
to elixir-lang-core
It may just be a lack of coffee at the moment, but I'm having a hard time seeing how this would be used
in anything other than the case where you only care about the Stream's side effects. What if you want
the actual reduce function from the stream?

As far as I know there aren't any other Stream functions that return something other than a Stream. 
You can't compose this with other Stream functions.  

I can see the usefulness of this functionality, but as always naming is a hard problem. Putting this in
Stream seems odd to me.

- Booker C. Bense 

José Valim

Aug 4, 2015, 10:14:57 AM
to elixir-l...@googlegroups.com
> It may just be a lack of coffee at the moment, but I'm having a hard time seeing how this would be used
> in anything other than the case where you only care about the Stream's side effects.

Yes, that's exactly the use case for this. For now, those streams are not common in Elixir, but it is one of the things we would like to improve in upcoming Elixir releases. I should be able to create Stream "stages" that receive messages from previous stages and forward them to the next ones. That's one of the many different ways we could explore concurrency, and it would be side-effect based, because sending and receiving messages are side effects.
 

Ben Wilson

Aug 4, 2015, 10:18:00 AM
to elixir-lang-core, jose....@plataformatec.com.br
I think the issue with picking Stream.interval as the source example is that it's so easily re-implemented with Process.send_after. My particular concern has to do with pull-based, Stream.resource-type things, where items are picked up from, say, a paginated API and you only want to make further requests when you run out of items from the prior request.

Booker Bense

Aug 4, 2015, 10:49:43 AM
to elixir-lang-core, jose....@plataformatec.com.br
From the Stream moduledoc

> Note the functions in this module are guaranteed to return enumerables.

This breaks that guarantee. Well, I guess it's already broken with Stream.run. 

To me it feels like you are conflating two concepts. Streams are currently 
mostly "dynamic composable Enumerables"; this is a step in a different direction to 
Streams as Dataflow computations.  

Maybe it might make sense to pull run and resource out of Stream and into a new
Dataflow Library. Just to be absolutely clear, I really like the direction this is going, but
there are already a lot of methods in Stream. Hanging out in various elixir forums it 
seems like people have trouble grasping Streams as is[1]. Adding this new functionality
to an already tricky module seems like a barrier to use.

If we start thinking in Dataflow terms we have sources, sinks and transforms. The current
Stream module has a couple source functions, one sink and is mostly transforms. It might
make sense to pull the source/sink functions into their own module and make Streams
just a module of transforms.

The problem is that Streams is a great name for a Dataflow module, but it's already in
use for a module that is largely just Transforms. Renaming Streams to Transforms would
be too confusing.

- Booker C. Bense

[1] - People can use it just fine, but they have misconceptions about what it really does. I know
I did.

Paul Schoenfelder

Aug 4, 2015, 11:52:25 AM
to elixir-l...@googlegroups.com, José Valim
> If we start thinking in Dataflow terms we have sources, sinks and transforms. The current
> Stream module has a couple source functions, one sink and is mostly transforms. It might
> make sense to pull the source/sink functions into their own module and make Streams
> just a module of transforms.
>
> The problem is that Streams is a great name for a Dataflow module, but it's already in
> use for a module that is largely just Transforms. Renaming Streams to Transforms would
> be too confusing.

I actually find the current organization pretty easy to reason about, since it matches quite well with other libraries I'm familiar with, in particular Rx.NET. We have .NET's Enumerable in Enum, Rx's Observable in Stream, and you choose one or the other based on whether you want eager/lazy or push/pull semantics in a given situation. That's just me though. I think I would find it more confusing to have to use more than one module to work with Streams (unless that module is an abstraction that happens to work with Streams of course). I don't think the fact that the current documentation states that all functions in Stream return an enumerable is a reason to break the module up. The docs for each function in Stream are pretty clear about what they do.

That's just my two cents though. I admit I don't have strong feelings one way or the other because I haven't dealt with any pain points with the current API myself; I'm mostly arguing from the position of keeping things simple unless there is a clear reason not to. You did mention that many people have been confused about what Stream really does, but my question would be: why are they confused? Is it because they are trying to intuit their way through the standard library and Stream breaks their expectations? Are they reading the Stream docs and walking away confused? I think those are some important things to know before using that as a reason to change Stream as it exists today (outside the context of James's changes, that is).

Paul

 


Stian Håklev

Aug 4, 2015, 12:05:31 PM
to elixir-l...@googlegroups.com, José Valim
Interesting discussion, but some small code examples would be very useful. Personally I've only used Stream to avoid loading a large file into memory, or to make sure that multiple operations only require one pass over the data. I'm quite interested in José's ideas for parallelism etc., but would love to see more examples of how people are using Streams.

Stian





--
http://reganmian.net/blog -- Random Stuff that Matters

Peter Hamilton

Aug 4, 2015, 12:20:47 PM
to elixir-l...@googlegroups.com, José Valim
Simple code example:

# Every 10 seconds, fetch the score and hand it to ScoreKeeper.
Stream.interval(10000) |> Stream.each(fn (_) ->
  score = fetch_current_score_from_espn()
  ScoreKeeper.update_score(score)
end)

Where:

`fetch_current_score_from_espn` is a function that makes an HTTP call to retrieve the score.
`ScoreKeeper` is a GenServer that stores the score so it can be queried by other processes.

Effectively, we are polling ESPN for updates.

The problematic situation here is how to recover when it crashes. Supervision is required if this is a crucial part of the application.

The traditional OTP approach to this situation (and feel free to correct me here, James) is to have a GenServer send itself messages using send_after. The request to ESPN should then be async, resulting in a message being sent back to the GenServer when it completes. The effect is that the GenServer itself is supervised and responds to all OTP system messages.
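For comparison, the GenServer approach just described might be sketched like this. `PollerSketch`, its `latest/1` function and the injected `fetch` function are all hypothetical; a real version would call the HTTP client and `ScoreKeeper` instead.

```elixir
defmodule PollerSketch do
  # Hypothetical sketch of the "GenServer plus send_after" pattern.
  use GenServer

  def start_link(fetch_fun, interval_ms) do
    GenServer.start_link(__MODULE__, {fetch_fun, interval_ms})
  end

  def latest(pid), do: GenServer.call(pid, :latest)

  @impl true
  def init({fetch_fun, interval_ms}) do
    # Trigger the first poll right after init returns.
    send(self(), :poll)
    {:ok, %{fetch: fetch_fun, interval: interval_ms, latest: nil}}
  end

  @impl true
  def handle_call(:latest, _from, state), do: {:reply, state.latest, state}

  @impl true
  def handle_info(:poll, state) do
    # Run the request in a separate process so the server stays responsive,
    # then schedule the next poll.
    Task.async(state.fetch)
    Process.send_after(self(), :poll, state.interval)
    {:noreply, state}
  end

  def handle_info({ref, result}, state) when is_reference(ref) do
    # The task finished: drop its monitor and store the result.
    Process.demonitor(ref, [:flush])
    {:noreply, %{state | latest: result}}
  end
end

{:ok, pid} = PollerSketch.start_link(fn -> {:score, 3, 1} end, 50)
Process.sleep(200)
IO.inspect(PollerSketch.latest(pid))
```

It works and is a normal OTP citizen, but the polling logic is spread over several callbacks, which is the verbosity the stream notation avoids.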

I think what we want to accomplish here is a way to take the nice shorthand notation of streams and add in robust error handling and instrumentation.

Stian Håklev

Aug 4, 2015, 12:25:30 PM
to elixir-l...@googlegroups.com, José Valim
Thanks, that clarifies a lot. Very different approach to using Streams than what I'm used to. For this example, it would seem clearer to have a function like every_n_mseconds(1000, fn -> ... end). Is there anything about Streams that makes this paradigm particularly useful?

Stian



Peter Hamilton

Aug 4, 2015, 2:17:28 PM
to elixir-l...@googlegroups.com, José Valim
It was a simple example. In practice, you can keep it as an Enumerable by using Stream.map instead of Stream.each as I wrote. You can use all the other Stream methods to process it via a pipeline of sorts. Of particular use is Stream.filter, for example.
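A tiny sketch of such a pipeline, with made-up arithmetic standing in for the real fetch and processing steps:

```elixir
# Stream.map keeps the values flowing, so later stages can filter them.
result =
  Stream.interval(10)
  |> Stream.map(fn tick -> tick * 2 end)
  |> Stream.filter(fn value -> rem(value, 4) == 0 end)
  |> Enum.take(3)

IO.inspect(result)
# → [0, 4, 8]
```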

Booker Bense

Aug 4, 2015, 3:20:31 PM
to elixir-lang-core, jose....@plataformatec.com.br


On Tuesday, August 4, 2015 at 8:52:25 AM UTC-7, Paul Schoenfelder wrote:

> That's just my two cents though. I admit I don't have strong feelings one way or the other because I haven't dealt with any pain points with the current API myself; I'm mostly arguing from the position of keeping things simple unless there is a clear reason not to. You did mention that many people have been confused about what Stream really does, but my question would be: why are they confused? Is it because they are trying to intuit their way through the standard library and Stream breaks their expectations? Are they reading the Stream docs and walking away confused? I think those are some important things to know before using that as a reason to change Stream as it exists today (outside the context of James's changes, that is).


I have seen a lot of comments suggesting that people think Streams have "state" or
that they act in some way as mini-processes. I think a lot of this comes from the initial 
introduction to Stream in the context of File.stream!

There is this sense that a Stream creates some kind of engine of state that you can iterate 
through, that somehow a stream "remembers" where it is.   

For example: 

iex(11)> foo = Stream.interval(10) |> Stream.map(fn(x) -> 2 * x end )
#Stream<[enum: #Function<47.113986093/2 in Stream.unfold/2>,
 funs: [#Function<45.113986093/1 in Stream.map/2>]]>
iex(12)> foo |> Enum.take(1)
[0]
iex(14)> foo |> Enum.take(2)
[0, 2]

If you think of Stream as a simple composition of functions, this result is not a surprise. 
However, I think many people coming to Elixir w/o prior exposure to lazy enumerables
would be surprised by that result. I know I didn't really "get" Stream until I played with
examples like that. 

Adding the Dataflow functions to Stream moves it from a set of functions that are largely 
"pure" to a whole bunch of functions that only exist to implement side effects. 

Now, I could be entirely wrong about the confusion this would cause, but I think it's
important to think at length about the naming for these tools. I think this is an
important step for Elixir; flow based computing across BEAM processes looks to be
a very powerful abstraction. Picking the right name is a big part of making that usable.

To me, this is just step one in a larger Dataflow way of thinking about Streams. I'd like
to see the full API, so to speak, to get a better idea of how it all would work.

If nobody else thinks this is an issue, I'm fine with dropping it. 

- Booker C. Bense



José Valim

Aug 4, 2015, 4:22:59 PM
to Booker Bense, elixir-lang-core
Booker, those are good points. I don't necessarily agree with the solution; it is too early to tell if we should split into another module, but I definitely agree with the point being made.

The only streams that contain state so far in Elixir itself are "GenEvent.stream/1" and "IO.stream/2", and they are rarely used, so the intuition right now is indeed that streams are pure. However, once they become more prominent, we will need to tackle them directly and properly address the two kinds of streams. For example, Rx calls them hot and cold observables.

Finally, it is worth pointing out that the File stream you mentioned is based on side effects but it is still "replayable" (as long as someone doesn't change the file). So Stream.resource/3 does not necessarily break the intuition you have developed, nor does the presence of sources and sinks. So the line is a bit blurrier than one would expect.

At this point I am more interested in exploring the possibilities than in names anyway.



José Valim
Skype: jv.ptec
Founder and Director of R&D

Saša Jurić

Aug 5, 2015, 8:37:39 AM
to elixir-lang-core
I think this is a nice addition that paves the way for running some longer processing while still being responsive to system messages.

I do agree with Booker that this shouldn't be a part of the Stream module, which is mostly data/transformation oriented. Introducing another module (Stream.Process, Stream.Runner, or something similar) looks like a better approach.

It's not clear to me how exactly dataflow or pipelining would work here. The functionality, as described, relies on the fact that messages are not received in the stream, which makes it unsuitable as a data sink. I'm also not sure what the other "many different ways we could explore concurrency" are. I'd like to know more about the end goals, maybe with some high-level pseudo-code snippets :-)

Peter Hamilton

Aug 5, 2015, 10:18:45 AM
to elixir-lang-core

+1 for Stream.Process or Stream.Runner

Perhaps use Task.Supervisor as an example as well? Stream.Supervisor?



Brun Tavares

Aug 5, 2015, 1:25:35 PM
to elixir-lang-core
+ 1 to Booker and Sasa. My feeling is that it should be in another module.

Bruno.

Ben Wilson

Aug 5, 2015, 4:02:20 PM
to elixir-lang-core
Stream.Supervisor.start_link is an interesting idea. For those of you who want to know more about use cases, there are a lot of pull-based data sources that require polling, Amazon's queue service for example. Supposing each SQS message represents a task that's to be started on a cluster, I'd like to be able to have a stream like:

SQS.stream!("queue")
|> Stream.map(&my_business_logic/1)
|> Stream.into(ECS.cluster("my-task-cluster"), &transform_fn/1)

There are a lot of important things about this approach. For one thing, the stream source and the Stream.into destination need not be hard coded. If I encapsulate the stream in a function with the input and output as arguments, I can pass in stubbed values for easy testing without having to involve or touch the transformation and business logic at all.

I get that all of this functionality can be obtained / performed with ordinary GenServers, message passing, and so on. But by describing the functionality in this way there's a particularly nice separation of concerns. Data source de-coupled from data destination, de-coupled from the logic in the middle, and with Stream.Supervisor.start_link it's also de-coupled from the means by which it's run.
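As a rough sketch of that separation, the pipeline can be built by a function that takes the source, the sink and the logic as arguments. `PipelineSketch.build/3` is a hypothetical name; the stubs below use a plain list as the source and an in-memory `StringIO` device as the sink in place of the SQS and ECS pieces.

```elixir
defmodule PipelineSketch do
  # Hypothetical helper: source is any enumerable, sink any collectable.
  def build(source, sink, logic) do
    source
    |> Stream.map(logic)
    |> Stream.into(sink)
  end
end

# Stubbed source and sink for a test run.
{:ok, device} = StringIO.open("")

["a\n", "b\n"]
|> PipelineSketch.build(IO.stream(device, :line), &String.upcase/1)
|> Stream.run()

# StringIO.contents/1 returns {remaining_input, written_output}.
{_input, output} = StringIO.contents(device)
IO.inspect(output)
```

How the resulting stream is run (inline, in a Task, or under a supervisor) is then a separate decision from what it does.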

From the discussion James and I had with José the other day, it's clear that a lot of the ambitions here parallel those of Connectable, which is not yet ready to be executed. I don't want to introduce something into the stdlib that later comes to bind or otherwise inhibit innovation surrounding that protocol. That said, streams can provide a mechanism to powerfully describe important chunks of application functionality, and it would be nice if they could be run in a way that granted them proper OTP citizenry.

Booker Bense

Aug 6, 2015, 10:55:34 AM
to elixir-lang-core
I may be overthinking this or maybe it's already in the plans with Connectable ( is that documented anywhere?) 

But what if we copied the model of Enumerable and Enum? 

i.e. we come up with a basic protocol of what would be needed in terms of functions and then build a module 
around using those basic functions in various ways. 

As this thread shows, Stream.run doesn't provide sufficient hooks to simply run it as a task. I'm not sure exactly what
the right hooks are. I think that perhaps this dovetails into the previous discussion we had about Stream.on.

My initial thinking is something like this:

A Computable protocol that implemented a basic "run" and maybe some other methods. 

A Flow module that would use these basic methods to implement functions such as

Flow.run( Computable )

Flow.run_as_task ( Computable, Count \\ 1 ) 

Flow.run_as_agent

While Stream is the only Computable we have currently, it's essentially only a linear flow. 
I can imagine other kinds of graphs that might be useful. Ring ( a Stream that loops back 
on itself ) for example.  Map/Reduce seems another obvious example.

The general idea would be to capture some standard patterns in which you map bits of execution
to multiple processes and the way those processes communicate.
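Purely as an illustration of the shape this could take, here is a minimal sketch. `ComputableSketch` and `FlowSketch` are hypothetical names, the protocol has a single `run/1` function, and only the `Stream` struct implements it; none of this is an existing API.

```elixir
defprotocol ComputableSketch do
  @doc "Runs the computation to completion for its side effects."
  def run(computable)
end

defimpl ComputableSketch, for: Stream do
  # Only covers %Stream{} structs, not every enumerable.
  def run(stream), do: Stream.run(stream)
end

defmodule FlowSketch do
  # Run in the calling process.
  def run(computable), do: ComputableSketch.run(computable)

  # Run `count` copies, each in its own Task; returns the task structs.
  def run_as_task(computable, count \\ 1) do
    for _ <- 1..count do
      Task.async(fn -> ComputableSketch.run(computable) end)
    end
  end
end

parent = self()
stream = Stream.each(1..3, fn x -> send(parent, {:seen, x}) end)

:ok = FlowSketch.run(stream)
results = stream |> FlowSketch.run_as_task(2) |> Enum.map(&Task.await/1)
IO.inspect(results)
```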


- Booker C. Bense

Booker Bense

Aug 6, 2015, 10:57:48 AM
to elixir-lang-core
Sorry s/methods/functions/, too many years writing ruby code. 