[akka-streams] Wrapping one flow with another while keeping the inner's materialized value


Itamar Ravid

Aug 19, 2016, 12:10:00 PM
to Akka User List

Hi everyone,


I'm trying to write a function that wraps an arbitrary flow with another flow that performs a side effect: measuring the execution time of the inner flow. I'd also like to avoid writing a custom GraphStage to do this.


So far, I've come up with this:


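import akka.NotUsed
import akka.stream.scaladsl.{Flow, Source}

def measuredFlow[In, Out](timer: Timer)(flow: Flow[In, Out, NotUsed]): Flow[In, Out, NotUsed] =
  Flow[In].flatMapConcat { el =>
    // Start timing this element; the context is closed once the inner flow emits.
    val timerContext = timer.timerContext()
    Source.single(el).via(flow).map { e => timerContext.close(); e }
  }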


This is great and very composable, but it has one restriction: it can only handle flows whose materialized values I don't care about. The reason is that there is no way to propagate the materialized value from within flatMapConcat, AFAICT.


I've been looking for different approaches to this, but couldn't find any that don't involve using flatMapConcat, as that is the only built-in stage that can wrap another stage. 


The other solution is to write a custom GraphStage and imitate FlattenMerge's usage of SubSinkInlet, but I'm wary of this as SubSinkInlet is marked as private API and there's no documentation for custom stages that wrap other stages.


Has anyone tried tackling this problem? Would love to hear any ideas.







Akka Team

Aug 19, 2016, 12:16:54 PM
to Akka User List
Hi Itamar,

First of all, I think there is already a timing stage somewhere in akka-stream-contrib (https://github.com/akka/akka-stream-contrib), but I might be wrong.

As for using flatMapConcat, apart from the issue that it does not give you the materialized value, it is also slower than a normal inline stage.

The usual way to handle these is a custom BidiFlow, backed by a GraphStage. Then you can safely share state between the inputs and outputs. Basically, instead of "wrapping" a Flow, you add a BidiStage on top of it, resulting in another flow.

See the Flow.join(BidiFlow) method: http://doc.akka.io/api/akka/2.4/index.html#akka.stream.scaladsl.Flow@join[I2,O2,Mat2](bidi:akka.stream.Graph[akka.stream.BidiShape[Out,O2,I2,In],Mat2]):akka.stream.scaladsl.Flow[I2,O2,Mat]

(click to expand to see the ASCII-art diagram of how it works).
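A rough sketch of what such a BidiFlow-backed timing stage could look like, assuming the Akka 2.4-era GraphStage API. The names TimingBidi, Measured and the record callback are hypothetical and not taken from Akka or akka-stream-contrib. The sketch also assumes the wrapped flow emits exactly one output per input, in order. It attaches the stage with BidiFlow.joinMat and Keep.right, the mirror image of Flow.join, so the inner flow's materialized value is kept.

```scala
import akka.stream.{Attributes, BidiShape, Inlet, Outlet}
import akka.stream.scaladsl.{BidiFlow, Flow, Keep}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.collection.mutable

// Hypothetical stage: notes System.nanoTime() when an element enters the
// wrapped flow and reports the elapsed nanoseconds when one leaves it.
// Assumption: the wrapped flow is strictly one-to-one and order preserving.
final class TimingBidi[In, Out](record: Long => Unit)
    extends GraphStage[BidiShape[In, In, Out, Out]] {

  val in1: Inlet[In] = Inlet[In]("TimingBidi.in1")       // from upstream
  val out1: Outlet[In] = Outlet[In]("TimingBidi.out1")    // to the wrapped flow
  val in2: Inlet[Out] = Inlet[Out]("TimingBidi.in2")      // from the wrapped flow
  val out2: Outlet[Out] = Outlet[Out]("TimingBidi.out2")  // to downstream

  override val shape: BidiShape[In, In, Out, Out] = BidiShape(in1, out1, in2, out2)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // State shared between both directions; only touched from stage callbacks.
      private val startTimes = mutable.Queue.empty[Long]

      setHandler(in1, new InHandler {
        override def onPush(): Unit = {
          startTimes.enqueue(System.nanoTime())
          push(out1, grab(in1))
        }
        // Close only the path into the wrapped flow, so elements still
        // in flight inside it can reach downstream before the stage stops.
        override def onUpstreamFinish(): Unit = complete(out1)
      })
      setHandler(out1, new OutHandler {
        override def onPull(): Unit = pull(in1)
      })
      setHandler(in2, new InHandler {
        override def onPush(): Unit = {
          if (startTimes.nonEmpty) record(System.nanoTime() - startTimes.dequeue())
          push(out2, grab(in2))
        }
      })
      setHandler(out2, new OutHandler {
        override def onPull(): Unit = pull(in2)
      })
    }
}

object Measured {
  // Keep.right selects the wrapped flow's materialized value.
  def apply[In, Out, Mat](flow: Flow[In, Out, Mat])(record: Long => Unit): Flow[In, Out, Mat] =
    BidiFlow.fromGraph(new TimingBidi[In, Out](record)).joinMat(flow)(Keep.right)
}
```

Because the start times live inside a single stage, both directions can share them without synchronization. A flow that filters, expands or reorders elements would need a different correlation scheme than the simple queue used here.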

-Endre

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscribe@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.



--
Akka Team
Lightbend - Reactive apps on the JVM
Twitter: @akkateam

Itamar Ravid

Aug 21, 2016, 3:30:23 AM
to Akka User List
Very interesting - thanks for the quick reply! I'll look into that. 

And yes, there is a timing stage in akka-stream-contrib which solves this problem by measuring the time between elements, but this was more of an 'educational' question :-)
