Side effect stream combinator


Eric Pederson

Jul 23, 2014, 9:10:24 PM
to akka...@googlegroups.com
Hi guys:

A side-effecting combinator would be useful, something like doOnEach from RxJava. It would be particularly handy in the actor world for inserting an actor tell into the flow.

def doOnEach(f: => Unit): Flow[T]

For example:

flow.doOnEach { msg => if (msg.isSomething) actor ! msg }.map(....)...

This can be done with a Transformer but it's not really transforming.

Will there be a way to create Flow subtypes so that people can invent their own combinators?

Thanks,

Eric Pederson

Jul 23, 2014, 9:14:11 PM
to akka...@googlegroups.com
Correction:

def doOnEach(f: T => Unit): Flow[T]


-- Eric


--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to a topic in the Google Groups "Akka User List" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/akka-user/qWrA1sqBqiM/unsubscribe.
To unsubscribe from this group and all its topics, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Akka Team

Jul 24, 2014, 4:35:59 AM
to Akka User List
Hi Eric,

There is a foreach method that does exactly that. It gives you back a Flow[Unit] which will produce exactly one element at the end of the processing (so you can easily convert it to a Future[Unit] for example).

Is this covering what you need?

-Endre


--
Akka Team
Typesafe - The software stack for applications that scale
Blog: letitcrash.com
Twitter: @akkateam

Daniel Armak

Jul 24, 2014, 5:00:08 AM
to akka-user
Alternatively, if what you really want is to pass the input elements through unchanged (returning a Flow[T]) and only add a side-effecting operation, you can implement it using `map`.



Daniel Armak

√iktor Ҡlang

Jul 24, 2014, 5:07:06 AM
to Akka User List
WARNING: Not compiled

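// A value class needs a `val` parameter, and an implicit class must live inside an object, class or trait.
implicit final class DoToEach[T](val flow: Flow[T]) extends AnyVal {
  // Run f for its side effect, then pass the element through unchanged.
  def doOnEach(f: T => Unit): Flow[T] = flow.map { t => f(t); t }
}

flow.map(...).doOnEach(println).filter(...)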
Cheers,
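
For anyone who wants to try the idea without an Akka Streams dependency, here is a minimal sketch of the same map-and-return trick on a plain Scala Iterator, which only stands in for Flow[T] here. The names DoOnEachSketch, DoOnEachOps and demo are made up for this illustration and are not Akka API.

```scala
object DoOnEachSketch {
  // Hypothetical extension class; Iterator[T] is a stand-in for Flow[T].
  implicit final class DoOnEachOps[T](val underlying: Iterator[T]) extends AnyVal {
    // Run f for its side effect, then emit the element unchanged.
    def doOnEach(f: T => Unit): Iterator[T] = underlying.map { t => f(t); t }
  }

  // Returns (elements seen by the side effect, elements that reached the end).
  def demo(): (List[Int], List[Int]) = {
    val seen = scala.collection.mutable.ListBuffer.empty[Int]
    val out = Iterator(1, 2, 3)
      .map(_ * 10)
      .doOnEach { x => seen += x; () } // side effect only, element passes through
      .filter(_ > 10)
      .toList
    (seen.toList, out)
  }
}
```

The side effect observes every element that reaches it, including ones a later filter drops, because the stage itself never changes or removes elements.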

Eric Pederson

Jul 24, 2014, 9:43:33 AM
to akka...@googlegroups.com
Ah yes - pimp my flow!  And map works fine, definitely better than Transformer.

Thanks guys!


-- Eric

Akka Team

Jul 24, 2014, 9:52:22 AM
to Akka User List
Hi Eric,



On Thu, Jul 24, 2014 at 3:42 PM, Eric Pederson <eri...@gmail.com> wrote:
> Ah yes - pimp my flow!  And map works fine, definitely better than Transformer.

You might still want to implement the internals of your "extend your flow" pattern with a Transformer, mostly because of the availability of its cleanup() method. That would allow you to have a

 def doOnEach(f: T => Unit, whenComplete: () => Unit): Flow[T]

if you want.

-Endre

√iktor Ҡlang

Jul 24, 2014, 10:09:25 AM
to Akka User List


On Jul 24, 2014 3:52 PM, "Akka Team" <akka.o...@gmail.com> wrote:
> You might still want to express the internals of your "extend your flow" pattern, mostly because of the availability of the cleanup() method. That would allow you to have a
>
>  def doOnEach(f: T => Unit, whenComplete: () => Unit): Flow[T]

With the caveat that it wouldn't fire after the entire Flow completes, but when the upstream completes.
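
The caveat can be illustrated without Akka. The sketch below again uses a plain Scala Iterator as a stand-in for Flow[T], with a hypothetical doOnEach(f, whenComplete) that fires whenComplete once, as soon as the upstream reports that it has no more elements. WhenCompleteSketch, DoOnEachOps and demo are names invented for this example; it is not the Transformer API and only mimics the ordering issue.

```scala
object WhenCompleteSketch {
  // Hypothetical extension class; Iterator[T] is a stand-in for Flow[T].
  implicit final class DoOnEachOps[T](val upstream: Iterator[T]) {
    def doOnEach(f: T => Unit, whenComplete: () => Unit): Iterator[T] =
      new Iterator[T] {
        private var completed = false
        def hasNext: Boolean = {
          val more = upstream.hasNext
          // Fire exactly once, when the upstream is first seen to be exhausted.
          if (!more && !completed) { completed = true; whenComplete() }
          more
        }
        def next(): T = { val t = upstream.next(); f(t); t }
      }
  }

  // Records events in the order they happen.
  def demo(): List[String] = {
    val events = scala.collection.mutable.ListBuffer.empty[String]
    Iterator(1, 2, 3)
      .doOnEach(t => { events += s"each $t"; () }, () => { events += "complete"; () })
      .grouped(2) // a downstream stage that still holds a buffered element
      .foreach(g => events += s"group ${g.mkString(",")}")
    events.toList
  }
}
```

In this sketch "complete" is recorded before the last group is handled downstream: the upstream has finished, but the rest of the pipeline has not.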
