    // As posted (akka-stream 1.0-M3 era). Assumes an implicit FlowMaterializer is in
    // scope and that `localhost` is an InetSocketAddress defined elsewhere.
    val connection: OutgoingConnection = StreamTcp().outgoingConnection(localhost)

    // Turns each console line into bytes for the server; "q" sends BYE and completes the stream.
    val replParser = new PushStage[String, ByteString] {
      override def onPush(elem: String, ctx: Context[ByteString]): Directive = {
        elem match {
          case "q" => ctx.pushAndFinish(ByteString("BYE\n"))
          case _   => ctx.push(ByteString(s"$elem\n"))
        }
      }
    }

    val repl = Flow[ByteString]
      .transform(() => RecipeParseLines.parseLines("\n", maximumLineBytes = 256)) // split server bytes into lines
      .map(text => println("Server: " + text))                                   // print each line from the server
      .map(_ => readLine("> "))                                                   // blocks the stream while the user types
      .transform(() => replParser)                                                // console line -> bytes to send

    connection.handleWith(repl)
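The REPL flow above is a linear chain of per-element steps. To make each step visible without Akka, here is a plain-Scala model (the names `ReplModel`, `frameLines`, `replStep`, and `run` are hypothetical; this is not the Akka API). It assumes the server's bytes arrive as UTF-8 text chunks and replaces the console with a scripted iterator:

```scala
object ReplModel {
  // Split buffered server text into complete "\n"-terminated lines plus the
  // unterminated remainder; a rough stand-in for parseLines("\n", maximumLineBytes = 256).
  def frameLines(buffer: String, maxLineBytes: Int = 256): (List[String], String) = {
    val parts = buffer.split("\n", -1).toList
    val complete = parts.init
    complete.foreach { line =>
      require(line.getBytes("UTF-8").length <= maxLineBytes, s"line longer than $maxLineBytes bytes")
    }
    (complete, parts.last)
  }

  // Mirrors replParser: "q" sends "BYE\n" and completes; anything else gets "\n" appended.
  def replStep(userInput: String): (String, Boolean) = userInput match {
    case "q"   => ("BYE\n", true)
    case other => (other + "\n", false)
  }

  // Drives the pipeline with scripted server chunks and scripted console input.
  // Returns the strings that would be written to the socket.
  def run(serverChunks: List[String], console: Iterator[String]): List[String] = {
    val sent = List.newBuilder[String]
    var pending = ""
    var done = false
    for (chunk <- serverChunks if !done) {
      val (lines, rest) = frameLines(pending + chunk)
      pending = rest
      for (line <- lines if !done) {
        println("Server: " + line)                     // like .map(text => println(...))
        val (bytes, finish) = replStep(console.next()) // like readLine + replParser
        sent += bytes
        done = finish
      }
    }
    sent.result()
  }
}
```

The leftover `"wor"` from a chunk such as `"hello\nwor"` is held until the next chunk completes the line, which is the job `parseLines` does in the real flow.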
On Feb 12, 2015, at 7:10 PM, Eric Zoerner <eric.z...@gmail.com> wrote:

Hi Reid,

I had difficulties similar to yours in understanding how to make the input independent from the output, and I agree that there needs to be better documentation with a wider variety of examples. One thing I stumbled upon in the API that seemed to be the missing "key" for what I was trying to do is the following apply method in Flow:

    Flow.apply(Sink[I], Source[O]): "Create a Flow from a seemingly disconnected Source and Sink pair."
This seems to allow you to "handle" a TCP connection with a Flow using independent Sink and Source. I am still in the process of trying to construct a FlowGraph using ActorSubscriber and ActorPublisher to integrate the TCP connection with application logic using actors, but finding it a challenge to do so in a simple way and avoiding circular references. So I'm still working on "grokking" it myself, but perhaps this discovery might help.
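Eric's find can be illustrated without Akka. `SinkSourceFlow` below is a hypothetical plain-Scala model, not the implementation behind `Flow.apply(Sink, Source)`: it has one input side and one output side, but the two halves are not connected, so outputs need not be derived from inputs. (Later Akka releases call this constructor `Flow.fromSinkAndSource`.)

```scala
import scala.collection.mutable

object SinkSourceFlowModel {
  // One input side and one output side, but no data path between them:
  // inputs go to `sink`, outputs come from `source`, independently.
  final class SinkSourceFlow[I, O](sink: I => Unit, source: Iterator[O]) {
    def push(in: I): Unit = sink(in)
    def pull(): Option[O] = if (source.hasNext) Some(source.next()) else None
  }

  // Hypothetical use: server responses are collected on one side while
  // client requests are emitted from an unrelated queue on the other.
  val responses = mutable.ListBuffer.empty[String]
  val requests  = Iterator("find users", "find orders")
  val handler   = new SinkSourceFlow[String, String](r => { responses += r; () }, requests)
}
```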
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to a topic in the Google Groups "Akka User List" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/akka-user/BQhnmseCyN0/unsubscribe.
To unsubscribe from this group and all its topics, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
hAkkers,

I've been unable to grok how to communicate with a TCP socket using akka-stream and the StreamTcp extension. At this point, I'm not sure the fault is entirely mine. :)
- The repl value is defined by invoking Flow[ByteString]. Now, I know the API well enough to know that Flow requires two type parameters: Flow[-In, +Out]. This is confusing because the Flow companion object's apply method takes only one type argument, which it expands by duplicating. So Flow[ByteString] actually instantiates a Flow[ByteString, ByteString]. I only note this because it took some digging around in the API before I understood how this worked, and while it is handy, it is also not straightforward.
- The documentation implies that a Flow is uni-directional. It says a Flow "connects its up- and downstreams by transforming the data elements flowing through it." That, to me, says "unidirectional". The use of a Flow[ByteString,ByteString] for the repl value indicates to me that a uni-directional "transformation" from ByteString to ByteString is occurring and yet this code implies that it is doing both reading and writing to the socket (i.e. it is bi-directional). How can that be?
- I see repl as a Flow that does this: takes a ByteString as input, chunks it into \n terminated lines up to 256 bytes, prints those lines out prefixed by "Server: " and then discards that input and replaces it with a line read from the console which is then output with a newline appended unless the input was "q" in which case it is replaced by "BYE\n" and a termination signal. Okay, that's all great and it is all unidirectional writing data to the socket. So, now the questions:
- Where is the reading from the server to get the original line(s) as input to the flow? I.e. where is the Source[ByteString]?
- Assuming that connection.handleWith(repl) does some magic to set up the Source, how does the conversation get started? Is it assumed the server will send some data upon connection? The echo server example seems to have the same issue!
- FYI: OutgoingConnection.handleWith's documentation says this:
"Handles the connection using the given flow. This method can be called several times, every call will materialize the given flow exactly once thereby triggering a new connection attempt to the remoteAddress. If the connection cannot be established the materialized stream will immediately be terminated with an akka.stream.StreamTcpException."
- It would be nice if there was a little more description around "Handles the connection". Handles it how? Does it set up a Source and Sink? Are asynchronous bi-directional reading from the socket and writing to the socket implied?
- Am I to infer from all this that the StreamTcp.outgoingConnection creates a Source[ByteString] from the socket's input and a Sink[ByteString] for the socket's output and that the flow provided to handleWith is run between these two? In other words, the OutgoingConnection can really only transform its input to its output? If so, then:
- How is that generally applicable?
- This approach works fine for an echo client, but clearly there are protocols where the input and output can and should be processed independently, aren't there?
- How would one do what Mongo needs and have an asynchronous flow of requests that is independent of an asynchronous flow of responses?
- I noticed the Add Bi-directional Flow issue that is slated for inclusion in 1.0-M4. Is this intended for solving this issue where two related flows are paired to do bi-directional input/output?
- Am I just trying to implement my mongo driver before the required features are ready?
The documentation says, about this code:

A resilient REPL client would be more sophisticated than this, for example it should split out the input reading into a separate mapAsync step and have a way to let the server write more data than one ByteString chunk at any given time, these improvements however are left as exercise for the reader.

I would like a "resilient client" and I think leaving this part as an "exercise for the reader" is asking a bit much from the audience. We need an example of how to do this as it is likely the typical case not the exception (nobody needs another echo server/client). I suspect that the answer to my confusion lies in the information intended but not stated by this sentence from the documentation.
Specifically, I do not comprehend how mapAsync (or mapAsyncUnordered) help to split out the input reading because it is NOT obvious to me where this "input reading" is being done!
If I used mapAsync to obtain the request data from my driver's clients, it seems very obtuse to be setting up numerous Futures as opposed to just allowing them to give me a Source[Request] from which their requests are read and processed.
Any help you can provide to prevent me from drowning in these waters would be much appreciated!!

Thanks,
Reid.
Hi Eric,

Thanks for your response. It is a little comforting to know that I’m not the only one on this planet also struggling with how to do something real with akka-stream. I’m sure this thread will eventually lead to the clarity we both seek.
Yes, I’ve seen this before but it led to my grumbling about the lack of an internal design document. That Flow.apply method creates a GraphBackedFlow.
When I first saw that I was encouraged because I realized I would just need to understand GraphBackedFlow to understand what Flow.apply(Sink[I],Source[O]) was giving me. But, the documentation for that object only says “INTERNAL API”. I’m fine with that lack of documentation for things that I don’t use directly, but this GraphBackedFlow is something returned by the Flow companion and would be an instance of something I use in my program. It needs user level documentation. Either that or the Flow.apply method should be much more explicit about what is returned. All it says is "Create a [[Flow]] from a seemingly disconnected [[Source]] and [[Sink]] pair.” It would be very much appreciated if there was at least a line or two of what this GraphBackedFlow represented. I get that it backs the flow with some sort of graph of objects but that really doesn’t give me much information about its operation.
This seems to allow you to "handle" a TCP connection with a Flow using independent Sink and Source. I am still in the process of trying to construct a FlowGraph using ActorSubscriber and ActorPublisher to integrate the TCP connection with application logic using actors, but finding it a challenge to do so in a simple way and avoiding circular references. So I'm still working on "grokking" it myself, but perhaps this discovery might help.

I will have to dive into the Working With Graphs page in more detail. I’ve been giving it less attention because of this sentence:

Graphs are needed whenever you want to perform any kind of fan-in ("multiple inputs") or fan-out ("multiple outputs") operations.
I believe my needs are two linear “roads” not a network of junctions that fan-in and fan-out; but then again, I could be wrong.
Akka Team: Please don’t take any of this commentary as derogatory; I’m quite enthusiastic about using akka-stream and StreamTcp. I understand you are scrambling to get this released in a couple of months and there is much to do and document. In the spirit of cooperation, I will suggest that if you can point Eric and me in the right direction (e.g. how to write an asynchronous TCP protocol), then I will volunteer to add sample code and documentation to Akka that explains this part (once I understand it, of course!) :)
On Feb 13, 2015, at 6:40 AM, Endre Varga <endre...@typesafe.com> wrote:

Hi Reid,

On Thu, Feb 12, 2015 at 5:56 PM, Reid Spencer <re...@reactific.com> wrote:

hAkkers,

I've been unable to grok how to communicate with a TCP socket using akka-stream and StreamTcp extension. At this point, I'm not sure the fault is entirely mine. :)

I can understand that many of the concepts might be alien at first. In our defense, it is always hard to see what might be unclear for newcomers, since we build the stuff and know it inside-out :) Your feedback is very valuable in this regard.
- The repl value is defined by invoking Flow[ByteString]. Now, I know the API well enough to know that Flow requires two type parameters: Flow[-In, +Out]. This is confusing because the Flow companion object's apply method takes only one type argument, which it expands by duplicating. So Flow[ByteString] actually instantiates a Flow[ByteString, ByteString]. I only note this because it took some digging around in the API before I understood how this worked, and while it is handy, it is also not straightforward.
This might need to be explained in the docs, yes. But the logic is simple: when you create a Flow by Flow[T], what you get is an empty Flow that does nothing to the input; it passes it downstream unmodified (since the transformation pipeline is empty). Obviously this means that you cannot say Flow[Int, String] for an empty flow, since an Int will never become a String if there is no processing step there. This setup is needed because every step on a Flow gives you back a new Flow. So:

    val doNothing: Flow[Int, Int] = Flow[Int]
    val gimmeString: Flow[Int, String] = doNothing.map(_.toString)

The doNothing flow must have its two type parameters equal, because that Flow just does not transform anything. On the other hand, gimmeString has a different type signature since a map stage is appended to the empty Flow.
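The typing argument can be checked with a tiny, dependency-free model. `MiniFlow` is hypothetical, not Akka's `Flow`; it only mimics the two facts described above: `MiniFlow[T]` starts as an identity with equal type parameters, and each appended step returns a new flow whose output type may differ.

```scala
object MiniFlowModel {
  // Contravariant in the input and covariant in the output, like Flow[-In, +Out].
  final class MiniFlow[-In, +Out](private val run: In => Out) {
    // Appending a step yields a *new* flow; the receiver is left unchanged.
    def map[T](f: Out => T): MiniFlow[In, T] = new MiniFlow(run andThen f)
    def apply(in: In): Out = run(in)
  }

  object MiniFlow {
    // One type argument: an empty pipeline can only pass a T through as a T.
    def apply[T]: MiniFlow[T, T] = new MiniFlow[T, T](identity)
  }

  val doNothing: MiniFlow[Int, Int]      = MiniFlow[Int]
  val gimmeString: MiniFlow[Int, String] = doNothing.map(_.toString)
}
```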
Flow is simply a data structure for representing a set of transformations (usually in a linear fashion, but not necessarily).
- The documentation implies that a Flow is uni-directional. It says a Flow "connects its up- and downstreams by transforming the data elements flowing through it." That, to me, says "unidirectional". The use of a Flow[ByteString,ByteString] for the repl value indicates to me that a uni-directional "transformation" from ByteString to ByteString is occurring and yet this code implies that it is doing both reading and writing to the socket (i.e. it is bi-directional). How can that be?
Well, the doc is a bit imprecise. Basically, a Flow is a description of a transformation that has exactly one input port and one output port. In *most* cases this is simply a unidirectional flow of elements coming from the input and passed down through the output, but there are exceptions to this rule. The docs simplify this and just refer to it as a unidirectional element.

Now, in the TCP example it *is*, however, unidirectional, since what it looks like is:

      +--->(inbytes)---->+
      |                  |
    [TCP]          [HandlerFlow]
      |                  |
      +<---(outbytes)<---+

As you see, the "HandlerFlow" is really just taking input bytes and transforming them to output bytes.
- I see repl as a Flow that does this: takes a ByteString as input, chunks it into \n terminated lines up to 256 bytes, prints those lines out prefixed by "Server: " and then discards that input and replaces it with a line read from the console which is then output with a newline appended unless the input was "q" in which case it is replaced by "BYE\n" and a termination signal. Okay, that's all great and it is all unidirectional writing data to the socket. So, now the questions:
- Where is the reading from the server to get the original line(s) as input to the flow? I.e. where is the Source[ByteString]?
The source of confusion is the "handleWith" convenience method, which we added precisely to make things simpler. What actually happens is:

    def handleWith(handler: Flow[ByteString, ByteString])(implicit fm: FlowMaterializer) =
      flow.join(handler).run()
Now you may rightly ask what that flow is. Well, an OutgoingConnection is nothing but a wrapper around a Flow[ByteString, ByteString], which you can get via OutgoingConnection::flow. That's right, a TCP connection is not a Sink and a Source, but a Flow! (Remember, a Flow is something that has exactly one input port and one output port.) Why is it a Flow and not a Sink+Source? Simply because it is much more convenient in client setups:

    sourceOfRequests -> EncodePackets -> tcpFlowRepresentingRemoteService -> DecodePackets -> sinkOfResponses

That is, a TCP connection becomes a transformation step.
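The client pipeline, with the connection as one transformation step in the middle, can be sketched with ordinary function composition. Everything here is hypothetical and synchronous: the newline framing, and an in-process stand-in for the TCP flow that upper-cases what it receives (a real connection streams bytes asynchronously to a server).

```scala
import java.nio.charset.StandardCharsets.UTF_8

object TcpAsStepModel {
  // Hypothetical framing: each request becomes UTF-8 bytes terminated by '\n'.
  val encodePackets: String => Array[Byte] = req => (req + "\n").getBytes(UTF_8)

  // In-process stand-in for the TCP flow (bytes in, bytes out). This fake
  // "remote service" upper-cases what it receives; a real one is a server.
  val tcpFlowRepresentingRemoteService: Array[Byte] => Array[Byte] =
    bytes => new String(bytes, UTF_8).toUpperCase.getBytes(UTF_8)

  // Decode the response bytes and strip the '\n' terminator.
  val decodePackets: Array[Byte] => String =
    bytes => new String(bytes, UTF_8).stripSuffix("\n")

  // sourceOfRequests -> EncodePackets -> tcpFlow -> DecodePackets -> sinkOfResponses
  val clientPipeline: String => String =
    encodePackets andThen tcpFlowRepresentingRemoteService andThen decodePackets

  def runClient(sourceOfRequests: List[String]): List[String] =
    sourceOfRequests.map(clientPipeline)
}
```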
Now the only remaining mystery is flow.join(handler). This simply wires together two flows by making a cycle: connecting flow1's output to flow2's input, and connecting flow2's output to flow1's input.
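The cycle that `join` builds can be simulated with a loop in which each side's output becomes the other side's next input. This is a synchronous toy model under stated assumptions: each side is a hypothetical `String => Option[String]` step (`None` meaning "this side completes"), one side has to emit first (here a server greeting), and `maxRounds` caps the loop. Real streams are asynchronous and backpressured.

```scala
object JoinModel {
  // One side of the cycle: given an element, emit the next one, or None to complete.
  type Step = String => Option[String]

  // join: a's output feeds b's input and b's output feeds back into a's input.
  // `firstFromA` is the element a emits before receiving anything.
  def join(a: Step, b: Step, firstFromA: String, maxRounds: Int = 100): List[String] = {
    val transcript = List.newBuilder[String]
    var next: Option[String] = Some(firstFromA)
    var deliverToB = true
    var rounds = 0
    while (next.isDefined && rounds < maxRounds) {
      val msg = next.get
      transcript += msg
      next = if (deliverToB) b(msg) else a(msg)
      deliverToB = !deliverToB
      rounds += 1
    }
    transcript.result()
  }

  // Hypothetical server: acknowledges each line and completes after "BYE".
  val server: Step = msg => if (msg == "BYE") None else Some(s"ack($msg)")

  // Hypothetical client handler: answers each server line from a script, then says BYE.
  def client(script: Iterator[String]): Step =
    _ => Some(if (script.hasNext) script.next() else "BYE")
}
```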
- Assuming that connection.handleWith(repl) does some magic to set up the Source, how does the conversation get started? Is it assumed the server will send some data upon connection? The echo server example seems to have the same issue!
The echo server does not need to start the conversation, because it just echoes back whatever the client has written. That is, the echo "service flow" that you attach to the TCP flow of the incoming connection just passes the incoming bytes through, unmodified, as outgoing bytes. If you need the server side to start the conversation, you can use the concatenation operators to send an initial sequence of elements before sending anything else.
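For a server that speaks first, the concatenation idea is to emit a fixed prefix ahead of the transformed stream. Below is a plain-iterator sketch of that idea (the greeting text and the `greetingThenEcho` name are hypothetical; later Akka releases offer `concat` and `prepend` operators for this on `Source` and `Flow`). The greeting is available before any client input has been consumed.

```scala
object GreetingModel {
  // Echo handler that speaks first: the greeting is emitted before any
  // incoming element is pulled, then every incoming line is echoed.
  def greetingThenEcho(incoming: Iterator[String], greeting: String = "220 ready"): Iterator[String] =
    Iterator.single(greeting) ++ incoming.map(line => s"echo: $line")
}
```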
- FYI: OutgoingConnection.handleWith's documentation says this:
"Handles the connection using the given flow. This method can be called several times, every call will materialize the given flow exactly once thereby triggering a new connection attempt to the remoteAddress. If the connection cannot be established the materialized stream will immediately be terminated with an akka.stream.StreamTcpException."
- It would be nice if there was a little more description around "Handles the connection". Handles it how? Does it set up a Source and Sink? Are asynchronous bi-directional reading from the socket and writing to the socket implied?
The definition is the code snippet I pasted above: it wires the handler Flow's input port to the incoming-bytes port of the TCP flow, and the handler Flow's output port to the outgoing-bytes port of the TCP flow.
- Am I to infer from all this that the StreamTcp.outgoingConnection creates a Source[ByteString] from the socket's input and a Sink[ByteString] for the socket's output and that the flow provided to handleWith is run between these two? In other words, the OutgoingConnection can really only transform its input to its output? If so, then:
Yes, on a conceptual level this is true.
- How is that generally applicable?
Every Server/Client is just a transformation between inputs and outputs which is expressed as a protocol.
The missing piece you are probably after is graphs. A Flow (which, as I said, is something with exactly one input and one output port) can actually host a graph with complex routing in it, calls to other parts of the application via mapAsync, or sending off messages to actors.
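As a toy illustration of a one-input, one-output unit hiding internal routing, the plain-Scala sketch below (not Akka's graph DSL; `routedFlow` and the branch functions are hypothetical) broadcasts each element into two branches and zips the results back together. From the outside it is still a single function from input stream to output stream.

```scala
object GraphInFlowModel {
  // Outside view: exactly one input and one output.
  def routedFlow[A, B, C](left: A => B, right: A => C): Iterator[A] => Iterator[(B, C)] =
    in => {
      // Fan-out (broadcast): duplicate the stream into two independent branches.
      val (l, r) = in.duplicate
      // Fan-in (zip): recombine the branches pairwise.
      l.map(left).zip(r.map(right))
    }

  val lengthAndUpper: Iterator[String] => Iterator[(Int, String)] =
    routedFlow[String, Int, String](_.length, _.toUpperCase)
}
```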
- This approach works fine for an echo client, but clearly there are protocols where the input and output can and should be processed independently, aren't there?
Yes, even HTTP is such a protocol, even though it looks like a simple request/reply protocol (it is more horrible than that).
- How would one do what Mongo needs and have an asynchronous flow of requests that is independent of an asynchronous flow of responses?
This is too generic to answer. If you can decompose your problem we can help with some of the subproblems.
- I noticed the Add Bi-directional Flow issue that is slated for inclusion in 1.0-M4. Is this intended for solving this issue where two related flows are paired to do bi-directional input/output?
Yes and no. It will basically introduce a new DSL element that has exactly *two* input and *two* output ports. This is rather handy when working with networking. SSL will be such a bidi-stage.
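A stage with two inputs and two outputs can be pictured as a pair of opposite-direction transformations. The sketch below is a hypothetical plain-Scala model, not Akka's implementation (later Akka releases provide `BidiFlow`, including an `atop` combinator for stacking): `Bidi` bundles an outbound and an inbound function, and `atop` stacks protocol layers the way an SSL layer would sit beneath a framing layer.

```scala
object BidiModel {
  // Two inputs, two outputs: outbound (I1 => O1) and inbound (I2 => O2).
  final case class Bidi[I1, O1, I2, O2](outbound: I1 => O1, inbound: I2 => O2) {
    // Stack this layer on top of `that`: outbound data passes this layer first,
    // inbound data passes `that` first on its way back up.
    def atop[O1b, I2b](that: Bidi[O1, O1b, I2b, I2]): Bidi[I1, O1b, I2b, O2] =
      Bidi(outbound andThen that.outbound, that.inbound andThen inbound)
  }

  // Hypothetical framing layer: append/strip a "\n" terminator.
  val framing: Bidi[String, String, String, String] =
    Bidi((msg: String) => msg + "\n", (wire: String) => wire.stripSuffix("\n"))

  // Hypothetical toy "cipher" (a character shift; NOT real encryption or SSL).
  val toyCipher: Bidi[String, String, String, String] =
    Bidi((s: String) => s.map(c => (c + 1).toChar), (s: String) => s.map(c => (c - 1).toChar))

  // Application-facing layer on top, wire-facing layer at the bottom.
  val stack: Bidi[String, String, String, String] = framing atop toyCipher
}
```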
- Am I just trying to implement my mongo driver before the required features are ready?
I recommend waiting for M4; we have a rewrite in progress. It will, however, make many of the concepts simpler.
The documentation says, about this code:

A resilient REPL client would be more sophisticated than this, for example it should split out the input reading into a separate mapAsync step and have a way to let the server write more data than one ByteString chunk at any given time, these improvements however are left as exercise for the reader.

I would like a "resilient client" and I think leaving this part as an "exercise for the reader" is asking a bit much from the audience. We need an example of how to do this as it is likely the typical case not the exception (nobody needs another echo server/client). I suspect that the answer to my confusion lies in the information intended but not stated by this sentence from the documentation.

The input reading here refers to reading from the console, i.e. it expresses that simply using a map there is not optimal; we just didn't want to complicate the example.
For example you can have an actor that actually does the console reading, and then use mapAsync together with the ask pattern to communicate with that actor from within the stream. There will be other options to factor out code and error handling. Please be patient about this :)
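The actor-plus-`mapAsync` suggestion can be approximated with standard-library `Future`s. In this hypothetical sketch, `askConsole` stands in for asking a console-reading actor, and `mapAsyncModel` only mimics two properties of `mapAsync`: a bound on concurrent requests (`parallelism`) and results emitted in input order. It is a simplification, not Akka's implementation.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object MapAsyncModel {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical stand-in for asking a console-reading actor: answers asynchronously.
  def askConsole(serverLine: String): Future[String] = Future(s"reply to $serverLine")

  // Runs at most `parallelism` futures at a time, in batches, and keeps input order.
  def mapAsyncModel[A, B](in: List[A], parallelism: Int)(f: A => Future[B]): List[B] =
    in.grouped(parallelism).flatMap { batch =>
      Await.result(Future.sequence(batch.map(f)), 5.seconds)
    }.toList
}
```

Batching keeps the sketch short; a real `mapAsync` starts a new request as soon as a slot frees up instead of waiting for a whole batch, and it never blocks a thread the way `Await` does here.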
Specifically, I do not comprehend how mapAsync (or mapAsyncUnordered) help to split out the input reading because it is NOT obvious to me where this "input reading" is being done!

As I mentioned, this refers to reading from the console (i.e. user input). Reading from the socket happens behind the curtains: the chain of transformations will grab the next TCP bytes as soon as it is ready for processing again (for example, when a map stage has finished mapping the previous element and could pass it to the next stage).
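The point that socket reads happen only when the downstream chain is ready is the essence of demand-driven processing. Here is a hypothetical iterator-based sketch: the `socketReads` counter stands in for reads from the socket, and building the pipeline reads nothing until the consumer pulls. (Reactive Streams signal demand with explicit requests; an `Iterator` is only a rough synchronous analogy.)

```scala
object PullModel {
  var socketReads = 0

  // Stand-in for the TCP source: each element produced counts as one socket read.
  def socketChunks(): Iterator[String] =
    Iterator.from(1).map { i => socketReads += 1; s"chunk-$i" }

  // A chain of transformations; constructing it performs no reads.
  def pipeline(): Iterator[String] = socketChunks().map(_.toUpperCase)
}
```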
If I used mapAsync to obtain the request data from my driver's clients, it seems very obtuse to be setting up numerous Futures as opposed to just allowing them to give me a Source[Request] from which their requests are read and processed.

I am not sure I got this. If you have actual small-sized subproblems, then we can help.
-Endre
On Feb 13, 2015, at 6:51 AM, Endre Varga <endre...@typesafe.com> wrote:

Hi Reid,

On Fri, Feb 13, 2015 at 11:59 AM, Reid Spencer <re...@reactific.com> wrote:

Hi Eric,

Thanks for your response. It is a little comforting to know that I’m not the only one on this planet also struggling with how to do something real with akka-stream. I’m sure this thread will eventually lead to the clarity we both seek.

I probably need to add that one of the beginner issues is trying to do *everything* with streams. Currently this is doomed to fail, and even in the future it will not be possible in all cases. Plain old actors are always there to use, so use streams where they already really shine right now: for example, a chain of decoding/encoding steps, or simple transformations that can be expressed as an ask to an actor (you can use that together with mapAsync and have your actor magically backpressured!). Also, graphs are almost always needed for protocols.
Yes, I’ve seen this before but it led to my grumbling about the lack of an internal design document. That Flow.apply method creates a GraphBackedFlow.

These will be gone in M4. A Flow is just a collection of processing steps (any graph layout) with exactly one input and output port that is not wired to anything.
When I first saw that I was encouraged because I realized I would just need to understand GraphBackedFlow to understand what Flow.apply(Sink[I],Source[O]) was giving me. But, the documentation for that object only says “INTERNAL API”. I’m fine with that lack of documentation for things that I don’t use directly, but this GraphBackedFlow is something returned by the Flow companion and would be an instance of something I use in my program. It needs user level documentation. Either that or the Flow.apply method should be much more explicit about what is returned. All it says is "Create a [[Flow]] from a seemingly disconnected [[Source]] and [[Sink]] pair.” It would be very much appreciated if there was at least a line or two of what this GraphBackedFlow represented. I get that it backs the flow with some sort of graph of objects but that really doesn’t give me much information about its operation.

These things will go away with M4; see the explanation below.
This seems to allow you to "handle" a TCP connection with a Flow using independent Sink and Source. I am still in the process of trying to construct a FlowGraph using ActorSubscriber and ActorPublisher to integrate the TCP connection with application logic using actors, but finding it a challenge to do so in a simple way and avoiding circular references. So I'm still working on "grokking" it myself, but perhaps this discovery might help.

I will have to dive into the Working With Graphs page in more detail. I’ve been giving it less attention because of this sentence:

Graphs are needed whenever you want to perform any kind of fan-in ("multiple inputs") or fan-out ("multiple outputs") operations.

Well, I think that sentence really does not convey the importance of graphs. It is very likely that you will need graphs for more complex applications.
So, just to give you a taste of M4:

- Internally, everything is just a "Module", which has input and output ports
- An atomic Module is directly translated to an executable entity
- A composite module consists of atomic modules and possibly other composite modules, with some output ports connected to some input ports
- You can hierarchically nest these modules
- A Flow is *really* nothing more than a DSL shim over a module that has exactly one input and one output port, but an arbitrarily complex nested graph layout inside
- The same goes for Source and Sink
Now you can start imagining your application as a hierarchical setup of "boxes" (modules) that represent services and have input and output ports. You compose simpler services into more complex ones and wrap them in another box, until you have a nice nested hierarchy.

And that's it :)
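The module idea described above (atomic modules, composites that wire some output ports to input ports, and a Flow as a shim over a module with exactly one free input and one free output) can be modeled as a small data structure. Everything here (`Port`, `Atomic`, `Composite`, `isFlowShaped`) is a hypothetical illustration, not Akka's internal API.

```scala
object ModuleModel {
  final case class Port(name: String)

  sealed trait Module {
    def inPorts: Set[Port]
    def outPorts: Set[Port]
  }

  // An atomic module: maps directly to one executable entity.
  final case class Atomic(inPorts: Set[Port], outPorts: Set[Port]) extends Module

  // A composite: submodules plus wires (output port -> input port).
  // Only the ports left unwired are visible from outside the box.
  final case class Composite(children: List[Module], wires: Map[Port, Port]) extends Module {
    def inPorts: Set[Port]  = children.flatMap(_.inPorts).toSet -- wires.values
    def outPorts: Set[Port] = children.flatMap(_.outPorts).toSet -- wires.keys
  }

  // Flow-shaped: exactly one free input and one free output, whatever is inside.
  def isFlowShaped(m: Module): Boolean = m.inPorts.size == 1 && m.outPorts.size == 1

  val decode = Atomic(Set(Port("decode.in")), Set(Port("decode.out")))
  val logic  = Atomic(Set(Port("logic.in")), Set(Port("logic.out")))
  val encode = Atomic(Set(Port("encode.in")), Set(Port("encode.out")))

  // decode -> logic, then nest that composite inside another box that adds encode.
  val inner   = Composite(List(decode, logic), Map(Port("decode.out") -> Port("logic.in")))
  val service = Composite(List(inner, encode), Map(Port("logic.out") -> Port("encode.in")))
}
```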
I believe my needs are two linear “roads” not a network of junctions that fan-in and fan-out; but then again, I could be wrong.

This is hard to know. I don't know the exact problems you need to solve.
Akka Team: Please don’t take any of this commentary as derogatory; I’m quite enthusiastic about using akka-stream and StreamTcp. I understand you are scrambling to get this released in a couple of months and there is much to do and document. In the spirit of cooperation, I will suggest that if you can point Eric and me in the right direction (e.g. how to write an asynchronous TCP protocol), then I will volunteer to add sample code and documentation to Akka that explains this part (once I understand it, of course!) :)

Well, just give a very dummy-sized example problem that you consider enough to start with (please keep it minimal, we are short on resources) and I can attempt to give some pointers.
A Flow is just a collection of processing steps (any graph layout) with exactly one input and output port that is not wired to anything.

Okay, just to be clear, GraphBackedFlow will be gone, as will the Flow.apply method that creates them?
Okay, can I just suggest that “Module” might be a poor choice for a name, as it is already highly overloaded in a lot of other contexts. Node wouldn’t be much better, but choosing good names is a significant aid to comprehension of these concepts.