Flow.chunk ?


Peter C. Marks

Nov 9, 2016, 2:13:25 PM
to elixir-lang-core
In an application of GenStage that I've been writing, I came across the need for something like Stream.chunk/2 or Stream.chunk/4 but for Flows. 

Is there a reason why Flow.chunk is not defined?

Thanks,

Peter

José Valim

Nov 9, 2016, 2:45:14 PM
to elixir-l...@googlegroups.com
Hi Peter, can you talk a bit more about your use case?

chunk may be very sensitive to ordering so you will be chunking in no defined order. Maybe using a window that is based on event count may be a better fit semantically?



José Valim
Skype: jv.ptec
Founder and Director of R&D

--
You received this message because you are subscribed to the Google Groups "elixir-lang-core" group.
To unsubscribe from this group and stop receiving emails from it, send an email to elixir-lang-core+unsubscribe@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/elixir-lang-core/7c5e942f-1e55-4f90-bc79-79dc0622d292%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Peter C. Marks

Nov 9, 2016, 4:16:37 PM
to elixir-l...@googlegroups.com
Hello José,

The use case is finding the frequency of occurrences of clumps of patterns in a DNA sequence.

For example, given the following sequence: CGGACTCGACAGATGTGAAGAACGACAATGTGAAGACTCGACACGACAGAGTGAAGAGAAGAGGAAACATTGTAA
and the numbers l = 50, k = 5 and t = 4

a "clump-finding" algorithm will return every pattern of length k that appears at least t times within some subsequence (window) of length l of the sequence.
The answers for the above sequence are: CGACA and GAAGA.
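
For readers who want to try the definition above on the sample, here is a minimal sequential sketch using only the standard library. The module and function names (ClumpSketch, find_clumps, frequent_kmers) are made up for illustration and are not from the thread, and it uses Enum.chunk_every/4, the newer name for sliding chunks, so it assumes Elixir 1.5 or later.

```elixir
defmodule ClumpSketch do
  # Hypothetical helper: returns every k-mer that occurs at least t times
  # inside at least one window of length l, as sorted strings.
  def find_clumps(sequence, l, k, t) do
    sequence
    |> String.to_charlist()
    # Slide a window of length l one base at a time; drop short leftovers.
    |> Enum.chunk_every(l, 1, :discard)
    |> Enum.flat_map(&frequent_kmers(&1, k, t))
    |> Enum.uniq()
    |> Enum.map(&List.to_string/1)
    |> Enum.sort()
  end

  # Count the k-mers of a single window and keep those seen at least t times.
  defp frequent_kmers(window, k, t) do
    window
    |> Enum.chunk_every(k, 1, :discard)
    |> Enum.reduce(%{}, fn kmer, acc -> Map.update(acc, kmer, 1, &(&1 + 1)) end)
    |> Enum.filter(fn {_kmer, n} -> n >= t end)
    |> Enum.map(fn {kmer, _n} -> kmer end)
  end
end

sequence =
  "CGGACTCGACAGATGTGAAGAACGACAATGTGAAGACTCGACACGACAGAGTGAAGAGAAGAGGAAACATTGTAA"

IO.inspect(ClumpSketch.find_clumps(sequence, 50, 5, 4))
# => ["CGACA", "GAAGA"]
```

The result agrees with the answers quoted above for l = 50, k = 5 and t = 4.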

I have written a solution that begins with essentially two uses of chunk in a row:

sequence
|> String.to_charlist
|> Enum.chunk(l, 1)
|> Flow.from_enumerable
|> Flow.map(fn e -> Enum.chunk(e, k, 1) end)
...


and it works just fine. If the sequence is coming from a file, I take advantage of Stream.chunk. 

Note that the sequence can easily be thousands, if not millions, of nucleotides long, and for that reason I thought that GenStage (Flow) would be a good tool.

I don't think windows would help for this use case; one has to slide the window nucleotide by nucleotide and grab, as in the above example, 50 nucleotides at a time. 

Having looked at the implementation of chunk in Stream, I can see that it looks to be a bit complicated.

Thanks for considering this.

Peter








--
Peter C. Marks
@PeterCMarks

José Valim

Nov 9, 2016, 4:44:19 PM
to elixir-l...@googlegroups.com
 
sequence
|> String.to_charlist
|> Enum.chunk(l, 1)
|> Flow.from_enumerable
|> Flow.map(fn e -> Enum.chunk(e, k, 1) end)

Do you call partition at some point in your flow? Otherwise it won't exploit parallelism if you have only one source. Also, if you need to chunk before you partition, you can chunk before calling from_enumerable:

sequence
|> String.to_charlist
|> Stream.chunk(l, 1)
|> Flow.from_enumerable
|> ...

I think it will be easy to add chunking to Flow because we can delegate to Stream but I just want to make sure I fully understand your use case and where parallelism is being introduced.

Peter C. Marks

Nov 9, 2016, 5:19:01 PM
to elixir-l...@googlegroups.com
Yes, I do use partition.  The full flow is:

  sequence
  |> String.to_charlist
  |> Enum.chunk(l, 1)
  |> Flow.from_enumerable
  |> Flow.partition
  |> Flow.map(fn e -> Enum.chunk(e, k, 1) end)
  |> Flow.map(
        fn e ->
          Enum.reduce(e, %{},
            fn w, acc ->
              Map.update(acc, w, 1, & &1 + 1)
            end)
        end)
  |> Flow.flat_map(
        fn e ->
          Enum.reject(e, fn({_, n}) -> n < t end)
        end)
  |> Flow.map(fn({seq, _}) -> seq end)
  |> Enum.to_list




José Valim

Nov 9, 2016, 6:58:20 PM
to elixir-l...@googlegroups.com
Thanks Peter!

I believe you don't want to call Flow.chunk/2. Calling Enum.chunk(l, 1) before Flow.from_enumerable/2 is the way to go in your case as it guarantees *chunks*, and not letters, are spread around on Flow.map/2. If instead you called Flow.chunk/2 after Flow.from_enumerable/2, the DNA order would be lost by the time you get to Flow.chunk/2. You would effectively chunk items in random order. I would possibly only suggest to use Stream.chunk/2 instead of Enum.chunk/2 (so you don't need to build all chunks upfront).

On the other hand, if you are referring to the inner chunk in Flow.map/2, it also wouldn't yield the correct results, because you would be chunking groups of "e" and not a single "e" like now.

Finally, it doesn't seem you need partitioning at all, since you are not reducing over any state (I may have misled you in a previous reply, sorry). My suggestion:

sequence
|> String.to_charlist
|> Stream.chunk(l, 1)
|> Flow.from_enumerable
|> Flow.flat_map(&find_sequences(&1, k))
|> Enum.to_list

def find_sequences(e, k) do
  e
  |> Enum.chunk(k, 1)
  |> Enum.reduce(%{}, fn w, acc ->
       Map.update(acc, w, 1, & &1 + 1)
     end)
  |> Enum.reject(fn({_, n}) -> n < t end)
  |> Enum.map(fn({seq, _}) -> seq end)
end

PS: I haven't tested it.





Peter C. Marks

Nov 11, 2016, 10:16:15 AM
to elixir-l...@googlegroups.com
Thank you José for your critique of my code and your suggested rewrite. Your code works great! (I just needed to add the parameter t to the call to find_sequences) I will need to spend a little more time understanding why you suggested those changes. I plan on blogging about this soon.

Thanks again,

Peter



José Valim

Nov 11, 2016, 10:23:11 AM
to elixir-l...@googlegroups.com
Glad to help! If you have any questions, feel free to ask.




Peter C. Marks

Nov 14, 2016, 9:52:21 AM
to elixir-l...@googlegroups.com
Hello José,

Before I post to my blog, I want to make sure that I understand what you meant when you said:

I believe you don't want to call Flow.chunk/2. Calling Enum.chunk(l, 1) before Flow.from_enumerable/2 is the way to go in your case as it guarantees *chunks*, and not letters, are spread around on Flow.map/2. If instead you called Flow.chunk/2 after Flow.from_enumerable/2, the DNA order would be lost by the time you get to Flow.chunk/2. You would effectively chunk items in random order. I would possibly only suggest to use Stream.chunk/2 instead of Enum.chunk/2 (so you don't need to build all chunks upfront).

My original code:
  sequence
  |> String.to_charlist
  |> Enum.chunk(l, 1)               #CHUNK A
  |> Flow.from_enumerable
  |> Flow.partition
  |> Flow.map(fn e -> Enum.chunk(e, k, 1) end)     #CHUNK B
  |> Flow.map(
        fn e ->
          Enum.reduce(e, %{},
            fn w, acc ->
              Map.update(acc, w, 1, & &1 + 1)
            end)
        end)
  |> Flow.flat_map(
        fn e ->
          Enum.reject(e, fn({_, n}) -> n < t end)
        end)
  |> Flow.map(fn({seq, _}) -> seq end)
  |> Enum.to_list

Do you mean that if Flow.chunk was implemented that it would behave differently from Enum or Stream.chunk? Or, are you only talking about where one places the call to chunk? The first chunk (CHUNK A) typically creates chunks that are 50 - 500 bases long. The second chunk (CHUNK B) creates chunks from these chunks that are usually 5 to 9 bases long. I'm not sure I see how it's possible for single base letters to arise unless the chunk size was set to a length of one.

Thanks very much,

Peter



Tallak Tveide

Nov 14, 2016, 2:35:53 PM
to elixir-lang-core

Hi!

I guess I might make a stab at analyzing this.

The code will create a list of the short sequences of length `k` that occur at least `t` times inside one of the longer sequences of length `l`. The order of these sequences is undefined.

What I read from José's reply was that the use of Flow.chunk would shuffle the order of the computation of each chunk, which would be an issue if order was important (as I assumed the first time I read your post).

So I must conclude perhaps that your code is working as designed?

I won't comment on multiprocessing here. But as you are using `Flow` here, I would assume some kind of optimization is going on, and perhaps the initial sequence is quite long.

I am thinking that the first `Enum.chunk(...)` might be very memory intensive, building lots and lots of lists, that must be copied as they are not the tail of the list. Perhaps using Stream.chunk would be a better choice here.

There is also some kind of duplication going on with regard to "chunk B" as most of the chunks computed in the second iteration of "chunk A" would be exactly the same as those for the previous iteration. And they would all be reallocated. I guess some kind of dynamic programming (the algorithm) could be useful in this scenario.
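
As a rough illustration of how that duplication could be avoided, the sketch below computes the k-mers once for the whole sequence and then slides the window over them, decrementing the count of the k-mer that leaves and incrementing the count of the one that enters. It is sequential, uses only the standard library (Elixir 1.5 or later for Enum.chunk_every/4), assumes an ASCII sequence, and the names SlidingSketch and find_clumps are invented for this example rather than taken from anyone's code in this thread.

```elixir
defmodule SlidingSketch do
  # Hypothetical helper: same result as counting every window from scratch,
  # but each step only touches the k-mer leaving and the k-mer entering.
  def find_clumps(sequence, l, k, t) when byte_size(sequence) >= l and l >= k do
    kmers =
      sequence
      |> String.to_charlist()
      |> Enum.chunk_every(k, 1, :discard)
      |> Enum.map(&List.to_string/1)

    # A window of l bases holds l - k + 1 overlapping k-mers.
    {first, rest} = Enum.split(kmers, l - k + 1)

    counts =
      Enum.reduce(first, %{}, fn kmer, acc -> Map.update(acc, kmer, 1, &(&1 + 1)) end)

    found = for {kmer, n} <- counts, n >= t, into: MapSet.new(), do: kmer

    # Pair each k-mer leaving the window with the one entering it.
    {_counts, found} =
      kmers
      |> Enum.zip(rest)
      |> Enum.reduce({counts, found}, fn {leaving, entering}, {counts, found} ->
        counts =
          counts
          |> Map.update!(leaving, &(&1 - 1))
          |> Map.update(entering, 1, &(&1 + 1))

        # Only the entering k-mer can newly reach the threshold.
        found = if counts[entering] >= t, do: MapSet.put(found, entering), else: found
        {counts, found}
      end)

    found |> MapSet.to_list() |> Enum.sort()
  end

  # Sequence shorter than one window (or k larger than l): nothing to report.
  def find_clumps(_sequence, _l, _k, _t), do: []
end
```

Because each step depends on the counts from the previous one, this version is inherently sequential; it trades the parallelism of Flow for less repeated work.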



Peter C. Marks

Nov 14, 2016, 3:17:53 PM
to elixir-l...@googlegroups.com
Hi Tallak,

Thanks for writing.

Maybe I wasn't clear enough in my original post, but order is *not* important; what is important is that all of the unique "chunk B"s are counted. I wanted to make sure that, if Flow.chunk were ever implemented and used, it would not behave differently from Enum.chunk/4 or Stream.chunk/4. Yes, my original code does work - I've checked it against the course's test sets and the final test. BTW: I am not looking for anyone to do my homework for me!

You are right in suggesting that I use Stream.chunk instead of Enum.chunk. José suggested this as well. 

You are also right in noting that the same "chunk B" would appear in the next "chunk A" - it does look like duplication. And, perhaps that suggests a  way of improving the performance even more.  

Thanks again,

Peter



José Valim

Nov 14, 2016, 4:52:02 PM
to elixir-l...@googlegroups.com
Do you mean that if Flow.chunk was implemented that it would behave differently from Enum or Stream.chunk?

It is not related to Flow.chunk per se. The issue is the following: as soon as you call Flow.from_enumerable, the letters will be shuffled around.

This example

[0, 1, 2, 3] |> Enum.to_list

is guaranteed to return:

[0, 1, 2, 3]

But this example:

[0, 1, 2, 3] |> Flow.from_enumerable |> Enum.to_list

is *not* guaranteed to return the same list. In fact, for a large list, each item will be processed in a separate process that has seen only *part* of the items.

When you enter Flow-land, you lose ordering. Your code is at its fastest when it doesn't care about ordering at all. Partitioning can give you some sense of ordering. However, you still don't have total ordering, because if you want total ordering then it cannot be parallel.

So **assuming you care about the sequence when generating the initial chunks**, your best option is to generate the chunks before entering flow and then parallelize the computation on each chunk.
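
The same "chunk first, then go parallel" idea can be tried without Flow. The sketch below is only a stand-in: it swaps Flow for Task.async_stream/3 with ordered: false from the standard library, and the module and function names (OrderSketch, windows_then_parallel) are made up for illustration. The windows are built sequentially, so each one is a correct slice of the sequence; only their arrival order afterwards is unspecified.

```elixir
defmodule OrderSketch do
  # Hypothetical helper: build the windows in order, then hand each complete
  # window to a separate task. Results may come back in any order.
  def windows_then_parallel(sequence, l) do
    sequence
    |> String.to_charlist()
    |> Stream.chunk_every(l, 1, :discard)
    |> Task.async_stream(&List.to_string/1, ordered: false)
    |> Enum.map(fn {:ok, window} -> window end)
  end
end

result = OrderSketch.windows_then_parallel("ABCDEF", 3)

# Arrival order is not guaranteed, so compare only after sorting.
IO.inspect(Enum.sort(result))
# => ["ABC", "BCD", "CDE", "DEF"]
```

Had the letters been distributed to the tasks before chunking, no task would be guaranteed to see three consecutive letters, which is the problem described above.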
