Can using mapConcat with PassThroughFlow result in a deadlock?


Kilic Ali-Firat

Apr 24, 2021, 3:25:05 AM
to Akka User List
Hi, 

I would like to confirm whether my usage of PassThroughFlow is correct.

Reading the documentation, the following quote makes me think that my usage is wrong:
"This flow combinator is guaranteed to work correctly on flows that have behavior of classic total functions, meaning that they should be a one-to-one functions that don't reorder, drop, inject etc new elements."

I'm using it in the following way:

// K and V stand for the key and value types of the topic.
val innerFlow =
  Flow[CommittableMessage[K, V]]
    .map { msg =>
      someIO(msg) // some IO that returns a List
    }
    .mapConcat(identity) // 1:N, one element per list entry
    .via(someLogic1)
    .via(someLogic2)

Consumer
  .committableSource(settings, Subscriptions.topics("my-topic"))
  .via(PassThroughFlow(innerFlow))
  .map { case (committableMessage, processingResult) => ??? }

The innerFlow takes one message from Kafka and may return more than one output, so the function is not 1:1 but 1:N. So it seems to me that this is the wrong way to use PassThroughFlow. Is that right, or is there something I didn't understand?
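
To make the 1:1 versus 1:N concern concrete, here is a minimal sketch in plain Scala collections, with no Akka involved. It assumes that PassThroughFlow follows the Broadcast-plus-Zip pattern, so that the k-th processed element gets paired with the k-th original element. The names PassThroughModel and passThrough, and this version of someIO, are hypothetical stand-ins. The model only shows pairing; it does not capture backpressure or any deadlock behavior of a real stream.

```scala
// Plain-collections model of a Broadcast + Zip pass-through (assumption, not Akka code).
object PassThroughModel {
  // Hypothetical stand-in for the IO step: each message yields two results.
  def someIO(msg: String): List[String] = List(s"$msg-a", s"$msg-b")

  // Zip pairs elements by position, not by origin.
  def passThrough(inputs: List[String]): List[(String, String)] = {
    val processed = inputs.flatMap(someIO) // 1:N, like map followed by mapConcat
    inputs.zip(processed)                  // k-th input with k-th processed element
  }

  def main(args: Array[String]): Unit =
    passThrough(List("m1", "m2")).foreach(println)
    // prints (m1,m1-a) then (m2,m1-b)
}
```

In this model the second message m2 is paired with a result that came from m1, which is the kind of misalignment the documentation's one-to-one restriction seems to warn about.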

By the way, here is the general use case I'm trying to solve:
- For each message from Kafka, one flow does some IO that returns a List of N elements and emits a (CommittableOffset, List[T]) pair.
- I would like to use mapConcat, but since I need to commit at the end of the graph I cannot: the CommittableOffset would be the same for every element of the list, so I would commit the same offset multiple times.
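
The two points above can be sketched with plain Scala collections. This is only an illustration of the data shapes, not Alpakka Kafka code: Offset is a hypothetical stand-in for CommittableOffset, and someIO here just splits a string.

```scala
// Illustration of offset duplication when flattening (hypothetical types, no Akka).
object OffsetFanOut {
  // Hypothetical stand-in for a committable offset.
  final case class Offset(partition: Int, value: Long)

  // Hypothetical IO step returning N results per message payload.
  def someIO(payload: String): List[String] = payload.split(",").toList

  // Flattening (as mapConcat would) copies the same offset onto every result.
  def flattened(in: List[(Offset, String)]): List[(Offset, String)] =
    in.flatMap { case (off, payload) => someIO(payload).map(r => (off, r)) }

  // Keeping the list grouped leaves exactly one offset per message.
  def grouped(in: List[(Offset, String)]): List[(Offset, List[String])] =
    in.map { case (off, payload) => (off, someIO(payload)) }
}
```

For a single message with payload "a,b,c", flattened yields three elements carrying the same offset, while grouped yields one element whose offset appears once.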

This is why I tried PassThroughFlow: to wrap the logic in an innerFlow that does this job without having to take care of commits.