[Akka-Streams] Creating a custom async filter

Simon Lam

Sep 16, 2016, 3:52:20 AM
to Akka User List
Hello!

I would like to build a custom GraphStage that filters out elements based on a comparison against the result of a Future, and that retains this value for subsequent elements. I would also like to preserve the order in which elements pass through the filter. My current implementation uses mapAsync with parallelism set to 1 and an actor that executes the Future and maintains the state. The actor pipes the result back to itself for processing and then replies to the temporary sender created in the mapAsync call. Is this approach appropriate?

Would you be able to provide any high-level guidance on how to accomplish this in a custom GraphStage? I think I grasp how AsyncCallbacks work, but I am not 100% sure how to guarantee order. If I start a separate Future for each upstream push, is there a chance that they finish out of order?

Any advice is greatly appreciated - thank you in advance!

Johannes Rudolph

Sep 20, 2016, 6:52:32 AM
to Akka User List
Hi Simon,

You could also try splitting the asynchronous computation from the actual filtering, like this:

def filterFunc(e: E): Future[Result]

xyz.mapAsync(n)(e => filterFunc(e).map(res => e -> res))
   .via(statefulFilterGraphStage)

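Then implement the `statefulFilterGraphStage` with whatever stateful logic you need. The advantage is that you don't need to manage asynchronous Future completion inside that stage: mapAsync emits its results in upstream order even when the Futures complete out of order, so the stateful stage sees the (element, result) pairs in the original sequence.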
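To make that a bit more concrete, here is a minimal sketch of what such a stage could look like. The names `StatefulFilter`, `AsyncFilterSketch` and `asyncFilter` are made up for illustration, and the predicate is only an assumption: it lets an element through when its result differs from the last result the stage has seen. Your own comparison would go in `onPush`.

```scala
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.Source
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical stage: remembers the most recent result and emits an element
// only when its result differs from the remembered one (assumed predicate).
final class StatefulFilter[E, R] extends GraphStage[FlowShape[(E, R), E]] {
  val in: Inlet[(E, R)] = Inlet("StatefulFilter.in")
  val out: Outlet[E] = Outlet("StatefulFilter.out")
  override val shape: FlowShape[(E, R), E] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      // Stage-local state; only touched from the handlers below,
      // so no synchronization is needed.
      private var last: Option[R] = None

      override def onPush(): Unit = {
        val (elem, result) = grab(in)
        val changed = !last.contains(result)
        last = Some(result)
        // Emit the element, or ask upstream for the next one if it is dropped.
        if (changed) push(out, elem) else pull(in)
      }

      override def onPull(): Unit = pull(in)

      setHandler(in, this)
      setHandler(out, this)
    }
}

object AsyncFilterSketch {
  // Runs filterFunc with the given parallelism, then filters sequentially.
  // mapAsync keeps the upstream order of elements.
  def asyncFilter[E, R, M](src: Source[E, M], parallelism: Int)(
      filterFunc: E => Future[R])(implicit ec: ExecutionContext): Source[E, M] =
    src
      .mapAsync(parallelism)(e => filterFunc(e).map(res => e -> res))
      .via(new StatefulFilter[E, R])
}
```

Since the state lives only in the stage and the Futures run in mapAsync, the parallelism can be greater than 1 without affecting ordering or the filter's state.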

Would that work?

Johannes