I am looking to batch elements in a flow in a non-standard way and send them off.
A typical flow for this based on a static count would look like this:
Source[Element]
  .grouped(batchSize)
  .map(elements => doRest(elements))
  .to(Sink[LogResultsSomewhere])
Now, instead of a fixed batchSize, I want a different way to group the elements, e.g. by size in bytes or by a dynamic batch size that adjusts to an external condition.
My current solution is to use fold (scan) followed by a filter. The fold function returns an Option of the batched result: when the batch limit is reached it becomes Some(batch), and the next stage, a filter, only lets the completed batches pass through.
An example is at
https://gist.github.com/elm-/316236f63dce4feca34f (never mind the Await)
While this works, there are two issues:
- it looks too complicated
- there is no final flush, i.e. the last batch is lost, because inside the scan I cannot tell when the last element has arrived and so never emit the remaining partial batch
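To make both points concrete, here is a minimal sketch of the scan-plus-filter idea on plain Scala collections rather than on the actual stream. The names State, step, batches and the maxChars limit are made up for illustration, and string length stands in for "size in bytes".

```scala
// Illustration only: plain collections stand in for the stream, no Akka involved.
object ScanFilterSketch {
  // State carried through the scan: the pending buffer plus an optional completed batch.
  final case class State(buffer: Vector[String], emitted: Option[Vector[String]])

  // Assumed limit: a batch is complete once its strings total at least this many characters.
  val maxChars = 5

  // The "fold function": append the element, emit Some(batch) when the limit is reached.
  def step(state: State, elem: String): State = {
    val next = state.buffer :+ elem
    if (next.map(_.length).sum >= maxChars) State(Vector.empty, Some(next))
    else State(next, None)
  }

  def batches(input: List[String]): List[Vector[String]] =
    input
      .scanLeft(State(Vector.empty, None))(step)
      .collect { case State(_, Some(batch)) => batch } // the "filter" stage

  def main(args: Array[String]): Unit =
    // prints List(Vector(aa, bbb), Vector(cc, dddd))
    println(batches(List("aa", "bbb", "cc", "dddd", "e")))
}
```

The trailing "e" stays in the buffer and is never emitted, which is exactly the missing final flush described above.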
My question is: is there a way to get this done with the standard methods, and/or what solution would be best here?
What I have in mind is to provide a custom grouped() implementation that takes my own condition.
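Roughly, the behaviour I am after could look like the sketch below. It is written against a plain Iterator only to show the semantics, not as a stream stage, and groupedBy and isFull are hypothetical names of my own, not an existing API.

```scala
object GroupedBySketch {
  // Hypothetical helper: closes a batch as soon as `isFull` holds for it,
  // and flushes whatever is left over when the input ends.
  def groupedBy[A](input: Iterator[A])(isFull: Vector[A] => Boolean): Iterator[Vector[A]] =
    new Iterator[Vector[A]] {
      def hasNext: Boolean = input.hasNext

      def next(): Vector[A] = {
        if (!input.hasNext) throw new NoSuchElementException("no more batches")
        // Every batch holds at least one element, so the iterator always makes progress.
        var batch = Vector(input.next())
        while (input.hasNext && !isFull(batch)) batch = batch :+ input.next()
        batch // either full, or the final partial batch
      }
    }

  def main(args: Array[String]): Unit = {
    val result = groupedBy(Iterator("aa", "bbb", "cc", "dddd", "e"))(_.map(_.length).sum >= 5).toList
    // prints List(Vector(aa, bbb), Vector(cc, dddd), Vector(e))
    println(result)
  }
}
```

Because isFull is an ordinary function, it could just as well consult an external, changing limit instead of a constant, which would cover the dynamic batch size case.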