Flatmap for Flows

Magnus Andersson

Dec 6, 2014, 10:23:56 AM
to akka...@googlegroups.com
Hi

I have a flow that I want to conditionally branch off to either one of two other flows. Basically I'm looking to "flatMap that shit", but how do I do this in the context of Akka streams?

In my current attempt the types do not line up in the flatten step, and I get this compile error:

polymorphic expression cannot be instantiated to expected type;
[error]  found   : [T]akka.stream.FlattenStrategy[akka.stream.scaladsl.Source[T],T]
[error]  required: akka.stream.FlattenStrategy[akka.stream.scaladsl.Flow[akka.http.model.HttpRequest,akka.http.model.HttpResponse],?]
[error]       }.flatten( FlattenStrategy.concat )
Relevant code:

val serverBinding = Http( systemRef ).
  bind(
    interface = interface,
    port = port )

val con: OutgoingConnection = Http( systemRef ).outgoingConnection( "akka.io" )

val badRequestFlow: Flow[HttpRequest, HttpResponse] =
  Flow[HttpRequest].map( _ ⇒ HttpResponse( BadRequest, entity = "Bad Request" ) )

val proxy: Flow[HttpRequest, HttpResponse] = con.flow.concat( Source( () ⇒ List( HttpRequest( GET, uri = "/" ) ).iterator ) )

val proxyRequest: Flow[HttpRequest, HttpResponse] =
  Flow[HttpRequest].map {
    case HttpRequest( POST, Uri.Path( "/proxy" ), _, _, _ ) ⇒ proxy
    case other ⇒ badRequestFlow
  }.flatten( FlattenStrategy.concat )
Any hints or examples are warmly welcome!
I'm also wondering about supervision of flows (for error handling) but I will put that in a separate post.

/Magnus

Eric Nelson

Dec 6, 2014, 6:28:52 PM
to akka...@googlegroups.com
I'm no expert (still getting into Akka streams myself), but I would look at your line

val badRequestFlow: Flow[HttpRequest, HttpResponse] =
  Flow[HttpRequest].map( _ ⇒ HttpResponse( BadRequest, entity = "Bad Request" ) )

Your flow as defined by 'Flow[HttpRequest]' expects the output to be HttpRequest, but your mapping causes it to be HttpResponse. I would try

val badRequestFlow: Flow[HttpRequest, HttpResponse] =
  Flow[HttpRequest, HttpResponse].map( _ ⇒ HttpResponse( BadRequest, entity = "Bad Request" ) )

--Eric
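
On the typing question raised here, a minimal sketch may help. It assumes a later Akka Streams release (2.6.x on the classpath), where flows carry a third type parameter for the materialized value, and it uses the hypothetical stand-ins `Request` and `Response` instead of the real HTTP model classes. In that API, `Flow[A]` is only the identity starting point, and `map` changes the output side, so the original `badRequestFlow` definition is already well-typed:

```scala
import akka.NotUsed
import akka.stream.scaladsl.Flow

object FlowTypes {
  // Hypothetical stand-ins for HttpRequest / HttpResponse, for illustration only.
  final case class Request(path: String)
  final case class Response(status: Int, body: String)

  // Flow[Request] is an identity flow: Request in, Request out.
  val passThrough: Flow[Request, Request, NotUsed] = Flow[Request]

  // map transforms only the output side, so the result is Request in, Response out.
  val badRequest: Flow[Request, Response, NotUsed] =
    Flow[Request].map(_ => Response(400, "Bad Request"))
}
```

Nothing is run here; the point is only that both definitions compile with the declared types.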

Roland Kuhn

Dec 7, 2014, 11:44:33 AM
to akka-user
Hi Magnus,

the error message is telling you that the `concat` strategy can only flatten Sources, not Flows. The `map` combinator in your example just emits Flows but does not connect them to the input that you want to pipe through them. In the generic, dynamic sense we do not support this currently, although you could use groupBy to achieve this effect. But your use-case is more static anyway, so it should be a perfect fit for the FlexiRoute junction (see this test suite for inspiration).
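
To make the "flatten works on Sources" point concrete, here is a minimal sketch written against a later Akka Streams release (assumed: 2.6.x), not the 2014 milestone API discussed in this thread. It uses `flatMapConcat`, which expects a Source per element, so each element is wrapped in `Source.single` and piped through the chosen flow. The `proxy` and `badRequest` flows and the string payloads are hypothetical stand-ins for the HTTP flows in the question.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object FlatMapRouting {
  // Hypothetical stand-ins for the proxy and bad-request flows.
  val proxy: Flow[String, String, NotUsed] = Flow[String].map(p => s"200 proxied $p")
  val badRequest: Flow[String, String, NotUsed] = Flow[String].map(p => s"400 $p")

  // For every element, pick a flow and turn "element piped through flow" into a Source,
  // which is the shape that flatMapConcat knows how to flatten.
  val route: Flow[String, String, NotUsed] =
    Flow[String].flatMapConcat { path =>
      val chosen = if (path == "/proxy") proxy else badRequest
      Source.single(path).via(chosen)
    }

  def main(args: Array[String]): Unit = {
    // In Akka 2.6 an implicit ActorSystem also provides the materializer.
    implicit val system: ActorSystem = ActorSystem("sketch")
    val result =
      Await.result(Source(List("/proxy", "/wrong")).via(route).runWith(Sink.seq), 5.seconds)
    println(result)
    system.terminate()
  }
}
```

Because `flatMapConcat` consumes one sub-source at a time, outputs keep the input order. The trade-off is that the chosen flow is materialized again for every element, which may be costly if that flow wraps something like a network connection.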

Regards,

Roland

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.



Dr. Roland Kuhn
Akka Tech Lead
Typesafe – Reactive apps on the JVM.
twitter: @rolandkuhn


Magnus Andersson

Dec 8, 2014, 3:16:30 AM
to akka...@googlegroups.com
Hi

Thanks guys. I actually noticed FlexiRoute shortly after I had submitted my post, but did not take a deep dive.

Will definitely have another look. Thanks for the clarification on flatten.

Magnus

Magnus Andersson

Dec 9, 2014, 5:25:18 PM
to akka...@googlegroups.com
Hi all

So I gave FlexiRoute another try. I kind-of-sort-of got it working after an evening of tinkering.

I have two questions.
  1. Is there anything that I have done in an unnecessarily complex fashion in my implementation (gist below)? I feel like I am coding in Java and implementing the RequestProcessorFactoryFactoryVisitorObserver-pattern (exaggerating a tiny bit) ;-)
  2. I have a bug in my code, and I suspect it has to do with my FlexiRoute implementation. Can anyone help me figure out the cause (see below)?
Here is a self-contained example that does a reverse proxy to Google search on the URL http://127.0.0.1:7080/proxy

When I run the code in the gist, I get the log output for a remote connection even if I go through the failure path (i.e. accessing http://127.0.0.1:7080/wrong). I tried printing some debug output, and it seems that the stream does not take the success path, yet the connection is still established. Logs are further down.

I suspect this issue is related to my routing implementation, or could it be that materializing the flow triggers the connection? I don't really want connections to be established unless I'm actually visiting that branch.

I hope some poor soul has the patience to read through the code and help me understand. :)
/Magnus

service-auth [DEBUG] [12/09/2014 22:55:35.629] [main] [EventStream(akka://ProxySystem)] logger log1-Logging$DefaultLogger started
service-auth [DEBUG] [12/09/2014 22:55:35.631] [main] [EventStream(akka://ProxySystem)] Default Loggers started
service-auth [DEBUG] [12/09/2014 22:55:36.174] [ProxySystem-akka.actor.default-dispatcher-3] [akka://ProxySystem/system/IO-TCP/selectors/$a/0] Successfully bound to /127.0.0.1:7080
service-auth [DEBUG] [12/09/2014 22:55:46.256] [ProxySystem-akka.actor.default-dispatcher-3] [akka://ProxySystem/system/IO-TCP/selectors/$a/0] New connection accepted
service-auth [DEBUG] [12/09/2014 22:55:46.551] [ProxySystem-akka.actor.default-dispatcher-2] [akka://ProxySystem/system/IO-TCP/selectors/$a/2] Attempting connection to [www.google.com/173.194.71.106:80]
service-auth [DEBUG] [12/09/2014 22:55:46.559] [ProxySystem-akka.actor.default-dispatcher-2] [akka://ProxySystem/system/IO-TCP/selectors/$a/2] Connection established to [www.google.com/173.194.71.106:80]
service-auth [DEBUG] [12/09/2014 22:55:51.908] [ProxySystem-akka.actor.default-dispatcher-10] [akka://ProxySystem/user/$a/flow-17-1-prefixAndTail] Cancelling akka.stream.impl.MultiStreamOutputProcessor$SubstreamOutput@43d908c (after: 5000 ms)
service-auth [INFO] [12/09/2014 22:55:51.913] [ProxySystem-akka.actor.default-dispatcher-12] [akka://ProxySystem/user/$a/flow-17-1-prefixAndTail] Message [akka.stream.actor.ActorSubscriberMessage$OnComplete$] from Actor[akka://ProxySystem/deadLetters] to Actor[akka://ProxySystem/user/$a/flow-17-1-prefixAndTail#1089667920] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

Roland Kuhn

Feb 28, 2015, 5:22:10 AM
to akka-user
FYI: I filed https://github.com/akka/akka/issues/16965 for this, which should enable you to easily route elements to one of a set of flows that you then merge together afterwards. That can then also be used to implement a new (custom) combinator like

def conditional[I, O](p: I => Boolean, whenTrue: Flow[I, O, _], whenFalse: Flow[I, O, _]): Flow[I, O, Unit]

(the implementation of which would use the Flow() factory and the junction I mentioned). The resulting flows can then easily be used in `.via(...)` clauses.
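
One way such a combinator could look is sketched below. This is not the implementation from the linked issue: it assumes a later Akka Streams release (2.6.x) and builds on the `Partition` and `Merge` junctions of the `GraphDSL`, with `NotUsed` in place of `Unit` as the materialized value. The even/odd flows in `main` are hypothetical placeholders.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object ConditionalFlow {
  // Routes each element to whenTrue or whenFalse and merges both outputs again.
  def conditional[I, O](p: I => Boolean,
                        whenTrue: Flow[I, O, _],
                        whenFalse: Flow[I, O, _]): Flow[I, O, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._
      // Partition picks an output port per element: port 0 when p holds, else port 1.
      val route = b.add(Partition[I](2, i => if (p(i)) 0 else 1))
      val merge = b.add(Merge[O](2))
      route.out(0) ~> whenTrue ~> merge.in(0)
      route.out(1) ~> whenFalse ~> merge.in(1)
      FlowShape(route.in, merge.out)
    })

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("sketch")
    val flow = conditional[Int, String](
      _ % 2 == 0,
      Flow[Int].map(n => s"even $n"),
      Flow[Int].map(n => s"odd $n"))
    val out = Await.result(Source(1 to 4).via(flow).runWith(Sink.seq), 5.seconds)
    // Sorted before printing, since Merge does not promise any order across branches.
    println(out.sorted)
    system.terminate()
  }
}
```

Two caveats on this design. Both branches are part of one static graph, so both are materialized when the flow is run, whichever branch an element later takes. And since `Merge` emits whatever arrives first, outputs from different branches can be reordered relative to the inputs, which matters when responses must line up with requests.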

Regards,

Roland
