Grokking akka-stream and TcpStream

Reid Spencer

Feb 12, 2015, 11:56:03 AM
to akka...@googlegroups.com
hAkkers,

I've been unable to grok how to communicate with a TCP socket using akka-stream and StreamTcp extension.  At this point, I'm not sure the fault is entirely mine. :)  I'm building a MongoDB driver that uses Akka and I have it working well with akka.io. Mongo requires asynchronous reading and writing on a TCP socket. You can write requests to it as they happen and you read responses as they can be satisfied by the server. Requests and responses are matched with an ID number (i.e. each response indicates the request ID to which it responds). This seems to be an ideal candidate for akka-streams, at least on the surface. I'm now trying to transition my design to use akka-streams and StreamTcp. After several days of fumbling around, I'm still not able to grasp how to connect all the pieces. So, I'm hoping the group can help and that this might be instructive for users of akka-stream, or at least shine some light on needed documentation or features. 
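The ID-based matching described above can be sketched in plain Scala, independent of akka-stream. This is only an illustration of the bookkeeping, not how any MongoDB driver does it: Request, Response, and Correlator are hypothetical names, and the real wire protocol is not modelled.

```scala
import scala.collection.mutable
import scala.concurrent.{Future, Promise}

// Hypothetical message types; a real driver would carry BSON payloads.
final case class Request(id: Int, body: String)
final case class Response(responseTo: Int, body: String)

// Tracks in-flight requests so that responses may arrive in any order.
final class Correlator {
  private val pending = mutable.Map.empty[Int, Promise[Response]]

  // Call before writing the request to the socket.
  def register(req: Request): Future[Response] = synchronized {
    val p = Promise[Response]()
    pending.put(req.id, p)
    p.future
  }

  // Call for every decoded response; returns false for an unknown ID.
  def onResponse(resp: Response): Boolean = synchronized {
    pending.remove(resp.responseTo) match {
      case Some(p) =>
        p.success(resp)
        true
      case None =>
        false
    }
  }

  // Number of requests still waiting for a response.
  def inFlight: Int = synchronized(pending.size)
}
```

The writing side only calls register and the reading side only calls onResponse, which is what lets requests and responses proceed independently.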

Just to address the obligatory: 
  • I've read the akka-stream (1.0-M3) documentation, many times, every page. 
  • I've looked at the akka-stream code and discovered that without some sort of internal design document, much of it will be unintelligible because I don't have a conceptual model for how the pieces fit together (essentially a forest/trees issue).
  • I've read the (insufficient, IMO) API documentation. 
  • I've built, tried and studied the TcpEcho sample program. 
That sample TcpEcho program is the source of most of my misunderstanding as it is the only sample that relates to what I'm doing and I cannot extrapolate from it to do what I want to do.  Here's the program, from the documentation:

  val connection: OutgoingConnection = StreamTcp().outgoingConnection(localhost)

  val replParser = new PushStage[String, ByteString] {
    override def onPush(elem: String, ctx: Context[ByteString]): Directive = {
      elem match {
        case "q" => ctx.pushAndFinish(ByteString("BYE\n"))
        case _   => ctx.push(ByteString(s"$elem\n"))
      }
    }
  }

  val repl = Flow[ByteString]
    .transform(() => RecipeParseLines.parseLines("\n", maximumLineBytes = 256))
    .map(text => println("Server: " + text))
    .map(_ => readLine("> "))
    .transform(() => replParser)

  connection.handleWith(repl)

Here are the things that I find confusing:
  • The repl value is defined by invoking Flow[ByteString]. Now, I know the API well enough to know that Flow requires two type parameters: Flow[-In, +Out]. This is confusing because the Flow companion object's apply method takes only one type argument, which it expands by duplicating. So Flow[ByteString] actually instantiates a Flow[ByteString, ByteString]. I only note this because it took some digging around in the API before I understood how this worked, and while it is handy, it is also not straightforward.
  • The documentation implies that a Flow is uni-directional. It says a Flow "connects its up- and downstreams by transforming the data elements flowing through it." That, to me, says "unidirectional". The use of a Flow[ByteString,ByteString] for the repl value indicates to me that a uni-directional "transformation" from ByteString to ByteString is  occurring and yet this code implies that it is doing both reading and writing to the socket (i.e. it is bi-directional). How can that be? 
  • I see repl as a Flow that does this: takes a ByteString as input, chunks it into \n terminated lines up to 256 bytes, prints those lines out prefixed by "Server: " and then discards that input and replaces it with a line read from the console which is then output with a newline appended unless the input was "q" in which case it is replaced by "BYE\n" and a termination signal. Okay, that's all great and it is all unidirectional writing data to the socket. So, now the questions:
    • Where is the reading from the server to get the original line(s) as input to the flow? I.e. where is the Source[ByteString]?
    • Assuming that connection.handleWith(repl) does some magic to set up the Source, how does the conversation get started? Is it assumed the server will send some data upon connection? The echo server example seems to have the same issue!
    • FYI: OutgoingConnection.handleWith's documentation says this:
"Handles the connection using the given flow. This method can be called several times, every call will materialize the given flow exactly once thereby triggering a new connection attempt to the remoteAddress. If the connection cannot be established the materialized stream will immediately be terminated with a akka.stream.StreamTcpException."
    • It would be nice if there was a little more description around "Handles the connection". Handles it how? Does it set up a Source and Sink? Are asynchronous bi-directional reading from the socket and writing to the socket implied?
  • Am I to infer from all this that the StreamTcp.outgoingConnection creates a Source[ByteString] from the socket's input and a Sink[ByteString] for the socket's output and that the flow provided to handleWith is run between these two? In other words, the OutgoingConnection can really only transform its input to its output? If so, then:
    • How is that generally applicable? 
    • This approach works fine for an echo client, but clearly there are protocols where the input and output can and should be processed independently, aren't there?
    • How would one do what Mongo needs and have an asynch flow of requests that is independent of an asynch flow of responses?
    • I noticed the Add Bi-directional Flow issue that is slated for inclusion in 1.0-M4. Is this intended for solving this issue where two related flows are paired to do bi-directional input/output? 
    • Am I just trying to implement my mongo driver before the required features are ready?
The documentation says, about this code: 

A resilient REPL client would be more sophisticated than this, for example it should split out the input reading into a separate mapAsync step and have a way to let the server write more data than one ByteString chunk at any given time, these improvements however are left as exercise for the reader.

I would like a "resilient client" and I think leaving this part as an "exercise for the reader" is asking a bit much from the audience. We need an example of how to do this as it is likely the typical case not the exception (nobody needs another echo server/client). I suspect that the answer to my confusion lies in the information intended but not stated by this sentence from the documentation.

Specifically, I do not comprehend how mapAsync (or mapAsyncUnordered) helps to split out the input reading because it is NOT obvious to me where this "input reading" is being done! If I used mapAsync to obtain the request data from my driver's clients, it seems very obtuse to be setting up numerous Futures as opposed to just allowing them to give me a Source[Request] from which their requests are read and processed.

Any help you can provide to prevent me from drowning in these waters would be much appreciated!! 

Thanks,

Reid.

Eric Zoerner

Feb 12, 2015, 7:10:07 PM
to akka...@googlegroups.com
Hi Reid,

I had similar difficulties as you in understanding how to make the input independent from the output, and I agree that there needs to be better documentation with a wider variety of examples. One thing I stumbled upon in the API that seemed to be the missing "key" for what I was trying to do, is the following apply method in Flow:

    def apply[I, O](sink: Sink[I], source: Source[O]): Flow[I, O]

    Create a Flow from a seemingly disconnected Source and Sink pair.


    This seems to allow you to "handle" a TCP connection with a Flow using independent Sink and Source. I am still in the process of trying to construct a FlowGraph using ActorSubscriber and ActorPublisher to integrate the TCP connection with application logic using actors, but finding it a challenge to do so in a simple way and avoiding circular references. So I'm still working on "grokking" it myself, but perhaps this discovery might help.

    -- Eric


Reid Spencer

Feb 13, 2015, 6:00:02 AM
to akka...@googlegroups.com
Hi Eric,

Thanks for your response. It is a little comforting to know that I’m not the only one on this planet also struggling with how to do something real with akka-stream. I’m sure this thread will eventually lead to the clarity we both seek. 

On Feb 12, 2015, at 7:10 PM, Eric Zoerner <eric.z...@gmail.com> wrote:

Hi Reid,

I had similar difficulties as you in understanding how to make the input independent from the output, and I agree that there needs to be better documentation with a wider variety of examples. One thing I stumbled upon in the API that seemed to be the missing "key" for what I was trying to do, is the following apply method in Flow:

    def apply[I, O](sink: Sink[I], source: Source[O]): Flow[I, O]

    Create a Flow from a seemingly disconnected Source and Sink pair.



Yes, I’ve seen this before but it led to my grumbling about the lack of an internal design document. That Flow.apply method creates a GraphBackedFlow. When I first saw that I was encouraged because I realized I would just need to understand GraphBackedFlow to understand what Flow.apply(Sink[I],Source[O]) was giving me. But, the documentation for that object only says “INTERNAL API”. I’m fine with that lack of documentation for things that I don’t use directly, but this GraphBackedFlow is something returned by the Flow companion and would be an instance of something I use in my program. It needs user level documentation. Either that or the Flow.apply method should be much more explicit about what is returned.  All it says is "Create a [[Flow]] from a seemingly disconnected [[Source]] and [[Sink]] pair.” It would be very much appreciated if there was at least a line or two of what this GraphBackedFlow represented. I get that it backs the flow with some sort of graph of objects but that really doesn’t give me much information about its operation. 

This seems to allow you to "handle" a TCP connection with a Flow using independent Sink and Source. I am still in the process of trying to construct a FlowGraph using ActorSubscriber and ActorPublisher to integrate the TCP connection with application logic using actors, but finding it a challenge to do so in a simple way and avoiding circular references. So I'm still working on "grokking" it myself, but perhaps this discovery might help.

I will have to dive into the Working With Graphs page in more detail. I’ve been giving it less attention because of this sentence:

Graphs are needed whenever you want to perform any kind of fan-in ("multiple inputs") or fan-out ("multiple outputs") operations.

I believe my needs are two linear “roads” not a network of junctions that fan-in and fan-out; but then again, I could be wrong. 

Akka Team: Please don’t take any of this commentary as derogatory; I’m quite enthusiastic about using akka-stream and StreamTcp. I understand you are scrambling to get this released in a couple of months and there is much to do and document. In the spirit of cooperation, I will suggest that if you can point Eric and me in the right direction (e.g. how to write an asynchronous Tcp protocol), then I will volunteer to add sample code and documentation to Akka that explains this part (once I understand it, of course!) :)

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to a topic in the Google Groups "Akka User List" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/akka-user/BQhnmseCyN0/unsubscribe.
To unsubscribe from this group and all its topics, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Endre Varga

Feb 13, 2015, 6:40:35 AM
to akka...@googlegroups.com
Hi Reid,

On Thu, Feb 12, 2015 at 5:56 PM, Reid Spencer <re...@reactific.com> wrote:
hAkkers,

I've been unable to grok how to communicate with a TCP socket using akka-stream and StreamTcp extension.  At this point, I'm not sure the fault is entirely mine. :)

I can understand that many of the concepts might be alien at first. In our defense, it is always hard to see what might be unclear for newcomers, since we build the stuff and know it inside-out :) Your feedback is very valuable in this regard.
  

  • The repl value is defined by invoking Flow[ByteString]. Now, I know the API well enough to know that Flow requires two type parameters: Flow[-In, +Out]. This is confusing because the Flow companion object's apply method takes only one type argument, which it expands by duplicating. So Flow[ByteString] actually instantiates a Flow[ByteString, ByteString]. I only note this because it took some digging around in the API before I understood how this worked, and while it is handy, it is also not straightforward.

This might need to be explained in the docs, yes. But the logic is simple: when you create a Flow by Flow[T], what you get is an empty Flow that does nothing to the input; it passes it down unmodified (since the transformation pipeline is empty). Obviously this means that you cannot say Flow[Int, String] for an empty flow, since an Int will never become a String if there is no processing step there. This setup is needed because every step on a Flow gives you back a new Flow. So:

 val doNothing: Flow[Int, Int] = Flow[Int]
 val gimmeString: Flow[Int, String] = doNothing.map(_.toString)

The doNothing flow must have its two type parameters equal, because that Flow just does not transform anything. On the other hand gimmeString has a different type signature since a map stage is appended to the empty Flow.

Flow is simply a data structure for representing a set of transformations (usually in a linear fashion but not necessarily)

  • The documentation implies that a Flow is uni-directional. It says a Flow "connects its up- and downstreams by transforming the data elements flowing through it." That, to me, says "unidirectional". The use of a Flow[ByteString,ByteString] for the repl value indicates to me that a uni-directional "transformation" from ByteString to ByteString is  occurring and yet this code implies that it is doing both reading and writing to the socket (i.e. it is bi-directional). How can that be? 
Well, the doc is a bit imprecise. Basically a Flow is a description of a transformation that has exactly one input and one output port. In *most* cases this is simply a unidirectional flow of elements coming in through the input and passed down through the output, but there are exceptions to this rule. The docs simplify this issue and just refer to it as a unidirectional element.

Now in the TCP example it *is* however unidirectional, since what it looks like is:

   
  +->(inbytes)-->+
  |              |
[TCP]       [HandlerFlow]
  |              |
  +<-(outbytes)<-+

As you see, the "HandlerFlow" is really just taking input bytes and transforming them into output bytes.

  • I see repl as a Flow that does this: takes a ByteString as input, chunks it into \n terminated lines up to 256 bytes, prints those lines out prefixed by "Server: " and then discards that input and replaces it with a line read from the console which is then output with a newline appended unless the input was "q" in which case it is replaced by "BYE\n" and a termination signal. Okay, that's all great and it is all unidirectional writing data to the socket. So, now the questions:
    • Where is the reading from the server to get the original line(s) as input to the flow? I.e. where is the Source[ByteString]?
The source of confusion is that "handleWith" convenience method that we added exactly to make things simpler. But what happens is actually:

def handleWith(handler: Flow[ByteString, ByteString])(implicit fm: FlowMaterializer) =
   flow.join(handler).run()

Now you may rightly ask what that flow is. Well, an OutgoingConnection is nothing but a wrapper around a Flow[ByteString, ByteString], which you can get via OutgoingConnection::flow. That's right, a TCP connection is not a Sink and a Source, but a Flow! (Remember, a Flow is something that has exactly one input and one output port.) Why is it a Flow and not a Sink+Source? Simply because it is often more convenient in client setups:

sourceOfRequests -> EncodePackets -> tcpFlowRepresentingRemoteService -> DecodePackets -> sinkOfResponses

I.e. a TCP connection becomes a transformation step. 
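To make the "TCP connection as a transformation step" idea concrete, here is a toy model in plain Scala that does not use akka-stream at all. A stage is modelled as a function between iterators, and fakeTcpService is a made-up stand-in for the remote server (it upper-cases the payload); all names and the "id:payload" encoding are assumptions for illustration only.

```scala
object PipelineSketch {
  // One input port, one output port: a stage is a function between streams.
  type Stage[A, B] = Iterator[A] => Iterator[B]

  // Hypothetical protocol messages.
  final case class Request(id: Int, query: String)
  final case class Response(responseTo: Int, result: String)

  // EncodePackets: turn each request into "id:query" bytes.
  val encodePackets: Stage[Request, Array[Byte]] =
    _.map(r => s"${r.id}:${r.query}".getBytes("UTF-8"))

  // Stand-in for the TCP flow plus remote service: bytes in, bytes out.
  val fakeTcpService: Stage[Array[Byte], Array[Byte]] =
    _.map { bytes =>
      val Array(id, query) = new String(bytes, "UTF-8").split(":", 2)
      s"$id:${query.toUpperCase}".getBytes("UTF-8")
    }

  // DecodePackets: parse "id:result" bytes back into a response.
  val decodePackets: Stage[Array[Byte], Response] =
    _.map { bytes =>
      val Array(id, result) = new String(bytes, "UTF-8").split(":", 2)
      Response(id.toInt, result)
    }

  // sourceOfRequests -> encode -> tcp -> decode -> sinkOfResponses
  val client: Stage[Request, Response] =
    encodePackets andThen fakeTcpService andThen decodePackets
}
```

Applying PipelineSketch.client to an iterator of requests plays the role of the source, and consuming the resulting iterator plays the role of the sink.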

Now the only remaining mystery is flow.join(handler). This simply wires together two flows by making a cycle: connecting flow1's output to flow2's input, and connecting flow2's output to flow1's input.
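The cycle can be pictured with a small plain-Scala model. This is not the akka-stream implementation, only a sketch under simplifying assumptions: each side handles one element at a time, None means "this side completed", and the loop needs a first inbound element to get going (as in the REPL example, where nothing is written until the server has sent something). Stage and runJoined are hypothetical names.

```scala
object JoinSketch {
  // One input port, one output port; None means the stage completed.
  type Stage[In, Out] = In => Option[Out]

  // Toy stand-in for flow.join(handler).run(): route each side's output
  // to the other side's input until one of them completes.
  // Returns everything the handler sent to the TCP side, in order.
  def runJoined[A, B](tcp: Stage[B, A], handler: Stage[A, B], firstInbound: A): List[B] = {
    @annotation.tailrec
    def loop(inbound: Option[A], sent: List[B]): List[B] = inbound match {
      case None => sent.reverse
      case Some(a) =>
        handler(a) match {
          case None    => sent.reverse
          case Some(b) => loop(tcp(b), b :: sent)
        }
    }
    loop(Some(firstInbound), Nil)
  }
}
```

Neither side is "the source" or "the sink" here: each one only ever sees an input and produces an output, and the wiring alone creates the conversation.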

 
    • Assuming that connection.handleWith(repl) does some magic to set up the Source, how does the conversation get started? Is it assumed the server will send some data upon connection? The echo server example seems to have the same issue!
The echo server does not need to start the conversation, because it just echoes back whatever has been written to it. I.e. the echo "service flow" that you attach to the TCP flow of the incoming connection just transforms the incoming bytes into outgoing bytes without modification. If you need the server side to start the conversation, you can use the concatenation operators to send an initial sequence of elements before sending anything else.
 
"Handles the connection using the given flow. This method can be called several times, every call will materialize the given flow exactly once thereby triggering a new connection attempt to the remoteAddress. If the connection cannot be established the materialized stream will immediately be terminated with a akka.stream.StreamTcpException."
    • It would be nice if there was a little more description around "Handles the connection". Handles it how? Does it set up a Source and Sink? Are asynchronous bi-directional reading from the socket and writing to the socket implied?

The definition is the code snippet I pasted above: it wires the Flow's input port to the incoming bytes port of the TCP flow, and wires the Flow's output port to the outgoing bytes port of TCP.
 
  • Am I to infer from all this that the StreamTcp.outgoingConnection creates a Source[ByteString] from the socket's input and a Sink[ByteString] for the socket's output and that the flow provided to handleWith is run between these two? In other words, the OutgoingConnection can really only transform its input to its output? If so, then:
Yes, on a conceptual level this is true.
 
    • How is that generally applicable? 
Every Server/Client is just a transformation between inputs and outputs which is expressed as a protocol. The missing piece you are probably after is graphs. A Flow (which, as I said, is something with exactly one input and one output port) can actually host a graph with complex routing in it, calls to other parts of the application via mapAsync, or sending off messages to actors.

 
    • This approach works fine for an echo client, but clearly there are protocols where the input and output can and should be processed independently, aren't there?
Yes, even Http is such a protocol, even though it looks like a simple request/reply protocol (but it is more horrible than that).

 
    • How would one do what Mongo needs and have an asynch flow of requests that is independent of an asynch flow of responses?
This is too generic to answer. If you can decompose your problem we can help with some of the subproblems.

 
    • I noticed the Add Bi-directional Flow issue that is slated for inclusion in 1.0-M4. Is this intended for solving this issue where two related flows are paired to do bi-directional input/output? 
Yes and no. It will basically introduce a new DSL element that has exactly *two* input and *two* output ports. This is rather handy when working with networking. SSL will be such a bidi-stage.
 
    • Am I just trying to implement my mongo driver before the required features are ready?

I recommend waiting for M4; we have a rewrite in progress. It will make many of the concepts simpler, though.
 
The documentation says, about this code: 

A resilient REPL client would be more sophisticated than this, for example it should split out the input reading into a separate mapAsync step and have a way to let the server write more data than one ByteString chunk at any given time, these improvements however are left as exercise for the reader.

I would like a "resilient client" and I think leaving this part as an "exercise for the reader" is asking a bit much from the audience. We need an example of how to do this as it is likely the typical case not the exception (nobody needs another echo server/client). I suspect that the answer to my confusion lies in the information intended but not stated by this sentence from the documentation.

The input reading here refers to reading from the console, i.e. it expresses that simply using a map there is not optimal, we just didn't want to complicate the example. For example you can have an actor that actually does the console reading, and then use mapAsync together with the ask pattern to communicate with that actor from within the stream. There will be other options to factor out code and error handling. Please be patient about this :)
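As a rough illustration of what "split the input reading into a mapAsync step" means, here is a plain-Scala model that does not use akka-stream or actors. The function passed as f stands in for "ask an actor to read a line from the console"; mapAsyncSequential is a hypothetical helper that only mimics the ordering of mapAsync, one element at a time, and does not model backpressure or parallelism.

```scala
import scala.concurrent.{ExecutionContext, Future}

object MapAsyncSketch {
  // For each incoming element, run the asynchronous step f and collect
  // the results in input order. The next element is not processed until
  // the previous Future has completed.
  def mapAsyncSequential[A, B](in: Seq[A])(f: A => Future[B])(
      implicit ec: ExecutionContext): Future[Vector[B]] =
    in.foldLeft(Future.successful(Vector.empty[B])) { (acc, a) =>
      acc.flatMap(done => f(a).map(b => done :+ b))
    }
}
```

In the REPL example, each element of in would be a line received from the server, and f would return a Future of the user's next input, so the blocking console read no longer sits inside a plain map.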
 

Specifically, I do not comprehend how mapAsync (or mapAsyncUnordered) helps to split out the input reading because it is NOT obvious to me where this "input reading" is being done!

As I mentioned, this refers to the reading from console (i.e. user input). Reading from the socket is behind the curtains, the chain of transformations will grab the next TCP bytes as soon as they are ready for processing again (for example a map finished mapping the previous element and could pass it to the next stage)
 
If I used mapAsync to obtain the request data from my driver's clients, it seems very obtuse to be setting up numerous Futures as opposed to just allowing them to give me a Source[Request] from which their requests are read and processed.

I am not sure I got this. If you have actual small sized subproblems then we can help.

-Endre
 

Any help you can provide to prevent me from drowning in these waters would be much appreciated!! 

Thanks,

Reid.


Endre Varga

Feb 13, 2015, 6:51:48 AM
to akka...@googlegroups.com
Hi Reid,

On Fri, Feb 13, 2015 at 11:59 AM, Reid Spencer <re...@reactific.com> wrote:
Hi Eric,

Thanks for your response. It is a little comforting to know that I’m not the only one on this planet also struggling with how to do something real with akka-stream. I’m sure this thread will eventually lead to the clarity we both seek. 

I probably need to add that one of the beginner issues is to try to do *everything* with streams. Currently this is doomed to fail, and even in the future it will not be possible in all cases. Plain old actors are always there to use, so use streams where they really shine already, right now: for example a chain of decoding/encoding steps, or simple transformations that can be expressed as an ask to an actor (you can use it together with mapAsync and have your actor magically backpressured!). Also, graphs are almost always needed for protocols.
 

Yes, I’ve seen this before but it led to my grumbling about the lack of an internal design document. That Flow.apply method creates a GraphBackedFlow.

These will be gone in M4. A Flow is just a collection of processing steps (any graph layout) with exactly one input and output port that is not wired to anything.
 
When I first saw that I was encouraged because I realized I would just need to understand GraphBackedFlow to understand what Flow.apply(Sink[I],Source[O]) was giving me. But, the documentation for that object only says “INTERNAL API”. I’m fine with that lack of documentation for things that I don’t use directly, but this GraphBackedFlow is something returned by the Flow companion and would be an instance of something I use in my program. It needs user level documentation. Either that or the Flow.apply method should be much more explicit about what is returned.  All it says is "Create a [[Flow]] from a seemingly disconnected [[Source]] and [[Sink]] pair.” It would be very much appreciated if there was at least a line or two of what this GraphBackedFlow represented. I get that it backs the flow with some sort of graph of objects but that really doesn’t give me much information about its operation. 

These things will go away with M4, see explanation below.
 

This seems to allow you to "handle" a TCP connection with a Flow using independent Sink and Source. I am still in the process of trying to construct a FlowGraph using ActorSubscriber and ActorPublisher to integrate the TCP connection with application logic using actors, but finding it a challenge to do so in a simple way and avoiding circular references. So I'm still working on "grokking" it myself, but perhaps this discovery might help.

I will have to dive into the Working With Graphs page in more detail. I’ve been giving it less attention because of this sentence:

Graphs are needed whenever you want to perform any kind of fan-in ("multiple inputs") or fan-out ("multiple outputs") operations.

Well, I think that sentence really does not convey the importance of graphs. It is very likely that you will need graphs for more complex applications.

So, just to give you a taste of M4:

 - Internally, everything is just a "Module" which has input and output ports
 - An atomic Module is directly translated to an executable entity
 - A composite module consists of atomic modules and possibly other composite modules, with some output ports connected to some input ports
 - you can hierarchically nest these modules
 - A Flow is *really* nothing more than just a DSL shim over a module that has exactly one input and output port, but has arbitrarily complex nested graph layout inside
 - Same goes for Source and Sink

Now you can start imagining your application as a hierarchical setup of "boxes" (modules) that represent services and have input and output ports. You compose simpler services into more complex ones and wrap them in another box, until you have a nice nested hierarchy.

And that's it :)
 

I believe my needs are two linear “roads” not a network of junctions that fan-in and fan-out; but then again, I could be wrong. 

This is hard to know. I don't know the exact hassles you need.
 

Akka Team: Please don’t take any of this commentary as derogatory; I’m quite enthusiastic about using akka-stream and StreamTcp. I understand you are scrambling to get this released in a couple of months and there is much to do and document. In the spirit of cooperation, I will suggest that if you can point Eric and me in the right direction (e.g. how to write an asynchronous Tcp protocol), then I will volunteer to add sample code and documentation to Akka that explains this part (once I understand it, of course!) :)

Well, just give a very dummy-sized example problem that you find sufficient to start with (please keep it minimal, we are short on resources) and I can attempt to give some pointers.

-Endre
 

Reid Spencer

Feb 13, 2015, 7:41:08 AM
to akka...@googlegroups.com
Hi Endre,

Thanks for the quick response. My thoughts inline, below ...

On Feb 13, 2015, at 6:40 AM, Endre Varga <endre...@typesafe.com> wrote:

Hi Reid,

On Thu, Feb 12, 2015 at 5:56 PM, Reid Spencer <re...@reactific.com> wrote:
hAkkers,

I've been unable to grok how to communicate with a TCP socket using akka-stream and StreamTcp extension.  At this point, I'm not sure the fault is entirely mine. :)

I can understand that many of the concepts might be alien at first. In our defense, it is always hard to see what might be unclear for newcomers, since we build the stuff and know it inside-out :) Your feedback is very valuable in this regard.

I’ve been on your side of things many times and so I’m not bashful about telling you where I’m confused. Hopefully it leads to better design/doc for everyone in the long run.

  • The repl value is defined by invoking Flow[ByteString]. Now, I know the API well enough to know that Flow requires two type parameters: Flow[-In, +Out]. This is confusing because the Flow companion object's apply method takes only one type argument, which it expands by duplicating. So Flow[ByteString] actually instantiates a Flow[ByteString, ByteString]. I only note this because it took some digging around in the API before I understood how this worked, and while it is handy, it is also not straightforward.

This might need to be explained in the docs, yes. But the logic is simple: when you create a Flow by Flow[T], what you get is an empty Flow that does nothing to the input; it passes it down unmodified (since the transformation pipeline is empty). Obviously this means that you cannot say Flow[Int, String] for an empty flow, since an Int will never become a String if there is no processing step there. This setup is needed because every step on a Flow gives you back a new Flow. So:

 val doNothing: Flow[Int, Int] = Flow[Int]
 val gimmeString: Flow[Int, String] = doNothing.map(_.toString)

The doNothing flow must have its two type parameters equal, because that Flow just does not transform anything. On the other hand gimmeString has a different type signature since a map stage is appended to the empty Flow.

Thank you for that. I completely missed the “transform nothing” aspect of this apply function. It would be useful to mention that this is where to start in building a flow or even flow graph. I see that is just exactly how it is used in the example but the simplicity of that was lost on me. 


Flow is simply a data structure for representing a set of transformations (usually in a linear fashion but not necessarily)

  • The documentation implies that a Flow is uni-directional. It says a Flow "connects its up- and downstreams by transforming the data elements flowing through it." That, to me, says "unidirectional". The use of a Flow[ByteString,ByteString] for the repl value indicates to me that a uni-directional "transformation" from ByteString to ByteString is  occurring and yet this code implies that it is doing both reading and writing to the socket (i.e. it is bi-directional). How can that be? 
Well, the doc is a bit imprecise. Basically a Flow is a description of a transformation that has exactly one input and one output port. In *most* cases this is simply a unidirectional flow of elements coming in through the input and passed down through the output, but there are exceptions to this rule. The docs simplify this issue and just refer to it as a unidirectional element.

Now in the TCP example it *is* however unidirectional, since what it looks like is:

   
  +->(inbytes)-->+
  |              |
[TCP]       [HandlerFlow]
  |              |
  +<-(outbytes)<-+

As you see, the "HandlerFlow" really just takes input bytes and transforms them to output bytes.

Ah hah! :)

That little diagram needs to make its way to the documentation, please :)


  • I see repl as a Flow that does this: takes a ByteString as input, chunks it into \n terminated lines up to 256 bytes, prints those lines out prefixed by "Server: " and then discards that input and replaces it with a line read from the console which is then output with a newline appended unless the input was "q" in which case it is replaced by "BYE\n" and a termination signal. Okay, that's all great and it is all unidirectional writing data to the socket. So, now the questions:
    • Where is the reading from the server to get the original line(s) as input to the flow? I.e. where is the Source[ByteString]?
The source of confusion is that "handleWith" convenience method that we added exactly to make things simpler. But what happens is actually:

def handleWith(handler: Flow[ByteString, ByteString])(implicit fm: FlowMaterializer) =
   flow.join(handler).run()

Yes, I saw this in the code and thought I understood it as joining the handler to the flow, but ….


Now you may rightly ask what that flow is. Well, an OutgoingConnection is nothing but a wrapper around a Flow[ByteString, ByteString], which you can get via OutgoingConnection::flow. That's right, a TCP connection is not a Sink and a Source, but a Flow! (Remember, a Flow is something that has exactly one input and one output port.) Why is it a Flow and not a Sink+Source? Simply because it is many times more convenient in client setups:

sourceOfRequests -> EncodePackets -> tcpFlowRepresentingRemoteService -> DecodePackets -> sinkOfResponses

I.e. a TCP connection becomes a transformation step. 

I can see, now, why you think of it this way. I think I need to stop thinking of Flow as analogous to a Unix pipe. It really is what the doc says: a transformation between some input and some output even if that’s in a cycle or graph, not just a uni-directional linear flow.


Now the only remaining mystery is flow.join(handler). This simply wires together two flows by making a cycle: connecting flow1's output to flow2's input, and connecting flow2's output to flow1's input.
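
The cycle that join creates can be pictured with a toy model in plain Scala. This is only a sketch under loud assumptions: tcp, handler and this join function are hypothetical stand-ins written for illustration, each "flow" is reduced to a plain function on strings, and the real join returns a runnable graph rather than a list. The sketch also assumes one line has already arrived from the remote side, because a cycle needs a first element before anything moves.

```scala
object JoinSketch {
  // Hypothetical stand-in for the TCP flow: whatever is written out comes back
  // as the remote side's reply (here the remote side is an echo service).
  val tcp: String => String = out => out

  // Hypothetical handler flow: turns each received line into the next line to send.
  val handler: String => String = in => in + "!"

  // Wires handler output to tcp input and tcp output back to handler input.
  // `firstReceived` seeds the loop and `rounds` keeps this toy finite.
  // Returns the lines received from the "socket", in order.
  def join(tcp: String => String, handler: String => String)(
      firstReceived: String,
      rounds: Int
  ): List[String] =
    Iterator
      .iterate(firstReceived)(received => tcp(handler(received)))
      .take(rounds)
      .toList
}
```

For example, JoinSketch.join(JoinSketch.tcp, JoinSketch.handler)("hi", 3) yields List("hi", "hi!", "hi!!"): every element leaving the handler re-enters it after one trip through the TCP side.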

That part was non-obvious. I didn’t realize the cyclic nature of “join” here. 

 
    • Assuming that connection.handleWith(repl) does some magic to set up the Source, how does the conversation get started? Is it assumed the server will send some data upon connection? The echo server example seems to have the same issue! 
The echo server does not need to start the conversation, because it just echoes back whatever was written to it. I.e. the echo "service flow" that you attach to the TCP flow of the incoming connection just transforms incoming bytes to outgoing bytes without modification. If you need the server side to start the conversation, you can use the concatenation operators to send an initial sequence of elements before sending anything else.

Okay, fair enough. I can see now that echo service is likely one of the simplest use cases for Flows as it just does a simple turnaround in the “HandlerFlow”.

"Handles the connection using the given flow. This method can be called several times, every call will materialize the given flow exactly once thereby triggering a new connection attempt to the remoteAddress. If the connection cannot be established the materialized stream will immediately be terminated with a akka.stream.StreamTcpException."
    • It would be nice if there was a little more description around "Handles the connection". Handles it how? Does it set up a Source and Sink? Are asynchronous bi-directional reading from the socket and writing to the socket implied?

The definition is the code snippet I pasted above: it wires the Flow's input port to the incoming bytes port of the TCP flow, and wires the Flow's output port to the outgoing bytes port of TCP.
 
  • Am I to infer from all this that the StreamTcp.outgoingConnection creates a Source[ByteString] from the socket's input and a Sink[ByteString] for the socket's output and that the flow provided to handleWith is run between these two? In other words, the OutgoingConnection can really only transform its input to its output? If so, then:
Yes, on a conceptual level this is true.
 
    • How is that generally applicable? 
Every Server/Client is just a transformation between inputs and outputs which is expressed as a protocol.

I suspected this but it didn’t seem intuitive to me. I suppose this is true at some level of abstraction, it just isn’t the way I’m used to thinking about it or dealing with I/O (at a much lower level). Not seeing how the socket is read and written, using a Flow via StreamTcp seems a bit like “smoke and mirrors” (magic) to me. Perhaps this is the mental shift I need to make to use StreamTcp.  

The missing piece you are probably after is graphs. A Flow (which, as I said, is something with exactly one input and one output port) can actually host a graph with complex routing in it, calls to other parts of the application via mapAsync, or sending off messages to actors.

Yes, Eric’s response to my message gave me a clue that graphs are what is needed. I’m going to have to study that part of the documentation in more detail and figure out how to build the necessary graph to do the kind of asynchronous request/response cycles I’m used to thinking about. 

    • This approach works fine for an echo client, but clearly there are protocols where the input and output can and should be processed independently, aren't there?
Yes, even HTTP is such a protocol, even though it looks like a simple request/reply protocol (but it is more horrible than that).

Yes, unfortunately :) 
    • How would one do what Mongo needs and have an asynch flow of requests that is independent of an asynch flow of responses?
This is too generic to answer. If you can decompose your problem we can help with some of the subproblems.

Yes, I wasn’t asking for help with Mongo specifically, but just how to separate the input and output when dealing with a Flow. However, you’ve confirmed that graphs and mapAsync are tools for making “calls to other parts of the application” so I’m going to work on that over the weekend and see if I can’t get all the pieces working. If a smaller subproblem falls out of that, you’ll hear from me. OTOH, if I get to some level of success then I’ll be in a better position to contribute an example of how to use StreamTcp in something a bit more complicated than the echo client.

    • I noticed the Add Bi-directional Flow issue that is slated for inclusion in 1.0-M4. Is this intended for solving this issue where two related flows are paired to do bi-directional input/output? 
Yes and no. It will basically introduce a new DSL element that has exactly *two* input and *two* output ports. This is rather handy when working with networking. SSL will be such a bidi-stage.
 
    • Am I just trying to implement my mongo driver before the required features are ready?

I recommend waiting for M4; we have a rewrite in progress. It will make many of the concepts simpler, though.

Okay. Simplicity is good at this point :) Perhaps I’ll just keep building on top of my Akka IO solution for now and re-attempt the conversion to akka-stream when M4 is available.  I imagine M4 is some weeks away? Do you have an ETA?

 
The documentation says, about this code: 

A resilient REPL client would be more sophisticated than this, for example it should split out the input reading into a separate mapAsync step and have a way to let the server write more data than one ByteString chunk at any given time, these improvements however are left as exercise for the reader.

I would like a "resilient client" and I think leaving this part as an "exercise for the reader" is asking a bit much from the audience. We need an example of how to do this as it is likely the typical case not the exception (nobody needs another echo server/client). I suspect that the answer to my confusion lies in the information intended but not stated by this sentence from the documentation.

The input reading here refers to reading from the console, i.e. it expresses that simply using a map there is not optimal, we just didn't want to complicate the example.

Okay, I had assumed it meant reading the input from the socket. 

For example you can have an actor that actually does the console reading, and then use mapAsync together with the ask pattern to communicate with that actor from within the stream. There will be other options to factor out code and error handling. Please be patient about this :)
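
The shape of that idea can be sketched with nothing but scala.concurrent.Future. The code below is an illustration, not akka-stream: askConsole is a hypothetical stand-in for asking a console-reading actor, and mapAsyncInOrder is a hypothetical helper that handles strictly one element at a time, which is a simplification chosen for this sketch. It only shows the contract that matters here: every element is turned into a Future, and results are emitted in input order once those Futures complete.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object MapAsyncSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical stand-in for `actor ? prompt`: the reply arrives later as a Future.
  def askConsole(prompt: String): Future[String] =
    Future(s"reply to $prompt")

  // Apply an asynchronous step to each element, keeping input order.
  // The next element is only handed to `f` after the previous Future completed.
  def mapAsyncInOrder[A, B](inputs: List[A])(f: A => Future[B]): Future[List[B]] =
    inputs.foldLeft(Future.successful(List.empty[B])) { (acc, a) =>
      acc.flatMap(done => f(a).map(b => done :+ b))
    }

  // Blocks only at the very edge of the program, for demonstration purposes.
  def demo(): List[String] =
    Await.result(mapAsyncInOrder(List("a", "b"))(askConsole), 5.seconds)
}
```

Because the asynchronous step does not proceed until its Future completes, a slow responder naturally slows down the producer, which is the backpressure effect described in the reply.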

As I said above, I’m going to have to look into more details about graphs and mapAsync. I already realized that “akka-streams alone” won’t be enough and have my use of it wrapped in an actor.  Glad to hear there will be other options.  May I assume that the M4 changes will leave the Flow API relatively unchanged? That is, mapAsync will still be there and function the same way? I’d hate to invest a lot of time in stuff that’s going to be tossed. 

 

Specifically, I do not comprehend how mapAsync (or mapAsyncUnordered) helps to split out the input reading because it is NOT obvious to me where this "input reading" is being done!

As I mentioned, this refers to the reading from console (i.e. user input). Reading from the socket is behind the curtains, the chain of transformations will grab the next TCP bytes as soon as they are ready for processing again (for example a map finished mapping the previous element and could pass it to the next stage)

Got it. Thank you. 

 
If I used mapAsync to obtain the request data from my driver's clients, it seems very obtuse to be setting up numerous Futures as opposed to just allowing them to give me a Source[Request] from which their requests are read and processed.

I am not sure I got this. If you have actual small sized subproblems then we can help.

I think it is a reflection of my misunderstanding about the nature of a Flow. What I considered obtuse may be quite natural in Flowville :)  I’m going to have to learn more about graphs and integrating actors into a Flow before continuing to comment. If a “small sized” subproblem drops out, I’ll be sure to let you know.

Thanks much for your help, Endre. It is truly appreciated. 


-Endre
 

Any help you can provide to prevent me from drowning in these waters would be much appreciated!! 

Thanks,

Reid.

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.



Reid Spencer

Feb 13, 2015, 8:08:08 AM2/13/15
to akka...@googlegroups.com
On Feb 13, 2015, at 6:51 AM, Endre Varga <endre...@typesafe.com> wrote:

Hi Reid,

On Fri, Feb 13, 2015 at 11:59 AM, Reid Spencer <re...@reactific.com> wrote:
Hi Eric,

Thanks for your response. It is a little comforting to know that I’m not the only one on this planet also struggling with how to do something real with akka-stream. I’m sure this thread will eventually lead to the clarity we both seek. 

I probably need to add that one of the beginner issues is trying to do *everything* with streams. Currently this is doomed to fail, and even in the future it will not be possible in all cases. Plain old actors are always there to use, so use streams where they really shine already, right now: for example a chain of decoding/encoding steps, or simple transformations that can be expressed as an ask to an actor (you can use it together with mapAsync and have your actor magically backpressured!). Also, graphs are almost always needed for protocols.

Yes, this is exactly the case I wanted to use it for. I have higher level actors to deal with all manner of things but wanted to replace my Akka IO implementation of the socket encode/decode cycle with StreamTcp. I thought it would be easy but got myself a bit confused. Your last email went a long way to pointing me in the right direction. 

 

Yes, I’ve seen this before but it led to my grumbling about the lack of an internal design document. That Flow.apply method creates a GraphBackedFlow.

These will be gone in M4. A Flow is just a collection of processing steps (any graph layout) with exactly one input and output port that is not wired to anything.

Okay, just to be clear, GraphBackedFlow will be gone as will the Flow.apply method that creates them?

 
When I first saw that I was encouraged because I realized I would just need to understand GraphBackedFlow to understand what Flow.apply(Sink[I],Source[O]) was giving me. But, the documentation for that object only says “INTERNAL API”. I’m fine with that lack of documentation for things that I don’t use directly, but this GraphBackedFlow is something returned by the Flow companion and would be an instance of something I use in my program. It needs user level documentation. Either that or the Flow.apply method should be much more explicit about what is returned.  All it says is "Create a [[Flow]] from a seemingly disconnected [[Source]] and [[Sink]] pair.” It would be very much appreciated if there was at least a line or two of what this GraphBackedFlow represented. I get that it backs the flow with some sort of graph of objects but that really doesn’t give me much information about its operation. 

These things will go away with M4, see explanation below.
 

  1. This seems to allow you to "handle" a TCP connection with a Flow using independent Sink and Source. I am still in the process of trying to construct a FlowGraph using ActorSubscriber and ActorPublisher to integrate the TCP connection with application logic using actors, but finding it a challenge to do so in a simple way and avoiding circular references. So I'm still working on "grokking" it myself, but perhaps this discovery might help.

I will have to dive into the Working With Graphs page in more detail. I’ve been giving it less attention because of this sentence:

Graphs are needed whenever you want to perform any kind of fan-in ("multiple inputs") or fan-out ("multiple outputs") operations.

Well, I think that sentence really does not convey the importance of graphs. It is very likely that you will need graphs for more complex applications. 

I’m getting how important they are. Perhaps a documentation change that indicates their importance up front would help clue readers like me into the fact that I needed to pay more attention to that portion of the documentation. 


So, just to give you a taste of M4:

 - Internally, everything is just a "Module" which has input and output ports
 - An atomic Module is directly translated to an executable entity
 - A composite module consists of atomic modules and possibly other composite modules, with some output ports connected to some input ports
 - you can hierarchically nest these modules
 - A Flow is *really* nothing more than just a DSL shim over a module that has exactly one input and output port, but has arbitrarily complex nested graph layout inside
 - Same goes for Source and Sink
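
The "boxes with ports" picture in the list above can be modelled in a few lines of plain Scala. This is a hypothetical sketch for intuition only and makes no claim about how M4 represents modules internally: Module, Atomic, Composite and isFlowShaped are invented names, and a module is reduced to a count of ports that are still unwired.

```scala
object ModuleSketch {
  // Hypothetical model: a module only exposes how many ports are still unwired.
  sealed trait Module {
    def openInputs: Int
    def openOutputs: Int
  }

  // An atomic module is a single processing step.
  final case class Atomic(name: String, openInputs: Int, openOutputs: Int) extends Module

  // A composite nests other modules. Each internal wire connects one output
  // port to one input port, so it uses up one of each.
  final case class Composite(parts: List[Module], internalWires: Int) extends Module {
    def openInputs: Int = parts.map(_.openInputs).sum - internalWires
    def openOutputs: Int = parts.map(_.openOutputs).sum - internalWires
  }

  // "Flow-shaped" means exactly one open input and one open output.
  def isFlowShaped(m: Module): Boolean =
    m.openInputs == 1 && m.openOutputs == 1

  // Three steps wired in a line still look like one flow from the outside.
  val codec: Module = Composite(
    List(Atomic("decode", 1, 1), Atomic("logic", 1, 1), Atomic("encode", 1, 1)),
    internalWires = 2
  )

  // Wiring a TCP-like flow and the codec into a cycle leaves no open ports.
  val tcp: Module = Atomic("tcp", 1, 1)
  val joined: Module = Composite(List(tcp, codec), internalWires = 2)
}
```

Here codec is flow-shaped even though it contains three steps, while joined has no open ports left, which corresponds to a closed graph that is ready to run.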

Okay, can I just suggest that “Module” might be a poor choice for a name as it is already highly overloaded in a lot of other contexts. Node wouldn’t be much better but choosing good names is a significant aid to comprehension of these concepts. 


Now you can start imagining your application as a hierarchical setup of "boxes" (modules) that represent services and have input and output ports. You compose simpler services into more complex ones and wrap them in another box, until you have a nice nested hierarchy.

And that's it :)

Yes, that’s how I do think of my application and I’m looking forward to expressing it directly with akka-stream.

I believe my needs are two linear “roads” not a network of junctions that fan-in and fan-out; but then again, I could be wrong. 

This is hard to know. I don't know the exact hassles you need.

Your explanations have helped me realize that my thinking was too linear and based on keeping input and output separate. I’m going to try to drop that mental construct and look at the problem from the perspective of a graph that manages the request/response “loop” that StreamTcp provides. 

Akka Team: Please don’t take any of this commentary as derogatory; I’m quite enthusiastic about using akka-stream and StreamTcp. I understand you are scrambling to get this released in a couple of months and there is much to do and document. In the spirit of cooperation, I will suggest that if you can point Eric and me in the right direction (e.g. how to write an asynchronous TCP protocol), then I will volunteer to add sample code and documentation to Akka that explains this part (once I understand it, of course!) :) 

Well, just give a very dummy-sized example problem that you find enough to start with (please keep it minimal, we are short on resources) and I can attempt to give some pointers.

I’ll see what I can come up with in the next week or so. Hopefully my questions from here on out get more focused on specific issues.

Thanks again, Endre. 

Endre Varga

Feb 13, 2015, 8:12:55 AM2/13/15
to akka...@googlegroups.com
Hi Reid,
These will be gone in M4. A Flow is just a collection of processing steps (any graph layout) with exactly one input and output port that is not wired to anything.

Okay, just to be clear, GraphBackedFlow will be gone as will the Flow.apply method that creates them?

GraphBackedFlow will be gone, but the functionality will be there, so you will still be able to create a Flow from a graph. The difference is that internally we won't special case it anymore.
 


Okay, can I just suggest that “Module” might be a poor choice for a name as it is already highly overloaded in a lot of other contexts. Node wouldn’t be much better but choosing good names is a significant aid to comprehension of these concepts. 

Module is an internal name, so you won't see it as a user. What you will observe is a better unification of "boxes with ports" in every layer.


-Endre

Roland Kuhn

Feb 13, 2015, 12:06:46 PM2/13/15
to akka-user
Thanks, Reid, Eric and Endre, this conversation is immensely helpful! Reid, you offered to suggest concrete improvements in PR form and that would of course be totally awesome, thanks in advance!

Regards,

Roland


Dr. Roland Kuhn
Akka Tech Lead
Typesafe – Reactive apps on the JVM.
twitter: @rolandkuhn


Reid Spencer

Feb 13, 2015, 12:46:23 PM2/13/15
to akka...@googlegroups.com
Roland,

Yes, I have committed to adding some value around Akka Logging as well as providing some documentation and sample code around using StreamTcp (once I have it mastered!). Happy to help the Akka project as it provides so much for me. 

Best,

Reid.