Hi,
Reading the documentation, the following quote makes me think that my usage is wrong:
"This flow combinator is guaranteed to work correctly on flows that have behavior of classic total functions, meaning that they should be a one-to-one functions that don't reorder, drop, inject etc new elements."
I'm using it in the following way:
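val innerFlow =
  Flow[CommittableMessage[String, String]] // element type assumed; use your own key/value types
    .map { msg =>
      someIO(msg) // performs some IO and returns a List
    }
    .mapConcat(identity) // flattens each List, so one input can yield N outputs
    .via(someLogic1)
    .via(someLogic2)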
Consumer
  .committableSource(settings, Subscriptions.topics("my-topic"))
  .via(PassThroughFlow(innerFlow))
  .map { case (committableMessage, processingResult) => ??? }
The innerFlow takes one message from Kafka and may return more than one output, so the function is not 1:1 but 1:N. So it seems to me that this is the wrong way to use PassThroughFlow. Is that right, or is there something I didn't understand?
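To make the 1:1 concern concrete, here is a minimal sketch of a pass-through combinator, assuming it is built from a Broadcast and a Zip (that is my understanding of the documented pattern, not a copy of it). The names `PassThroughSketch`, `passThrough` and `run` are made up for illustration. Zip pairs the k-th result of the inner flow with the k-th original element purely by position, so the pairing is only meaningful when the inner flow emits exactly one element per input.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source, Zip}
import scala.concurrent.Await
import scala.concurrent.duration._

object PassThroughSketch {
  // Hypothetical helper: sends each element both through `inner` and around it,
  // then zips the two branches back together by position.
  def passThrough[A, T](inner: Flow[A, T, NotUsed]): Flow[A, (T, A), NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._
      val broadcast = builder.add(Broadcast[A](2))
      val zip       = builder.add(Zip[T, A]())
      broadcast.out(0) ~> inner ~> zip.in0 // processed branch
      broadcast.out(1) ~> zip.in1          // untouched original
      FlowShape(broadcast.in, zip.out)
    })

  // With a 1:1 inner flow, every result is paired with the element that produced it.
  def run()(implicit system: ActorSystem): Seq[(Int, Int)] = {
    val oneToOne = Flow[Int].map(_ * 10)
    Await.result(Source(1 to 3).via(passThrough(oneToOne)).runWith(Sink.seq), 5.seconds)
  }
}
```

With a 1:N inner flow (for example one containing mapConcat), the processed branch produces more elements than the original branch, so the positional pairs would no longer line up with the message that produced them, and the stream may stall or end early.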
By the way, here is the general use case that I'm trying to solve:
- For each message from Kafka, one flow does some IO that returns a List of N elements and emits a (CommittableOffset, List[T]).
- I would like to use mapConcat, but since I need to commit at the end of the graph I cannot: the CommittableOffset will be the same for every element of the list, so I would commit the same offset multiple times.
This is why I tried PassThroughFlow: to wrap the logic in an innerFlow that does this job without having to take care of commits.
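For comparison, one possible way to keep exactly one output per Kafka message is to run the N elements through the inner logic as a small sub-stream inside mapAsync, and carry the offset alongside the collected results. The sketch below uses plain Akka Streams (2.6 or later assumed) with stand-ins: `Msg`, its `offset: Long` field, `someIO` and `someLogic` are hypothetical placeholders for the Kafka message, its CommittableOffset, the IO call and someLogic1/someLogic2.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object OneCommitPerMessage {
  // Hypothetical stand-in for a Kafka message carrying its offset.
  final case class Msg(offset: Long, value: String)

  // Hypothetical IO step: one message in, a list of N elements out.
  def someIO(msg: Msg): Future[List[String]] =
    Future.successful(msg.value.split(",").toList)

  // Hypothetical stand-in for someLogic1 / someLogic2.
  val someLogic: Flow[String, String, NotUsed] = Flow[String].map(_.toUpperCase)

  // Emits one (offset, results) pair per input message, so the stage stays 1:1.
  def processed(messages: List[Msg])(
      implicit system: ActorSystem): Future[Seq[(Long, Seq[String])]] = {
    import system.dispatcher
    Source(messages)
      .mapAsync(1) { msg =>
        for {
          items   <- someIO(msg)
          // The N elements go through the inner logic as their own short stream.
          results <- Source(items).via(someLogic).runWith(Sink.seq)
        } yield (msg.offset, results)
      }
      .runWith(Sink.seq)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("sketch")
    val out = Await.result(processed(List(Msg(0L, "a,b"), Msg(1L, "c"))), 5.seconds)
    println(out)
    system.terminate()
  }
}
```

In a real Alpakka Kafka pipeline the offset would be msg.committableOffset, and a final stage could map each pair to its offset and feed it to Committer.sink or Committer.flow, so each offset is committed once. The trade-off is that a sub-stream is materialized per message, which may matter at high throughput.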