Stream.merge and other goodies


Peter Hamilton

unread,
Jul 19, 2014, 2:37:58 AM7/19/14
to elixir-l...@googlegroups.com
After spending quite a bit of time understanding Streams in Elixir, I started putting together a cool little library called Streamz.


It's largely a proof of concept and I'm undoubtedly doing things wrong (I'm sure I should be monitoring instead of linking somewhere). There's also a lot more to come, but I wanted to share the pieces I already have and get some feedback.

Take a look and let me know what you think.


José Valim

unread,
Jul 20, 2014, 10:03:44 AM7/20/14
to elixir-l...@googlegroups.com
Hello Peter,

This is really nice!

A couple of other functions you could explore are Stream.group_by/2 and Stream.take_until/2. Stream.take_until/2 consumes data from one stream until a message is received on the other stream.

Regarding the code, here are a couple of tips:

1. A "finished" message is usually treated as a failure exit signal; you may want to send something like :shutdown instead.

2. You should probably use references (make_ref), or at least match on the pid, when exchanging messages, to guarantee you are receiving messages from the process you expect. For example:
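# illustrative reconstruction of the snippet: an untagged receive
receive do
  :done -> :ok
end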


That would match any `:done` message. In theory nobody else has access to that process, but it would be best practice to tag the message regardless. :)
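For illustration, a tagged version could look like this (a minimal sketch, not Streamz's actual code):

parent = self()
ref = make_ref()
spawn_link(fn -> send(parent, {ref, :done}) end)

receive do
  {^ref, :done} -> :ok
end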





José Valim
Skype: jv.ptec
Founder and Lead Developer



Peter Hamilton

unread,
Jul 20, 2014, 1:38:51 PM7/20/14
to elixir-l...@googlegroups.com
Thanks. I've opened issues on GitHub to discuss. I plan to take another stab at structuring some of the code (my focus has largely been on functionality).



An interesting insight from exploring those two is that we largely have two categories of functions here: we merge substreams into a single stream, and we break streams out into substreams. Between group_by and merge, most of the other crazy pieces can be built fairly easily (as is the case with take_until).
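For example, here is a sketch of how take_until could fall out of merge (assuming a Streamz.merge/1 that merges a list of streams; the names are illustrative):

def take_until(stream, cutter) do
  halt = make_ref()
  marker = Stream.map(cutter, fn _ -> halt end)

  [stream, marker]
  |> Streamz.merge()                  # merge the data stream with the halt marker
  |> Stream.take_while(&(&1 != halt))
end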

merge seems to work fairly well. A few tweaks and I think it should be ready for general use. group_by has some design issues that need solving.

Thanks again for the feedback.

Jason Stiebs

unread,
Jul 21, 2014, 11:23:49 AM7/21/14
to elixir-l...@googlegroups.com
Very cool! This almost makes me want to rethink how Plug works! Maybe plugs consume a stream of HTTP requests?

Peter Hamilton

unread,
Jul 21, 2014, 11:32:05 AM7/21/14
to elixir-l...@googlegroups.com

Plug as a stream of connections could get very interesting if it were combined with group_by. Suddenly you've got a streaming router. Also, middleware becomes a Stream.map.
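Purely speculative, but the shape might be something like this (all the names here are made up):

connections
|> Stream.map(&log_request/1)     # middleware is just a map
|> Streamz.group_by(&route_of/1)  # one substream per route
|> Enum.each(fn {route, conns} -> handle_route(route, conns) end)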

There's definitely a lot of cool things that can be done there.

Jason Stiebs

unread,
Jul 21, 2014, 11:50:50 AM7/21/14
to elixir-l...@googlegroups.com
Each plug takes items off the stream instead of being called when a request comes in.

José Valim

unread,
Jul 21, 2014, 4:59:57 PM7/21/14
to elixir-l...@googlegroups.com
It is better to model each HTTP request as a process. That's how we can exploit the concurrency exposed by the VM. It could be streams at the acceptor level, but you would want to spawn a new process as soon as possible, so it wouldn't really matter.


Dave Thomas

unread,
Jul 21, 2014, 7:47:33 PM7/21/14
to elixir-lang-core

> It is better to model each HTTP request as a process. That's how we can exploit the concurrency exposed by the VM. It could be streams at the acceptor level, but you would want to spawn a new process as soon as possible, so it wouldn't really matter.

I've been thinking about how you'd model that, and got to wondering—given that you have no control over what the request process does in terms of spawning other processes, would you be safer creating a supervisor and a process per request, limiting the damage if something does go wrong? 

Jason Stiebs

unread,
Jul 21, 2014, 8:22:41 PM7/21/14
to elixir-l...@googlegroups.com
Peter, regarding Streamz.Task.stream: would it make sense to start a Task.Supervisor to watch the tasks? Or are you leaving that up to the caller?



Peter Hamilton

unread,
Jul 21, 2014, 8:25:42 PM7/21/14
to elixir-l...@googlegroups.com

I'll open this up as an issue on GitHub. Right now they are all linked, so if one dies the whole stream dies.

José Valim

unread,
Jul 22, 2014, 2:15:12 AM7/22/14
to elixir-l...@googlegroups.com

> I've been thinking about how you'd model that, and got to wondering—given that you have no control over what the request process does in terms of spawning other processes, would you be safer creating a supervisor and a process per request, limiting the damage if something does go wrong?

Yes, that's typically how it is done. We cover a similar setup in the Mix and OTP guides. There is an acceptor and a supervisor that supervises the requests (let's call it the request supervisor). The supervisor and acceptor are children of a parent supervisor (the app supervisor). Once there is a new connection, the acceptor spawns a new process as a child of the request supervisor to handle the request.

In this case, the request supervisor doesn't really start anything; it is there just so you have a hierarchy and a nice way to shut down your application (you can ask the request supervisor to terminate all children with a timeout of 5 seconds, for example).
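To make the shape concrete, here is a rough sketch of that hierarchy in the spirit of the Mix and OTP guides (all module and function names below are made up):

import Supervisor.Spec

children = [
  supervisor(Task.Supervisor, [[name: MyApp.RequestSupervisor]]),
  worker(Task, [MyApp.Acceptor, :start, [4040]])
]

# the app supervisor restarts the acceptor or request supervisor if either dies
Supervisor.start_link(children, strategy: :one_for_one)

# in MyApp.Acceptor: hand each accepted connection to the request supervisor
def start(port) do
  {:ok, socket} = :gen_tcp.listen(port, [:binary, active: false])
  loop(socket)
end

defp loop(socket) do
  {:ok, client} = :gen_tcp.accept(socket)
  Task.Supervisor.start_child(MyApp.RequestSupervisor,
                              fn -> MyApp.Handler.handle(client) end)
  loop(socket)
end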


Martin Schurrer

unread,
Jul 24, 2014, 2:04:01 PM7/24/14
to elixir-l...@googlegroups.com
Also take a look at the Architecture section of ranch (on which cowboy builds, on which Plug builds): http://ninenines.eu/docs/en/ranch/HEAD/guide/internals/

--

Kind regards,
Martin Schurrer

Peter Hamilton

unread,
Aug 8, 2014, 5:39:09 PM8/8/14
to elixir-l...@googlegroups.com
There's still a lot I'm working on (Stream.group_by is proving elusive...) but I was wondering what approach I should take. Right now this is all just a playground, but I think some of the pieces that are complete are very useful. I would like to polish them up and either release them as a package or add them to the stdlib. Specifically, I'd like to add the following to the Stream module:

Stream.merge/2
Stream.take_until/2
Stream.combine_latest/2
Stream.interval/1
Stream.timer/1
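For anyone unfamiliar with the Rx-style names, a small usage sketch (the exact semantics here are my assumption: interval emits an incrementing counter every `ms` milliseconds):

ticks = Stream.interval(100)
ticks |> Stream.take(5) |> Enum.to_list()  #=> [0, 1, 2, 3, 4]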

Which approach seems right here?



José Valim

unread,
Aug 9, 2014, 6:04:33 AM8/9/14
to elixir-l...@googlegroups.com
Hey Peter,

Awesome! I think the best way to proceed is to start two discussions: one for the timer-based functions and another for the ones that depend on merge/2, since those can get quite complex.

To me, the biggest concerns regarding merge/2 are:

* lack of use cases
* lack of async/hot streams today (we only have GenEvent streams?)

The reason it worries me is that it is hard to evaluate the performance and characteristics of those without actual use cases. For example, one of the things I was considering is avoiding the creation of intermediate processes in Stream.merge/2 for async/hot streams, because they are already backed by a process. Doing so would require a new Stream.Mergeable protocol or something similar, but it is hard to know if that is relevant right now.
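Just to illustrate the shape of that idea (the protocol below is hypothetical; nothing like it exists today):

defprotocol Stream.Mergeable do
  # hypothetical: ask a process-backed stream to send its elements
  # directly to `pid`, tagging each message with `ref`
  def subscribe(stream, pid, ref)
end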

One of my thoughts is that parallel pipelines would provide a good use case for those. I had planned to cover this in my ElixirConf keynote, but unfortunately I did not have time. I will try to quickly explore those concepts in the example below.

Imagine you have this code:

users
|> fetch_profiles
|> fetch_investments

def fetch_profiles(users) do
  for user <- users do
    fetch_profile(user) # external request
  end
end

def fetch_investments(profiles) do
  for profile <- profiles do
    fetch_investment(profile) # external request
  end
end

According to the part I did cover at ElixirConf, we should be able to easily parallelize it with:

users
|> fetch_profiles
|> fetch_investments

def fetch_profiles(users) do
  parallel for user <- users do
    fetch_profile(user) # external request
  end
end

def fetch_investments(profiles) do
  parallel for profile <- profiles do
    fetch_investment(profile) # external request
  end
end

This will ensure we fetch profiles and investments in parallel but we still need to fetch *all* profiles before we fetch investments. Assuming we can fetch everything in parallel (there is no pooling or contention on parallel for), the worst time of this will be worst_time(profile) + worst_time(investment).

What I want to be able to do is to connect the "parallel for"s. We can do this by moving to this code:

Parallel.flow users,
  [&fetch_profiles/2, &fetch_investments/2]

def fetch_profiles(input, output) do
  parallel for user <- input, into: output do
    fetch_profile(user) # external request
  end
end

def fetch_investments(input, output) do
  parallel for profile <- input, into: output do
    fetch_investment(profile) # external request
  end
end

The changes were very few:

* The pipeline became a Parallel.flow/2
* Each function now receives the input (enumerable) and output (collectable)

Now, as data leaves fetch_profiles, it reaches fetch_investments. Our worst time now becomes worst_time(profile + investment). In the step above, we are doing 1 to 1 forwarding, and we could have arbitrary operations inside each step. For example, we can use any stream function as part of the pipeline too, like:

Stream.scan(input, fn ... end) |> Enum.into(output)

But there are functions that need to work at the flow level (instead of inside each step like parallel for or Stream.scan) like:

* n to 1 (Stream.merge)
* 1 to n (Stream.split) (should different strategies be supported, like round-robin, pull, etc.?)
* Feedback (being able to arbitrarily inject data into previous steps)
* Aggregators

I believe once we start moving in this direction, we will have a much better idea of the best trade-offs for Stream.merge/2 and friends: whether it should be pull or push, how to avoid unnecessary communication, and so on.

Sorry for the wall of text and blurry thoughts, but that is all I have right now. Your work already provides great insight into how functions like Stream.merge/2 should work, and it will definitely help going forward when exploring those use cases.




José Valim
Skype: jv.ptec
Founder and Lead Developer



Peter Hamilton

unread,
Aug 9, 2014, 11:27:06 AM8/9/14
to elixir-l...@googlegroups.com
So Parallel.flow/2 makes me cringe at first sight. It brings back bad memories of using async.js and spray. Having input and output args is similarly reminiscent of using out args in Java. All of those bad experiences I attribute to shortcomings of the language/platform.

One of the greatest strengths of Elixir is the fact that we have a powerful scheduler in BEAM and therefore can avoid callback-based async code. It makes a huge difference in the user experience. I would much rather simply do:

investments = users
  |> Stream.parallel_map(&fetch_profile/1)
  |> Enum.parallel_map(&fetch_investments/1)

Streamz.Task.stream actually supports this right now in a roundabout way:

investments = users
  |> Stream.map(&(fn -> fetch_profile(&1) end))
  |> Streamz.Task.stream
  |> Stream.map(&(fn -> fetch_investments(&1) end))
  |> Streamz.Task.stream
  |> Enum.to_list

It's a little clunky because you have to map outputs into closed-over funs, but in the above example the worst time is worst_time(profile + investment).

Stream.merge takes care of mid-stream injection:

old_profiles = get_profiles_from_cache

new_profiles = new_users
  |> Stream.map(&(fn -> fetch_profile(&1) end))
  |> Streamz.Task.stream

investments = Stream.merge(old_profiles, new_profiles)
  |> Stream.map(&(fn -> fetch_investments(&1) end))
  |> Streamz.Task.stream
  |> Enum.to_list

I guess I'm saying 2 things here:
  1. Favor Enumerable over Collectable when possible. This could be the same argument as pull vs push or lazy vs eager. My preference is Enumerable/pull/lazy. It's what has made Elixir so much more enjoyable than Javascript or Scala. I don't need to jump through any weird hoops.
  2. Let's avoid callback / pipeline style programming. We have the pipe operator which is fantastic. It doesn't hide execution details. Parallel.flow doesn't seem to offer much over the pipe operator other than supporting output params. This is dependent on favoring Enumerable.
Just my two cents on usage. I think you are right about needing more hot streams; we have GenEvent and IO.stream today, and Task.stream and Stream.parallel_map will be useful. To answer my original question: for now, I should try to get a package out and invite people to start using it.

Thanks for the feedback!

- Peter

José Valim

unread,
Aug 9, 2014, 11:29:51 AM8/9/14
to elixir-l...@googlegroups.com
Btw, I just realized that it could be called Stream.flow instead of Parallel.flow. Basically, the idea is that each function given to flow/2 is executed in its own process and it receives two streams, the input one and the output one. The flow/2 function also wires those processes together.
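To illustrate the wiring (Streamz.pipe/0 below is a made-up helper that returns a process-backed {enumerable, collectable} pair):

def flow(input, stages) do
  Enum.reduce(stages, input, fn stage, in_stream ->
    {out_stream, out_collector} = Streamz.pipe()
    # each stage runs in its own process, reading in_stream
    # and writing into out_collector
    spawn_link(fn -> stage.(in_stream, out_collector) end)
    out_stream
  end)
end

With that, Stream.flow(users, [&fetch_profiles/2, &fetch_investments/2]) would return a lazy stream of investments.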




José Valim
Skype: jv.ptec
Founder and Lead Developer


José Valim

unread,
Aug 9, 2014, 12:11:56 PM8/9/14
to elixir-l...@googlegroups.com
> One of the greatest strengths of Elixir is the fact that we have a powerful scheduler in BEAM and therefore can avoid callback-based async code. It makes a huge difference in the user experience. I would much rather simply do:
>
> investments = users
>   |> Stream.parallel_map(&fetch_profile/1)
>   |> Enum.parallel_map(&fetch_investments/1)

That's a good point. I would really like to leverage for comprehensions, though, just because they are extremely convenient to write and use. I guess we can use them if we make "parallel for" return streams by default? So we would have something like:

investments = users
  |> fetch_profiles()
  |> fetch_investments()
  |> Enum.into([])

def fetch_profiles(users) do
  parallel for ...
end

def fetch_investments(profiles) do
  parallel for ...
end

I agree with you, this is better. One issue, though, is that without explicitly identifying the intermediate steps, it gets harder to do feedback. One option is to create a GenEvent, merge it with users at the beginning of the pipeline, and send new users to the GenEvent. But that may be overkill if you consider that each parallel coordinator already has its own state.
 
> I guess I'm saying 2 things here:
>   1. Favor Enumerable over Collectable when possible. This could be the same argument as pull vs push or lazy vs eager. My preference is Enumerable/pull/lazy. It's what has made Elixir so much more enjoyable than Javascript or Scala. I don't need to jump through any weird hoops.
>   2. Let's avoid callback / pipeline style programming. We have the pipe operator which is fantastic. It doesn't hide execution details. Parallel.flow doesn't seem to offer much over the pipe operator other than supporting output params. This is dependent on favoring Enumerable.
Good points! The pull/push strategy does not map directly to collectable/enumerable, because once we have processes (state) we can convert one into the other. GenEvent streams are push even though they are Enumerables.

I believe the decision to choose pull or push will depend on our failure strategy and which one we could optimize the most. The example above:

investments = users
  |> Stream.parallel_map(&fetch_profile/1)
  |> Enum.parallel_map(&fetch_investments/1) 

This will have 3 processes (the current one plus one for each stage) plus 2 pools of processes for parallelism. For each user, we would have 14 messages (each step below is a send plus an ACK):

1. Send the user to the parallel coordinator (and ACK)
2. The parallel coordinator sends the user to the parallel worker (and ACK)
3. The parallel worker sends the calculated profile to the parallel coordinator (and ACK)
4. The parallel coordinator sends the profile to the next parallel coordinator (and ACK)
5. The parallel coordinator sends the profile to the parallel worker (and ACK)
6. The parallel worker sends the calculated investment to the parallel coordinator (and ACK)
7. The parallel coordinator sends the calculated investment to the current/starting process (and ACK)

To summarize, some extra concerns are:

1. How to incorporate data feedback

2. How to handle failures. For one-off operations, starting linked processes is fine, but we may also want a long-lived parallel pipeline, where different processes feed in some users and want back the profiles for only those users. In those cases, it doesn't make sense to create the whole pipeline for every batch of users.

3. How to reduce message noise and data copying?

I have some other ideas but this is getting way too long. Would you be up for some hangout/chat later this week? :D

José Valim

unread,
Aug 10, 2014, 4:19:59 PM8/10/14
to elixir-l...@googlegroups.com
Hello everyone,

Peter and I had a very productive chat yesterday. I will try to describe the main points we discussed. Peter, feel free to correct me or add important information I missed.

Peter started by raising the point that he is more concerned about simplicity than performance. We discussed it and agreed to find a good balance. For example, Peter will look into different ways to reduce the amount of messaging and copying in Streamz.merge/2, as an experiment we could possibly replicate to reduce the number of messages in other scenarios (like the one I raised in the previous e-mail).

At some point, we moved the conversation to parallel operations. I stated a preference for a pool of workers when doing something like pmap, while Peter suggested we could initially just spawn N processes when mapping N items. The issue is that, for some value of N, that will be too much, so we need to benchmark and figure out strategies like chunking, pooling and so on.

Then we moved on to compare both parallel styles that we discussed in this thread: one based on something like flow, with explicit steps, and another one based on pipe and function composition.

At some point, we started discussing having a Parallel module with a bunch of parallel operations, like map, filter, flat_map, etc. We realized, though, that parallel operations can be eager (like Enum) or lazy (like Stream). Furthermore, the Parallel module wouldn't necessarily compose with Stream functions. For example, if you have this code:

users
|> Stream.Parallel.map(...)
|> Stream.Parallel.map(...)
|> Enum.into([])

it can nest both map operations, reducing the number of processes created (the same way Stream.map/2 "nests"). However, if you add a Stream.take(n) in the middle, this property is lost:

users
|> Stream.Parallel.map(...)
|> Stream.take(10)
|> Stream.Parallel.map(...)
|> Enum.into([])

Note we can't have Stream.Parallel.take/2 because it relies on ordering and halting (which you lose once you go parallel). We agreed that all the possible combinations (Enum.Parallel and Stream.Parallel), plus how they interact with other modules, can be very confusing.

I noted that the advantage of something like Parallel.flow/2 is that we could create an explicit process for everything in the parallel pipeline and have tight control of those processes. For example, we could even have a Parallel.graph(...) that would generate a graph of the parallel pipeline structure, or rearrange topologies as needed in production. We haven't figured out a way to remove the need for input/output and the dependency on Collectable, though.

Finally, we discussed possible syntaxes for parallel comprehensions. Since we raised the possibility of "parallel for" returning a stream, I commented that this code will be common:

parallel for x <- users, do: do_x(x)

The issue is that, since it returns a stream, users would like to pipe it into Enum.into/2:

parallel for x <- users, do: do_x(x) |> Enum.into([])

But that has the wrong precedence. One option would be to make "parallel for" eager by default but support a lazy: true (or stream: true) option on both "parallel for" and "for":

for x <- users, stream: true, do: do_x(x)
parallel for x <- users, stream: true, do: do_x(x)

Another alternative raised was to just ask people to properly wrap "parallel for" with parentheses:

parallel(for x <- users, do: do_x(x))

Finally, we haven't discussed failures or how to incorporate feedback in any of the approaches. We basically explored different ideas, and as such this definitely requires more tinkering. :)




José Valim
Skype: jv.ptec
Founder and Lead Developer


Peter Hamilton

unread,
Aug 10, 2014, 4:34:18 PM8/10/14
to elixir-l...@googlegroups.com

Good summary, José. I'd say one other piece that was left out was ordering on parallel operations.

If we want to preserve order while maintaining independent function composition (i.e. an element can move to the next stage even if other elements have not completed the current stage), the normal approach is to tag elements with an index and then sort at the end of the pipeline. This introduces the idea of "around" functions on streams, which opens up all sorts of interesting possibilities as well as complications.
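A minimal sketch of that tagging approach (parallel_map here is hypothetical):

results =
  users
  |> Stream.with_index()
  |> parallel_map(fn {user, i} -> {i, fetch_profile(user)} end)
  |> Enum.sort_by(fn {i, _profile} -> i end)
  |> Enum.map(fn {_i, profile} -> profile end)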

As said, there's still a lot of speculation, so the goal is to focus on parallel map/for, because that is where users will immediately see benefit.

Fun stuff!

- Peter

Robert Brown

unread,
Aug 12, 2014, 11:28:40 AM8/12/14
to elixir-l...@googlegroups.com, jose....@plataformatec.com.br
I have been doing some work on this recently with my parallel collections library.  I wasn't planning on publishing it quite yet, but since the topic has come up, I decided to push my code to GitHub. 


I just have a parallel list at this point. It's very raw and littered with comments from things I learned along the way. However, it should be good enough to get the ideas across. I based everything off of Clojure and Scala's parallel collections. I took a reduce-combine approach, a variant of map-reduce. This means that the parallel operations must be associative and have an identity value. Since not all the Enum functions are associative, I think these functions will need to be moved into an Enum.Sequential module. The common code will be in the Enum module, and any parallel-specific code will go into an Enum.Parallel module, though my code currently just calls it ParallelEnum.
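For context, a rough sketch of the reduce-combine idea (this is not ParallelEnum's actual code; op must be associative and id its identity value):

def reduce_combine(list, id, op) do
  workers = System.schedulers_online()
  chunk_size = max(div(length(list), workers), 1)

  list
  |> Enum.chunk_every(chunk_size)                    # split
  |> Enum.map(fn chunk ->
    Task.async(fn -> Enum.reduce(chunk, id, op) end) # reduce in parallel
  end)
  |> Enum.map(&Task.await/1)
  |> Enum.reduce(id, op)                             # combine
end

reduce_combine(Enum.to_list(1..100), 0, &+/2)  #=> 5050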

I'm not sure what the best approach is for concurrency. Currently my parallel list spawns a process for each scheduler. It's not great, but it shows that it works. Clojure appears to partition collections into 512 pieces then divvies out the work to a pool. On top of that it uses work stealing. From my research of parallel collections, this appears to be a very efficient approach for very large collections. I thought of exposing an options list to the Enum module so that a different technique or different number of partitions could be specified if the caller knows that a different approach may be better for a particular scenario. Clojure does something similar by allowing the caller to specify the number of partitions. 

I would love to hear everyone's thoughts on my approach so far. 

José Valim

unread,
Aug 12, 2014, 11:43:12 AM8/12/14
to elixir-l...@googlegroups.com
Very nice Rob! Some comments inline.

> I based everything off of Clojure and Scala's parallel collections. I took a reduce-combine approach. This is a variant of map-reduce.

Elixir's Enumerables are based on Clojure's reducers (except they can be suspended, which is needed for zip). So if you are only working with lists so far, it should not be that hard to support any collection. The Stream module in Elixir provides composable computations (which are also Enumerables), like Clojure's reducers.
 
> I'm not sure what the best approach is for concurrency. Currently my parallel list spawns a process for each scheduler. It's not great, but it shows that it works.

Yup, that's a point I am not sure about either. In my experiments with Elixir test cases and the parallel compiler, I usually hit the sweet spot with 2x the number of schedulers. The Erlang skel project has map (which is unbounded) and farm (which is a pool). The pool works as a master/slave (each process asks for the next item when it is done), and one could support chunking to reduce communication.
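For illustration, that pull style boils down to a worker loop like this (all names are illustrative):

defp worker_loop(master, fun) do
  send(master, {:ready, self()})  # ask for work when idle

  receive do
    {:item, item} ->
      send(master, {:result, item, fun.(item)})
      worker_loop(master, fun)
    :done ->
      :ok
  end
end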

Work stealing is harder to do in Erlang as it requires explicit synchronization points between processes but we will only figure out the best strategies after we implement and benchmark them. :) If you start benchmarking and have news, please let us know!

Peter Hamilton

unread,
Aug 12, 2014, 11:59:18 AM8/12/14
to elixir-l...@googlegroups.com
Agree. Great work!

I have always been impressed with Scala parallel collections (or at least the concept): just add a `.par` to your collection and off you go. In practice I've had bad experiences with them (too much tuning of the JVM and thread settings required). My hope is that BEAM will prove a better environment than the JVM.

One thing I pointed out to José that I like about Enum/Stream is that the execution (lazy/eager) is decoupled from the data structure. I can Enum.map a stream and I can Stream.map a list. Where parallel execution fits in here I'm not sure. I think a good goal would be for a data structure to support lazy/eager and sequential/parallel operations and the calling function would determine execution. It seems your approach could accomplish that via the ParallelEnumerable protocol.

- Peter


Robert Brown

unread,
Aug 12, 2014, 12:13:16 PM8/12/14
to elixir-l...@googlegroups.com
Yes, I like the idea of keeping parallel/sequential separate from the data structure. Scala's philosophy is to use the same data structures, then use splitters to partition a data structure for various tasks to process. They even wrote a paper about it: http://infoscience.epfl.ch/record/150220/files/pc.pdf. This is why I've kept ParallelList a simple wrapper around List, though from what I've been seeing lately, I may be able to do away with the wrapper. I've also tried to keep my parallel Enumerable as close as possible to Elixir's existing Enumerable module. I tried not to make any modifications, but the reduce function needed a combining function added as a parameter.


Robert Brown
iOS Developer / Software Architect


José Valim

unread,
Aug 12, 2014, 1:29:23 PM8/12/14
to elixir-l...@googlegroups.com

> Yes, I like the idea of keeping parallel/sequential separate from the data structure. Scala's philosophy is to use the same data structures, then use splitters to partition a data structure for various tasks to process. They even wrote a paper about it: http://infoscience.epfl.ch/record/150220/files/pc.pdf.

Since Combiner and Splitter are traits in Scala, maybe one idea is to define Combiner and Splitter protocols in Elixir? This way you don't need a ParallelList. :)

The thing, though, is that the combiner/splitter approach works nicely on trees, and those depend on the parallelism strategy. For example, since work stealing is harder here, the approach of recursively splitting and spawning may not be that performant, even more so if we consider the tree may not be well balanced. HashDict is definitely not well balanced, and I am not sure what maps in Erlang 18.0 will look like. That doesn't stop us from creating our own data structures (conc-lists in Elixir would be fun).

Peter Hamilton

unread,
Aug 13, 2014, 10:21:01 AM8/13/14
to elixir-l...@googlegroups.com

Experimenting with performance and Streamz.merge:

After a discussion with José on Saturday, I tried a few new approaches. The first was to avoid routing data through the collector and have the collector merely provide synchronization. It led to a decent speedup when only a few streams were being merged, but slowed down when merging many streams. The second approach was to eliminate the collector entirely. The speedup here was huge: there was little difference in execution time between 2 streams and 10k streams, and it is much more efficient. My machine started to bottleneck at around 10k cores, but it scaled very well.

I don't think we can do much better than that.

Here's more information, with pretty graphs: https://github.com/hamiltop/streamz/issues/10
