Akka Stream: How to dynamically group elements?

2,340 views
Skip to first unread message

Elmar Weber

unread,
Feb 2, 2015, 12:40:39 PM2/2/15
to akka...@googlegroups.com
I am looking to batch elements in a flow in a non-standard way and sending them off.
A typical flow for this based on a static count would look like this:

Source[Element]
  .grouped(batchSize)
  .map(doRest(elements)
  .Sink[LogResultsSomewhere]

Now, instead of a batchSize I want a different way to group the elements, e.g. by size in bytes or by a dynamic batch size values that adjust to an external condition.

My current solution for this is to use fold (scan) with filter. The fold function returns an option of the batched result, when the batch limit is reached it becomes Some(batch) and then the next element, a filter, only let's the batched elements pass through.
An example is at https://gist.github.com/elm-/316236f63dce4feca34f (never mind the Await)

While this works, there are two issues:
- looks too complicated
- no final flush, i.e. the last batch lost since I do not know when to trigger my manual batch in the scan, i.e. check when it is the last element

My question is, is there a way to get this done with the standard methods and or what solution would be best here?

What I have in mind is to provide a custom grouped() implementation that takes my own condition.

Akka Team

unread,
Feb 2, 2015, 12:50:23 PM2/2/15
to Akka User List
Hi Elmar,



The solution is to use a custom stage, described here:
There are various examples in the cookbook how to use custom stages:
-Endre
 

What I have in mind is to provide a custom grouped() implementation that takes my own condition.

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.



--
Akka Team
Typesafe - The software stack for applications that scale
Blog: letitcrash.com
Twitter: @akkateam

Endre Varga

unread,
Feb 2, 2015, 12:53:06 PM2/2/15
to akka...@googlegroups.com
On Mon, Feb 2, 2015 at 6:49 PM, Akka Team <akka.o...@gmail.com> wrote:
Hi Elmar,



On Mon, Feb 2, 2015 at 6:40 PM, Elmar Weber <goo...@elmarweber.org> wrote:
I am looking to batch elements in a flow in a non-standard way and sending them off.
A typical flow for this based on a static count would look like this:

Source[Element]
  .grouped(batchSize)
  .map(doRest(elements)
  .Sink[LogResultsSomewhere]

Now, instead of a batchSize I want a different way to group the elements, e.g. by size in bytes or by a dynamic batch size values that adjust to an external condition.

My current solution for this is to use fold (scan) with filter. The fold function returns an option of the batched result, when the batch limit is reached it becomes Some(batch) and then the next element, a filter, only let's the batched elements pass through.
An example is at https://gist.github.com/elm-/316236f63dce4feca34f (never mind the Await)

While this works, there are two issues:
- looks too complicated
- no final flush, i.e. the last batch lost since I do not know when to trigger my manual batch in the scan, i.e. check when it is the last element

My question is, is there a way to get this done with the standard methods and or what solution would be best here?

The solution is to use a custom stage, described here:
There are various examples in the cookbook how to use custom stages:

Elmar Weber

unread,
Feb 2, 2015, 1:21:08 PM2/2/15
to akka...@googlegroups.com
Hi.

thanks for the quick reply, that's exactly what I was looking for.

ciao,
elm
Reply all
Reply to author
Forward
0 new messages