groupBy limits


Richard Rodseth

unread,
Feb 19, 2016, 5:12:03 PM2/19/16
to akka...@googlegroups.com
Thought I'd start a new thread for my latest stumbling block, while I explore some options that don't feel great.

Short version:
flatMapMerge has a "breadth" parameter which limits the number of substreams in flight. groupBy() does not. If maxSubstreams is exceeded the stream will fail. I am grouping stream elements and writing each group to a file. How can I limit the number of open files?

Background:
In my case the stream elements are channels, channel-months, channel-month-intervaldata.
I've coded up a possible solution in which I use grouped(n) to get a batch of channel-months, then mapAsync(1) to run a separate stream that gets the intervals for that batch of channel-months, groups them, and writes them to files.

def writeChannelMonths(channelMonths: List[ChannelAndInstantRange]): Future[Seq[FileWritten]]

So the size of n in the upfront grouped(n) has the effect of limiting the number of files open at a time.

But I'm wondering if there's something more elegant. Not sure if this ticket discussed elsewhere will help:
I think not, unless a max-open-files limit were built into the sink stage and caused backpressure when exceeded.
I sort of feel like a groupBy with two limits (max distinct groups and max active groups) is what I need.
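For concreteness, here is roughly the shape of the pipeline I have in mind, as a sketch only (the types and the source are simplified placeholders, not my actual code):

```scala
// Sketch with placeholder types. Each distinct (channel, month) key opens a
// substream, and a real per-group file sink would hold one file open per
// substream -- nothing here bounds the number of simultaneously open files.
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

final case class Interval(channel: String, month: String, value: Double)

implicit val system = ActorSystem("sketch")
implicit val materializer = ActorMaterializer()

val intervals = Source(List(
  Interval("ch1", "2016-01", 1.0),
  Interval("ch1", "2016-02", 2.0),
  Interval("ch2", "2016-01", 3.0)
))

intervals
  .groupBy(maxSubstreams = 1024, i => (i.channel, i.month))
  // stand-in for a per-group file sink; a real one keeps a file open per group
  .to(Sink.foreach(i => println(s"${i.channel}/${i.month}: ${i.value}")))
  .run()
```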


Akka Team

unread,
Feb 22, 2016, 6:40:46 AM2/22/16
to Akka User List
Hi Richard,



On Fri, Feb 19, 2016 at 11:11 PM, Richard Rodseth <rrod...@gmail.com> wrote:
Thought I'd start a new thread for my latest stumbling block, while I explore some options that don't feel great.

Short version:
flatMapMerge has a "breadth" parameter which limits the number of substreams in flight. groupBy() does not.

Yes, it does: http://doc.akka.io/api/akka/2.4.2/index.html#akka.stream.scaladsl.Flow@groupBy[K](maxSubstreams:Int,f:Out=>K):akka.stream.scaladsl.SubFlow[Out,Mat,FlowOps.this.Repr,FlowOps.this.Closed]

Are you on 2.4.2?

 
If maxSubstreams is exceeded the stream will fail.

Ah, I see what you mean. There is no way to backpressure the upstream based on group count, since how would you know whether the next element belongs to a new group or not? E.g. you limit your group count to 4, and then suddenly receive something that belongs to group 5. Your use case implies that you *already* have the groups available, so you don't need to compute them dynamically. I am not sure groupBy is the best option in that case.
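To make the failure mode concrete, a minimal sketch (assuming the 2.4.x API):

```scala
// With maxSubstreams = 2, the arrival of a third distinct key ('c') fails
// the whole stream. groupBy cannot backpressure instead, because it cannot
// know ahead of time whether the next element opens a new group.
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

implicit val system = ActorSystem("groupByLimit")
implicit val materializer = ActorMaterializer()

Source(List("a1", "a2", "b1", "c1"))
  .groupBy(maxSubstreams = 2, _.head) // keys: 'a', 'b', then 'c' -> failure
  .to(Sink.foreach(println))
  .run()
```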

 
I am grouping stream elements and writing each group to a file. How can I limit the number of open files?

You are asking for the impossible here :) Think it through a bit and you will realize that it is mathematically impossible to
 - dynamically group into potentially M groups, where every new upstream element can belong to an arbitrary group
 - at the same time limit the number of groups to N < M without failing the stream or dropping elements
 
So either you cannot limit the number of groups, or you need to make sure you feed your groups sequentially, in which case a simple splitWhen/splitAfter might be a better fit (with fewer parallelization opportunities).
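For input that arrives already sorted by key, the splitWhen variant could look something like this sketch (statefulMapConcat is used here to detect key changes; the data is illustrative):

```scala
// If elements arrive sorted by key, a new substream is opened exactly at
// each key change, so at most one group (one file) is active at a time.
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

implicit val system = ActorSystem("splitWhenSketch")
implicit val materializer = ActorMaterializer()

Source(List("a1", "a2", "b1", "b2", "c1"))
  .statefulMapConcat { () =>
    var previous: Option[Char] = None
    elem => {
      val keyChanged = previous.exists(_ != elem.head)
      previous = Some(elem.head)
      (elem, keyChanged) :: Nil
    }
  }
  .splitWhen(_._2)           // start a new substream whenever the key changes
  .map(_._1)
  .to(Sink.foreach(println)) // stand-in for a per-group file sink
  .run()
```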

-Endre




--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.



--
Akka Team
Typesafe - Reactive apps on the JVM
Blog: letitcrash.com
Twitter: @akkateam

Richard Rodseth

unread,
Feb 22, 2016, 10:35:21 AM2/22/16
to akka...@googlegroups.com
Thanks for the reply. I wasn't proposing that the number of groups be limited, just that the number being fed simultaneously be limited. I thought that each new element creates a group if it has a distinct key; that seems orthogonal to backpressure. But I suppose one wouldn't know when each substream completed, closing the file. I'll take your word for it if groupBy cannot be enhanced in this way.

I wonder if the team would agree that maxOpenFiles could be built into the forthcoming file sink, with dynamic name selection.


In the meantime, the solution I came up with is working: grouped(n).mapAsync(1) { run the rest of the stream, which includes groupBy }
But I find myself wondering if running separate streams like this is an anti-pattern.
I don't see any way to avoid groupBy at some point, since each file contains the intervals for one channel-month.
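The shape of that workaround, as a sketch (the types, the stubbed writeChannelMonths, and the batch size are placeholders standing in for the real application code):

```scala
// Batch upstream with grouped(n), then run the groupBy-and-write stage as a
// separate inner stream per batch via mapAsync(1). Batches are processed one
// at a time, so n bounds how many files are open concurrently.
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future

final case class ChannelAndInstantRange(channel: String, month: String)
final case class FileWritten(path: String)

implicit val system = ActorSystem("batched")
implicit val materializer = ActorMaterializer()

// Stub: the real method runs an inner stream that groups intervals and
// writes one file per channel-month.
def writeChannelMonths(batch: List[ChannelAndInstantRange]): Future[Seq[FileWritten]] =
  Future.successful(batch.map(cm => FileWritten(s"${cm.channel}-${cm.month}.csv")))

val channelMonths = Source(List(
  ChannelAndInstantRange("ch1", "2016-01"),
  ChannelAndInstantRange("ch1", "2016-02"),
  ChannelAndInstantRange("ch2", "2016-01")
))

channelMonths
  .grouped(2)                          // at most 2 channel-months per batch
  .mapAsync(parallelism = 1)(batch => writeChannelMonths(batch.toList))
  .mapConcat(_.toList)                 // flatten per-batch results
  .runWith(Sink.seq)                   // Future[Seq[FileWritten]]
```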

Jakub Liska

unread,
Jun 30, 2017, 7:32:48 AM6/30/17
to Akka User List
Have you guys done any benchmarking of groupBy capacity, i.e. what count of distinct elements, and therefore substreams, can be handled with what resources?

Or a more general version of the benchmark, i.e. one showing performance as a function of the number of substreams running in parallel?

If not, do you have a rough guess? Is 100 000 a reasonable number for an m4.large instance?

Jakub Liska

unread,
Jun 30, 2017, 8:42:19 AM6/30/17
to Akka User List
100 000 seems to be the maximum; beyond that, no matter how much memory and processor power the instance has, it blows up with:

Substream Source has not been materialized in 5000 milliseconds.

Which is a consequence of GC choking, I guess.