[akka-user] akka.stream.scaladsl.Flow and flatMap


Rüdiger Klaehn

Apr 24, 2014, 6:01:56 AM
to akka...@googlegroups.com
Hi All,

I just took a quick look at the new reactive streams API and in particular the scala DSL at http://doc.akka.io/api/akka-stream-experimental/0.2/index.html#akka.stream.scaladsl.Flow .

Looks pretty good so far. I especially like that this is supposed to evolve into a standard for the entire JVM and not just Scala.

One question:

Is there a reason that there is no flatMap defined in the DSL? Something like

def flatMap[U](f: T => Flow[U]) : Flow[U]

I think this would be quite useful for dynamically generating flows based on the values produced by another flow.

Example: you have a Flow[URL], and want to dynamically construct a flow to handle reading from the URL based on the content (protocol, etc.) of the URL. Then you want to flatten the whole thing so that you just get a Flow[ByteString].

This might not be the best example, but I hope you get the idea.
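To make the idea concrete, here is a minimal sketch that uses plain Scala Iterators in place of Flows. Nothing in it is Akka API: `read`, `readFile`, `readHttp` and `contents` are hypothetical names, and Strings stand in for a real URL type and for ByteString.

```scala
// Illustration only: Iterator stands in for Flow, String for URL and ByteString.
object FlatMapSketch {
  // Hypothetical handlers: each turns a URL into a stream of content chunks.
  def readFile(url: String): Iterator[String] =
    Iterator(s"file chunk 1 of $url", s"file chunk 2 of $url")

  def readHttp(url: String): Iterator[String] =
    Iterator(s"http body of $url")

  // Choose a handler based on the protocol part of the URL.
  def read(url: String): Iterator[String] =
    if (url.startsWith("file:")) readFile(url)
    else if (url.startsWith("http:")) readHttp(url)
    else Iterator.empty // unknown protocol: contributes no elements

  // flatMap: build one inner stream per URL, then flatten them into one stream.
  def contents(urls: Iterator[String]): Iterator[String] =
    urls.flatMap(read)
}
```

With this sketch, `FlatMapSketch.contents(Iterator("file:/a", "http://b"))` yields the two file chunks followed by the HTTP body, as a single flat stream.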

I don't see any reason why it should be impossible to construct a flatMap with the underlying primitives (Producers).

cheers,

Rüdiger

Patrik Nordwall

Apr 24, 2014, 10:24:01 AM
to akka...@googlegroups.com
Hi Rüdiger,

Yes, several flattening operators are missing.

/Patrik


--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.



--

Patrik Nordwall
Typesafe Reactive apps on the JVM
Twitter: @patriknw

√iktor Ҡlang

Apr 24, 2014, 11:13:21 AM
to Akka User List

Because there are many potential implementations, we explicitly opted out of flatMap. For now.

A minor correction: Akka Streams (Flow etc)  will never become a standard for the JVM.
The standardization effort is Reactive Streams and that is for interop between implementations. As such it does not prescribe any DSL as that will vary between JVM langs and libraries.

Cheers,
V

Rüdiger Klaehn

Apr 24, 2014, 11:52:16 AM
to akka...@googlegroups.com
On Thu, Apr 24, 2014 at 5:13 PM, √iktor Ҡlang <viktor...@gmail.com> wrote:

> Because there are many potential implementations, we explicitly opted out of flatMap. For now.

Probably a good decision. Otherwise people will complain when you remove it again. You can always implicit your own. Maybe have a canonical flatMap available that has to be implicitly imported.

I just wanted to confirm that there is no fundamental limitation.

> A minor correction: Akka Streams (Flow etc) will never become a standard for the JVM.
> The standardization effort is Reactive Streams and that is for interop between implementations. As such it does not prescribe any DSL as that will vary between JVM langs and libraries.

Of course. I think the underlying interfaces are nice and simple and should work well from other JVM languages.

What does the spec say about when you throw an (unchecked) exception in e.g. onNext? I did not see anything in https://github.com/reactive-streams/reactive-streams/blob/master/tck/src/main/resources/spec.md . You are allowed to call onNext synchronously from requestMore. Will the caller of requestMore get the exception, or will onError be called?

cheers,

Rüdiger

Patrik Nordwall

Apr 24, 2014, 1:18:45 PM
to akka...@googlegroups.com
onNext, onError, onComplete must not throw non-fatal exceptions. That should be clarified.

If you throw an exception in one of the combinators in Akka Streams, it will be reported downstream with onError, shut down the processor, and cancel the upstream subscription.
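A rough sketch of that behavior, under loud assumptions: `Downstream`, `Upstream`, `MapStage` and `trace` are invented for illustration and are neither the Reactive Streams interfaces nor Akka's actual implementation. The sketch only shows the order of signals when the user function of a map-like combinator throws.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.{Failure, Success, Try}

object ErrorPropagationSketch {
  // Hypothetical, simplified stand-ins for the real interfaces.
  trait Downstream[A] {
    def onNext(a: A): Unit
    def onError(t: Throwable): Unit
    def onComplete(): Unit
  }
  trait Upstream { def cancel(): Unit }

  // A map-like stage. If `f` throws a non-fatal exception, the stage reports it
  // downstream with onError, shuts itself down, and cancels upstream.
  final class MapStage[A, B](f: A => B, upstream: Upstream, downstream: Downstream[B])
      extends Downstream[A] {
    private var done = false

    def onNext(a: A): Unit =
      if (!done) Try(f(a)) match { // Try catches only non-fatal exceptions
        case Success(b) => downstream.onNext(b)
        case Failure(e) =>
          done = true
          downstream.onError(e)
          upstream.cancel()
      }

    def onError(t: Throwable): Unit =
      if (!done) { done = true; downstream.onError(t) }

    def onComplete(): Unit =
      if (!done) { done = true; downstream.onComplete() }
  }

  // Demo helper: push `inputs` through a MapStage and record every signal seen.
  def trace[A, B](inputs: List[A])(f: A => B): List[String] = {
    val log = ListBuffer.empty[String]
    val up = new Upstream { def cancel(): Unit = log += "upstream.cancel" }
    val down = new Downstream[B] {
      def onNext(b: B): Unit = log += s"onNext($b)"
      def onError(t: Throwable): Unit = log += s"onError(${t.getMessage})"
      def onComplete(): Unit = log += "onComplete"
    }
    val stage = new MapStage(f, up, down)
    inputs.foreach(stage.onNext)
    stage.onComplete()
    log.toList
  }
}
```

For example, `ErrorPropagationSketch.trace(List(1, 2, 0, 4))(x => 10 / x)` records two onNext signals, then the onError and the upstream cancel; the element 4 and the final onComplete are dropped because the stage has already shut down.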

/Patrik
 


√iktor Ҡlang

Apr 24, 2014, 4:24:09 PM
to Akka User List
On Thu, Apr 24, 2014 at 5:52 PM, Rüdiger Klaehn <rkl...@gmail.com> wrote:
> On Thu, Apr 24, 2014 at 5:13 PM, √iktor Ҡlang <viktor...@gmail.com> wrote:
>
>> Because there are many potential implementations, we explicitly opted out of flatMap. For now.
>
> Probably a good decision. Otherwise people will complain when you remove it again. You can always implicit your own. Maybe have a canonical flatMap available that has to be implicitly imported.


So the "fundamental" ones are "concat" (all from the first, all from the second, …), "merge" (any available) and "join" (one from the first, then one from the second, …); a flatMap definition could therefore in theory require an implicit flatMap strategy (concat | merge | join).
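A minimal sketch of two of these strategies, with plain Scala Iterators standing in for Flows. None of this is Akka API; `FlattenSketch`, `concat` and `join` are names made up for the illustration. "merge" is left out because its output order depends on which inner stream happens to have an element available, which a synchronous Iterator cannot model.

```scala
import scala.collection.mutable

// Illustration only: Iterator stands in for Flow.
object FlattenSketch {
  // concat: drain the first inner stream completely, then the second, and so on.
  def concat[A](outer: Iterator[Iterator[A]]): Iterator[A] =
    outer.flatMap(inner => inner)

  // join: one element from the first, then one from the second, ..., round robin.
  // It takes a finite List because every inner stream must be held open at once.
  def join[A](inners: List[Iterator[A]]): Iterator[A] = new Iterator[A] {
    private val queue = mutable.Queue.empty[Iterator[A]]
    queue ++= inners

    def hasNext: Boolean = {
      // Drop exhausted inner streams as they reach the front of the queue.
      while (queue.nonEmpty && !queue.head.hasNext) queue.dequeue()
      queue.nonEmpty
    }

    def next(): A = {
      if (!hasNext) throw new NoSuchElementException("all inner streams are exhausted")
      val inner = queue.dequeue()
      val element = inner.next()
      queue.enqueue(inner) // back of the line until its next turn
      element
    }
  }
}
```

Given the inner streams (1, 2, 3), (10, 20) and (100), `concat` yields 1, 2, 3, 10, 20, 100, while `join` yields 1, 10, 100, 2, 20, 3.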
 
> I just wanted to confirm that there is no fundamental limitation.

Well, there are practical limitations. For instance, an infinite stream flatMapped with infinite streams.

>> A minor correction: Akka Streams (Flow etc) will never become a standard for the JVM.
>> The standardization effort is Reactive Streams and that is for interop between implementations. As such it does not prescribe any DSL as that will vary between JVM langs and libraries.
>
> Of course. I think the underlying interfaces are nice and simple and should work well from other JVM languages.
>
> What does the spec say about when you throw an (unchecked) exception in e.g. onNext? I did not see anything in https://github.com/reactive-streams/reactive-streams/blob/master/tck/src/main/resources/spec.md . You are allowed to call onNext synchronously from requestMore. Will the caller of requestMore get the exception, or will onError be called?

onNext is executed asynchronously in the current spec; it is just a signal to the Subscriber that there is one new element available.
 




--
Cheers,

Rüdiger Klaehn

Apr 25, 2014, 4:02:14 AM
to akka...@googlegroups.com
On Thu, Apr 24, 2014 at 10:24 PM, √iktor Ҡlang <viktor...@gmail.com> wrote:



> On Thu, Apr 24, 2014 at 5:52 PM, Rüdiger Klaehn <rkl...@gmail.com> wrote:
>> On Thu, Apr 24, 2014 at 5:13 PM, √iktor Ҡlang <viktor...@gmail.com> wrote:
>>
>>> Because there are many potential implementations, we explicitly opted out of flatMap. For now.
>>
>> Probably a good decision. Otherwise people will complain when you remove it again. You can always implicit your own. Maybe have a canonical flatMap available that has to be implicitly imported.


> So the "fundamental" ones are "concat" (all from the first, all from the second, …), "merge" (any available) and "join" (one from the first, then one from the second, …); a flatMap definition could therefore in theory require an implicit flatMap strategy (concat | merge | join).
 
I would say that concat is the most fundamental. But I think there is nothing wrong with having the methods available under the above names. Then people can write their own implicit classes to get just the flatMap behavior they want if they want to use for comprehensions.

Something like this

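// Sketch only: assumes Flow gains concat and merge operators that take a T => Flow[U];
// mergeSettings is a placeholder for whatever configuration merge would need.
object FlowForComprehension {
  // flatMap drains each inner flow completely before starting the next one
  implicit class FlatMapIsConcat[T](private val flow: Flow[T]) extends AnyVal {
    def flatMap[U](f: T => Flow[U]): Flow[U] = flow.concat(f)
  }

  // flatMap emits elements from the inner flows as they become available
  implicit class FlatMapIsMerge[T](private val flow: Flow[T]) extends AnyVal {
    def flatMap[U](f: T => Flow[U]): Flow[U] = flow.merge(f, mergeSettings)
  }
  ...
}

// Import exactly one of the implicit classes to pick the flatMap behavior
import FlowForComprehension.FlatMapIsConcat

for(...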

If you want to use different meanings for flatMap in one big for comprehension, then I guess you're screwed. But that is probably not such a good idea anyway for code readability.
 
>> I just wanted to confirm that there is no fundamental limitation.
>
> Well, there are practical limitations. For instance, an infinite stream flatMapped with infinite streams.

With concat, you will never see anything of the second stream, but you don't need to take any special precautions in the implementation.

With join you will only ever see the first element of each stream. A naive implementation would also subscribe to every flow and eventually run out of memory, so there should probably be a size limit.

With merge, I guess it depends on the settings. You want to merge from only a finite number of flows at the same time, otherwise the call will try to subscribe to an infinite number of flows and never return...

All in all, nothing really surprising for people familiar with the Streams from scala.collection. Except maybe the nondeterminism of operations like merge. But this is to be expected since "Streams are not Collections".
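The concat and join cases can be sketched with plain Scala Iterators standing in for Flows. This is an illustration under assumptions, not Akka code: `InfiniteFlattenSketch`, `inner`, `outer`, `concatenated` and `boundedJoin` are hypothetical names, and the `breadth` parameter plays the role of the size limit mentioned above.

```scala
// Illustration only: Iterator stands in for Flow.
object InfiniteFlattenSketch {
  // An infinite outer stream 0, 1, 2, ... whose i-th inner stream repeats i forever.
  def outer: Iterator[Int] = Iterator.from(0)
  def inner(i: Int): Iterator[Int] = Iterator.continually(i)

  // concat: the first inner stream never ends, so the second is never reached.
  def concatenated: Iterator[Int] = outer.flatMap(inner)

  // join with a size limit: open only the first `breadth` inner streams and
  // take one element from each per round. Without the limit, the first round
  // alone would have to open infinitely many inner streams.
  def boundedJoin(breadth: Int): Iterator[Int] = {
    val open = outer.take(breadth).map(inner).toVector
    Iterator.continually(open).flatMap(round => round.map(_.next()))
  }
}
```

Here `concatenated.take(5)` contains only zeros, and `boundedJoin(3).take(6)` cycles through 0, 1, 2 twice without ever touching the fourth inner stream.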

Cheers,

Rüdiger

liska...@gmail.com

Feb 6, 2015, 7:39:23 PM
to akka...@googlegroups.com
Hey,

The absence of a flatMap operator is a real pain. I'm migrating some Rx code that is basically full of flatMaps to Akka Streams, and it is not easy to write a custom operator for it with two days' worth of experience.

Anybody willing to give me some hints or show some code?  Thanks, Jakub

Endre Varga

Feb 9, 2015, 2:54:27 AM
to akka...@googlegroups.com
Hi,

There is a flatMap; it is just not called that. There is a flatten() method that takes a strategy, currently the only one being concat: flatten(FlattenStrategy.concat). The reason we don't provide a method called flatMap is to avoid it being overused in for comprehensions. In the land of backpressured, bounded-memory stream processing, flattening is an inherently dangerous operation that can lead to a deadlock.

-Endre


liska...@gmail.com

Feb 9, 2015, 4:10:27 AM
to akka...@googlegroups.com
Hi,

Yup, I missed that one can have FlowOps#map return Sources, or that one can do:

val myData: Source[List[Message]] = someDataSource
val flattened: Source[Message] = myData.mapConcat(identity)

Thank you, Jakub