val bytesStage = // elided... BidiFlow of serialization and framing
val serverValuePromise = Promise[Seq[AnyRef]]()
// Technically, the materialized value isn't important, since it's actually going to be pulled out
// via the Promise
val serverConsumerFlow: Flow[AnyRef, AnyRef, Future[Seq[AnyRef]]] = Flow.wrap(
// Consume the client's stream and complete the serverValuePromise with its folded result
Sink.fold(Vector.empty[AnyRef])((acc, v: AnyRef) => acc :+ v).mapMaterializedValue(v => { serverValuePromise.completeWith(v); v }),
// We're not sending anything from this side
Source.empty)(Keep.left)
// The server
val serverSide: Future[ServerBinding] = StreamTcp().bindAndHandle(serverConsumerFlow.join(bytesStage), "0.0.0.0", 0, halfClose = true)
// We really want to stop listening once the client has successfully connected, but this is good
// enough
serverValuePromise.future.onComplete {
case _ =>
serverSide.onSuccess {
case binding => binding.unbind()
}
}
// I need the endpoint where the client needs to connect
val destination = Await.result(serverSide, 1.second).localAddress
// Get the source running
Source((1 to 10).map(new Integer(_))).via(bytesStage.joinMat(StreamTcp().outgoingConnection(destination))(Keep.right)).to(Sink.ignore).run()
// Print out what the client has sent to the server
Await.result(serverValuePromise.future, 1.second).foreach(t => println(s"tt: $t"))
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
And now I added another version where the server just streams random
numbers until the client disconnects, then it closes the connection.
It needed a custom stage though to make emitting from an Iterable
interruptible (mapConcat does not interrupt on completion, only on
errors).
On Wed, Jul 29, 2015 at 1:59 PM, Endre Varga <endre...@typesafe.com
<mailto:endre...@typesafe.com>> wrote:
I now updated the gist with the reverse direction: Now a client
sends a String command and expects an Iterable[Int] back as a
response. I currently limited the funcionality to one request per
connection, since otherwise I would need a bit more elaborate
codec which would complicate the example (I would need to add a
delimiter between the iterables on the wire. Not too hard to add
it though). It still shows how these things are supposed to work.
-Endre
On Wed, Jul 29, 2015 at 1:14 PM, Akka Team
<de...@derekwyatt.org <mailto:de...@derekwyatt.org>> wrote:
Hi,
I'm still trying to figure out the best way to work with
TCP flows and, while I've got something working, this
seems really quite wrong, so there's gotta be a better way.
What I want to do is send an Iterable[Int] from the client
to the server and have the server materialize that
resulting flow in a Future[Iterable[Int]].
||
val bytesStage =// elided... BidiFlow of serialization and
framing
val serverValuePromise =Promise[Seq[AnyRef]]()
// Technically, the materialized value isn't important,
since it's actually going to be pulled out
// via the Promise
val
serverConsumerFlow:Flow[AnyRef,AnyRef,Future[Seq[AnyRef]]]=Flow.wrap(
// Consume the client's stream and complete the
serverValuePromise with its folded result
Sink.fold(Vector.empty[AnyRef])((acc,v:AnyRef)=>acc
:+v).mapMaterializedValue(v
=>{serverValuePromise.completeWith(v);v }),
// We're not sending anything from this side
Source.empty)(Keep.left)
// The server
val
serverSide:Future[ServerBinding]=StreamTcp().bindAndHandle(serverConsumerFlow.join(bytesStage),"0.0.0.0",0,halfClose
=true)
// We really want to stop listening once the client has
successfully connected, but this is good
// enough
serverValuePromise.future.onComplete {
case_ =>
serverSide.onSuccess {
casebinding =>binding.unbind()
<mailto:akka-user+...@googlegroups.com>.
To post to this group, send email to
akka...@googlegroups.com
<mailto:akka...@googlegroups.com>.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
--
Akka Team
Typesafe - Reactive apps on the JVM
Blog: letitcrash.com <http://letitcrash.com>
Twitter: @akkateam
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives:
https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the
Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from
it, send an email to akka-user+...@googlegroups.com
<mailto:akka-user+...@googlegroups.com>.
To post to this group, send email to
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives:
https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google
Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it,
send an email to akka-user+...@googlegroups.com
<mailto:akka-user+...@googlegroups.com>.
To post to this group, send email to akka...@googlegroups.com
<mailto:akka...@googlegroups.com>.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
--
Akka Team
Typesafe - Reactive apps on the JVM
Blog: letitcrash.com <http://letitcrash.com>
Twitter: @akkateam
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to a topic in the
Google Groups "Akka User List" group.
To unsubscribe from this topic, visit
https://groups.google.com/d/topic/akka-user/tng5CiUtfig/unsubscribe.
To unsubscribe from this group and all its topics, send an email to
akka-user+...@googlegroups.com
<mailto:akka-user+...@googlegroups.com>.
To post to this group, send email to akka...@googlegroups.com
<mailto:akka...@googlegroups.com>.
These examples, and the new documentation page, are extremely helpful. Thank you very much. I’m continuing to read / digest / modify them to get a better understanding, but they’ve already cleared up a lot for me.
One thing that popped out was
the use of Flow.transform()
. The difference between map
and transform
is now clear to me, but I
then started to question the difference between a Stage
and a Flow
, and they now confuse me a bit…
(if this is covered somewhere, please forgive me). Why is a Stage
not a derived type of Flow
? Or, perhaps another way to say that is,
Why do Stage
s exist at all? It feels like there are Flow
s, BidiFlow
s and Graph
s. Stage
feels like it’s just another
manifestation of a Flow
, perhaps an earlier part of the design
that may not need to be there?
P.S. It looks like the
stream-composition doc is missing something in the attributes
section. The diagram shows that nestedSource
is connected to nestedSink
but the code doesn’t do such a
thing. Did you mean to add a nestedSource.to(nestedSink)
?
July 29, 2015 at 9:28 AM
And now I added another version where the server just streams random numbers until the client disconnects, then it closes the connection. It needed a custom stage though to make emitting from an Iterable interruptible (mapConcat does not interrupt on completion, only on errors).
--
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to a topic in the Google Groups "Akka User List" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/akka-user/tng5CiUtfig/unsubscribe.
To unsubscribe from this group and all its topics, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
July 29, 2015 at 7:59 AM
I now updated the gist with the reverse direction: Now a client sends a String command and expects an Iterable[Int] back as a response. I currently limited the funcionality to one request per connection, since otherwise I would need a bit more elaborate codec which would complicate the example (I would need to add a delimiter between the iterables on the wire. Not too hard to add it though). It still shows how these things are supposed to work.-Endre
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to a topic in the Google Groups "Akka User List" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/akka-user/tng5CiUtfig/unsubscribe.
To unsubscribe from this group and all its topics, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
July 29, 2015 at 7:14 AM
-Endre- the implementation caps the size of the Iterable but currently just silently ignores overflows (I was lazy to build a stage or use fold for this sample, so I used grouped())- draining the client data to an Iterable might be suboptimal if the Iterables are large, in this case a Source[Int] would be a better abstractionSome notes:- includes a simple codec pair for encoding the Ints. It is kind of stupid for this use case, but it works.- exposes the server API as a Source[(InetSocketAddress, Iterable[Int]), Future[ServerBinding]]. It will provide you with a continuous stream of client address, client data iterable pairs.- exposes the client API as a Source[Int, Unit]. Anytime you materialize that source and send it data, it will open a TCP connection and dump the integers to the server, then closes the connectionThe features in that app:I created a sample app that does what you want, you can find the gist here: https://gist.github.com/drewhk/25bf7472db04b5699b80Hi Derek,It is not that hard, but you need to develop a certain kind of intuition to attack these problems. I very much recommend the new documentation page http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-composition.html as it helps to visualize the ideas.
--
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to a topic in the Google Groups "Akka User List" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/akka-user/tng5CiUtfig/unsubscribe.
To unsubscribe from this group and all its topics, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
July 26, 2015 at 3:12 PM
You received this message because you are subscribed to a topic in the Google Groups "Akka User List" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/akka-user/tng5CiUtfig/unsubscribe.
To unsubscribe from this group and all its topics, send an email to akka-user+...@googlegroups.com.
These examples, and the new documentation page, are extremely helpful. Thank you very much. I’m continuing to read / digest / modify them to get a better understanding, but they’ve already cleared up a lot for me.
One thing that popped out was the use of
Flow.transform()
. The difference betweenmap
andtransform
is now clear to me, but I then started to question the difference between aStage
and aFlow
, and they now confuse me a bit… (if this is covered somewhere, please forgive me). Why is aStage
not a derived type ofFlow
? Or, perhaps another way to say that is, Why doStage
s exist at all? It feels like there areFlow
s,BidiFlow
s andGraph
s.
Stage
feels like it’s just another manifestation of aFlow
, perhaps an earlier part of the design that may not need to be there?
P.S. It looks like the stream-composition doc is missing something in the
attributes
section. The diagram shows thatnestedSource
is connected tonestedSink
but the code doesn’t do such a thing. Did you mean to add anestedSource.to(nestedSink)
?
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
Hi Derek,On Thu, Jul 30, 2015 at 6:48 PM, Derek Wyatt <de...@derekwyatt.org> wrote:These examples, and the new documentation page, are extremely helpful. Thank you very much. I’m continuing to read / digest / modify them to get a better understanding, but they’ve already cleared up a lot for me.
One thing that popped out was the use of
Flow.transform()
. The difference betweenmap
andtransform
is now clear to me, but I then started to question the difference between aStage
and aFlow
, and they now confuse me a bit… (if this is covered somewhere, please forgive me). Why is aStage
not a derived type ofFlow
? Or, perhaps another way to say that is, Why doStage
s exist at all? It feels like there areFlow
s,BidiFlow
s andGraph
s.If you have read the documentation page that I linked before (http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-composition.html), then you already know that a Flow is nothing else then a box with exactly one input and output port, but it might contain a whole network inside. Source is similar but with exactly one output port, while Sink has exactly one input port. The modules can be composite, i.e. internally contain arbitrarily complex graphs. There are modules though that are not composite, but atomic, as they directly represent some logic not built from other modules.
A Stage is just an "atomic Flow", if you prefer to look it that way (strictly speaking it is the transform() op which is the "atomic Flow" and it is a factory for a Stage). Whenever you are calling .filter(), .map(), etc. on a Source or Sink, you are attaching a Stage to the output port at the end. See https://github.com/akka/akka/blob/release-2.3-dev/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala for most of the implementation of the built-in operators.
Stage
feels like it’s just another manifestation of aFlow
, perhaps an earlier part of the design that may not need to be there?No! Hell, no! It is actually one of the most solid parts of the internals. You can build a Flow by using the
built-in operators, but how do you build the built-in operators themselves? Well, they are Stages. What should you do when there is an operator missing? Build your Stage. You should familiarize yourself with stages and look at the doc page for them: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-customize.html
July 30, 2015 at 12:48 PM
These examples, and the new documentation page, are extremely helpful. Thank you very much. I’m continuing to read / digest / modify them to get a better understanding, but they’ve already cleared up a lot for me.
One thing that popped out was the use of
Flow.transform()
. The difference betweenmap
andtransform
is now clear to me, but I then started to question the difference between aStage
and aFlow
, and they now confuse me a bit… (if this is covered somewhere, please forgive me). Why is aStage
not a derived type ofFlow
? Or, perhaps another way to say that is, Why doStage
s exist at all? It feels like there areFlow
s,BidiFlow
s andGraph
s.Stage
feels like it’s just another manifestation of aFlow
, perhaps an earlier part of the design that may not need to be there?
P.S. It looks like the stream-composition doc is missing something in the
attributes
section. The diagram shows thatnestedSource
is connected tonestedSink
but the code doesn’t do such a thing. Did you mean to add anestedSource.to(nestedSink)
?
I did read it :) And all that makes perfect sense (now).July 30, 2015 at 2:23 PMHi Derek,On Thu, Jul 30, 2015 at 6:48 PM, Derek Wyatt <de...@derekwyatt.org> wrote:These examples, and the new documentation page, are extremely helpful. Thank you very much. I’m continuing to read / digest / modify them to get a better understanding, but they’ve already cleared up a lot for me.
One thing that popped out was the use of
Flow.transform()
. The difference betweenmap
andtransform
is now clear to me, but I then started to question the difference between aStage
and aFlow
, and they now confuse me a bit… (if this is covered somewhere, please forgive me). Why is aStage
not a derived type ofFlow
? Or, perhaps another way to say that is, Why doStage
s exist at all? It feels like there areFlow
s,BidiFlow
s andGraph
s.If you have read the documentation page that I linked before (http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-composition.html), then you already know that a Flow is nothing else then a box with exactly one input and output port, but it might contain a whole network inside. Source is similar but with exactly one output port, while Sink has exactly one input port. The modules can be composite, i.e. internally contain arbitrarily complex graphs. There are modules though that are not composite, but atomic, as they directly represent some logic not built from other modules.OK. That's good to know. Digging into the code certainly makes it clearer, and it's nice getting a direct pointer of something specific to look at.A Stage is just an "atomic Flow", if you prefer to look it that way (strictly speaking it is the transform() op which is the "atomic Flow" and it is a factory for a Stage). Whenever you are calling .filter(), .map(), etc. on a Source or Sink, you are attaching a Stage to the output port at the end. See https://github.com/akka/akka/blob/release-2.3-dev/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala for most of the implementation of the built-in operators.So, you're saying, "no", then? :) Fair enough... It just "looks" like a Stage has one input port and one output port, so I started to wonder why there was a distinction. Perhaps it's because it's a solid part of the internals, and as an internal component it makes perfect sense. When it comes out of the API, maybe it's a bit confusing... but... I'm still learning.
Stage
feels like it’s just another manifestation of aFlow
, perhaps an earlier part of the design that may not need to be there?No! Hell, no! It is actually one of the most solid parts of the internals. You can build a Flow by using the
I've certainly read that too, and in-and-of-itself it makes perfect sense. Really what made me question it was the existence of `Flow.transform()`. Then from there it was a question as to why you would need to transform a Stage with one in and one out, into a Flow of one in and one out.built-in operators, but how do you build the built-in operators themselves? Well, they are Stages. What should you do when there is an operator missing? Build your Stage. You should familiarize yourself with stages and look at the doc page for them: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-customize.html
And then when I find myself binding Stages together in the same-ish manner as what I would bind Flows together, it just seemed redundant.
But, with that said, I'm _totally_ still learning about this API. Rationalizing the different purposes of Stages and Flows, and understanding why they are the way they are, is just one of those things I need to do. At the moment, it appears that the Stage is not a sub-type of Flow, but it is a _building block_ of Flow and Source transformations and on their own they're not really useful.
I'm still left wondering why `Source.transform(flow: Flow[...])` isn't reasonably equivalent, but I will figure that out.
July 31, 2015 at 4:47 AM
On Thu, Jul 30, 2015 at 9:07 PM, Derek Wyatt <de...@derekwyatt.org> wrote:
I did read it :) And all that makes perfect sense (now).July 30, 2015 at 2:23 PMHi Derek,On Thu, Jul 30, 2015 at 6:48 PM, Derek Wyatt <de...@derekwyatt.org> wrote:These examples, and the new documentation page, are extremely helpful. Thank you very much. I’m continuing to read / digest / modify them to get a better understanding, but they’ve already cleared up a lot for me.
One thing that popped out was the use of
Flow.transform()
. The difference betweenmap
andtransform
is now clear to me, but I then started to question the difference between aStage
and aFlow
, and they now confuse me a bit… (if this is covered somewhere, please forgive me). Why is aStage
not a derived type ofFlow
? Or, perhaps another way to say that is, Why doStage
s exist at all? It feels like there areFlow
s,BidiFlow
s andGraph
s.If you have read the documentation page that I linked before (http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-composition.html), then you already know that a Flow is nothing else then a box with exactly one input and output port, but it might contain a whole network inside. Source is similar but with exactly one output port, while Sink has exactly one input port. The modules can be composite, i.e. internally contain arbitrarily complex graphs. There are modules though that are not composite, but atomic, as they directly represent some logic not built from other modules.OK. That's good to know. Digging into the code certainly makes it clearer, and it's nice getting a direct pointer of something specific to look at.A Stage is just an "atomic Flow", if you prefer to look it that way (strictly speaking it is the transform() op which is the "atomic Flow" and it is a factory for a Stage). Whenever you are calling .filter(), .map(), etc. on a Source or Sink, you are attaching a Stage to the output port at the end. See https://github.com/akka/akka/blob/release-2.3-dev/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala for most of the implementation of the built-in operators.So, you're saying, "no", then? :) Fair enough... It just "looks" like a Stage has one input port and one output port, so I started to wonder why there was a distinction. Perhaps it's because it's a solid part of the internals, and as an internal component it makes perfect sense. When it comes out of the API, maybe it's a bit confusing... but... I'm still learning.
Stage
feels like it’s just another manifestation of aFlow
, perhaps an earlier part of the design that may not need to be there?No! Hell, no! It is actually one of the most solid parts of the internals. You can build a Flow by using theWell, somehow you need to build new stream processing logic when the built-in ones are not sufficient. Hence the Stage API.I've certainly read that too, and in-and-of-itself it makes perfect sense. Really what made me question it was the existence of `Flow.transform()`. Then from there it was a question as to why you would need to transform a Stage with one in and one out, into a Flow of one in and one out.built-in operators, but how do you build the built-in operators themselves? Well, they are Stages. What should you do when there is an operator missing? Build your Stage. You should familiarize yourself with stages and look at the doc page for them: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-customize.htmlA Stage is an atomic processing entity, a Flow is a chain (network) of such entities. Btw, the name "transform" means "transform this stream of elements into a different stream of elements by using this logic" it is not "transform this Stage into a Flow".
When you call myFlow.map(f) what is actually called in the background is myFlow.transform(() => Map(f)), i.e. map is just a sugar over the most generic transform API. Stages are a way to express backpressured stream processing logic in an efficient manner.And then when I find myself binding Stages together in the same-ish manner as what I would bind Flows together, it just seemed redundant.You don't bind Stages together, where does that come from? There must be some source of confusion here,
the only way to bind Stages together is via the Flow API. I.e. there is no redundancy. To summarize:- a Stage is an encapsulated, possibly stateful piece of logic for transforming a stream of elements to a stream of different elements, maybe many-to-one, one-to-many, one-to-one or a mixture of those.- a .transform(() => Stage) call expresses the intent of creating a stream processing module, using the logic provided by Stage. It is a factory, because the Stage can be stateful, hence each materialization needs a new Stage instance- A Flow is a network of such elements (and others, like fan-in, fan-out, or specialized ones like TCP)I have to admit that here I am confused about your confusion because I don't see where you were mislead, and what does not tick for you here.
But, with that said, I'm _totally_ still learning about this API. Rationalizing the different purposes of Stages and Flows, and understanding why they are the way they are, is just one of those things I need to do. At the moment, it appears that the Stage is not a sub-type of Flow, but it is a _building block_ of Flow and Source transformations and on their own they're not really useful.Exactly, a Stage is just a piece of stream processing *logic*. The actual stream processing *module* is transform() which is a factory for a Stage (i.e. the possibly stateful logic).
I'm still left wondering why `Source.transform(flow: Flow[...])` isn't reasonably equivalent, but I will figure that out.The reason is that Stage might be stateful, therefore it is not reusable across materializations. So the actual step to make a Stage a proper Flow is to introduce a factory, which is what the .transform() method does, and this is the reason why Stage cannot extend Flow. Maybe we can add a Flow.fromStageFactory(() => Stage) to make this clearer but I am not sure if that would help that much.
July 30, 2015 at 3:07 PM
I did read it :) And all that makes perfect sense (now).July 30, 2015 at 2:23 PMHi Derek,On Thu, Jul 30, 2015 at 6:48 PM, Derek Wyatt <de...@derekwyatt.org> wrote:These examples, and the new documentation page, are extremely helpful. Thank you very much. I’m continuing to read / digest / modify them to get a better understanding, but they’ve already cleared up a lot for me.
One thing that popped out was the use of
Flow.transform()
. The difference betweenmap
andtransform
is now clear to me, but I then started to question the difference between aStage
and aFlow
, and they now confuse me a bit… (if this is covered somewhere, please forgive me). Why is aStage
not a derived type ofFlow
? Or, perhaps another way to say that is, Why doStage
s exist at all? It feels like there areFlow
s,BidiFlow
s andGraph
s.If you have read the documentation page that I linked before (http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-composition.html), then you already know that a Flow is nothing else then a box with exactly one input and output port, but it might contain a whole network inside. Source is similar but with exactly one output port, while Sink has exactly one input port. The modules can be composite, i.e. internally contain arbitrarily complex graphs. There are modules though that are not composite, but atomic, as they directly represent some logic not built from other modules.OK. That's good to know. Digging into the code certainly makes it clearer, and it's nice getting a direct pointer of something specific to look at.A Stage is just an "atomic Flow", if you prefer to look it that way (strictly speaking it is the transform() op which is the "atomic Flow" and it is a factory for a Stage). Whenever you are calling .filter(), .map(), etc. on a Source or Sink, you are attaching a Stage to the output port at the end. See https://github.com/akka/akka/blob/release-2.3-dev/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala for most of the implementation of the built-in operators.So, you're saying, "no", then? :) Fair enough... It just "looks" like a Stage has one input port and one output port, so I started to wonder why there was a distinction. Perhaps it's because it's a solid part of the internals, and as an internal component it makes perfect sense. When it comes out of the API, maybe it's a bit confusing... but... I'm still learning.
Stage
feels like it’s just another manifestation of aFlow
, perhaps an earlier part of the design that may not need to be there?No! Hell, no! It is actually one of the most solid parts of the internals. You can build a Flow by using the
I've certainly read that too, and in-and-of-itself it makes perfect sense. Really what made me question it was the existence of `Flow.transform()`. Then from there it was a question as to why you would need to transform a Stage with one in and one out, into a Flow of one in and one out. And then when I find myself binding Stages together in the same-ish manner as what I would bind Flows together, it just seemed redundant.built-in operators, but how do you build the built-in operators themselves? Well, they are Stages. What should you do when there is an operator missing? Build your Stage. You should familiarize yourself with stages and look at the doc page for them: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-customize.html
But, with that said, I'm _totally_ still learning about this API. Rationalizing the different purposes of Stages and Flows, and understanding why they are the way they are, is just one of those things I need to do. At the moment, it appears that the Stage is not a sub-type of Flow, but it is a _building block_ of Flow and Source transformations and on their own they're not really useful.
I'm still left wondering why `Source.transform(flow: Flow[...])` isn't reasonably equivalent, but I will figure that out. You've done a very good job explaining things thus far - it may just be a case of me needing some time to let this thing gel properly, and to get some solid uninterrupted time with it.
July 30, 2015 at 2:23 PM
Hi Derek,
On Thu, Jul 30, 2015 at 6:48 PM, Derek Wyatt <de...@derekwyatt.org> wrote:These examples, and the new documentation page, are extremely helpful. Thank you very much. I’m continuing to read / digest / modify them to get a better understanding, but they’ve already cleared up a lot for me.
One thing that popped out was the use of
Flow.transform()
. The difference betweenmap
andtransform
is now clear to me, but I then started to question the difference between aStage
and aFlow
, and they now confuse me a bit… (if this is covered somewhere, please forgive me). Why is aStage
not a derived type ofFlow
? Or, perhaps another way to say that is, Why doStage
s exist at all? It feels like there areFlow
s,BidiFlow
s andGraph
s.If you have read the documentation page that I linked before (http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-composition.html), then you already know that a Flow is nothing else then a box with exactly one input and output port, but it might contain a whole network inside. Source is similar but with exactly one output port, while Sink has exactly one input port. The modules can be composite, i.e. internally contain arbitrarily complex graphs. There are modules though that are not composite, but atomic, as they directly represent some logic not built from other modules.
A Stage is just an "atomic Flow", if you prefer to look it that way (strictly speaking it is the transform() op which is the "atomic Flow" and it is a factory for a Stage). Whenever you are calling .filter(), .map(), etc. on a Source or Sink, you are attaching a Stage to the output port at the end. See https://github.com/akka/akka/blob/release-2.3-dev/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala for most of the implementation of the built-in operators.
Stage
feels like it’s just another manifestation of aFlow
, perhaps an earlier part of the design that may not need to be there?No! Hell, no! It is actually one of the most solid parts of the internals. You can build a Flow by using the built-in operators, but how do you build the built-in operators themselves? Well, they are Stages. What should you do when there is an operator missing? Build your Stage. You should familiarize yourself with stages and look at the doc page for them: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-customize.html
this is excellent feedback,
I think there's room for improvement for grokkability of Akka Streams, thanks Derek and Endre.
--
Cheers,
√
-Endre
...<div title="MDH:PHNwYW4gc3R5bGU9ImZvbnQtZmFtaWx5OiBtb25vc3BhY2U7Ij5UaGVzZSBleGFtcGxlcywgYW5k IHRoZSBuZXcgZG9jdW1lbnRhdGlvbiBwYWdlLCBhcmUgZXh0cmVtZWx5IGhlbHBmdWwuJm5ic3A7 IFRoYW5rIHlvdSBfdmVyeV8gbXVjaC4mbmJzcDsgSSdtIGNvbnRpbnVpbmcgdG8gcmVhZCAvIGRp Z2VzdCAvIG1vZGlmeSB0aGVtIHRvIGdldCBhIGJldHRlciB1bmRlcnN0YW5kaW5nLCBidXQgdGhl eSd2ZSBhbHJlYWR5IGNsZWFyZWQgdXAgYSBsb3QgZm9yIG1lLjxicj48YnI+T25lIHRoaW5nIHRo YXQgcG9wcGVkIG91dCB3YXMgdGhlIHVzZSBvZiBgRmxvdy50cmFuc2Zvcm0oKWAuIFRoZSBkaWZm ZXJlbmNlIGJldHdlZW4gYG1hcGAgYW5kIGB0cmFuc2Zvcm1gIGlzIG5vdyBjbGVhciB0byBtZSwg YnV0IEkgdGhlbiBzdGFydGVkIHRvIHF1ZXN0aW9uIHRoZSBkaWZmZXJlbmNlIGJldHdlZW4gYSBg U3RhZ2VgIGFuZCBhIGBGbG93YCwgYW5kIHRoZXkgbm93IGNvbmZ1c2UgbWUgYSBiaXQuLi4gKGlm IHRoaXMgaXMgY292ZXJlZCBzb21ld2hlcmUsIHBsZWFzZSBmb3JnaXZlIG1lKS4mbmJzcDsgV2h5 IGlzIGEgYFN0YWdlYCBub3QgYSBkZXJpdmVkIHR5cGUgb2YgYEZsb3dgPyZuYnNwOyBPciwgcGVy aGFwcyBhbm90aGVyIHdheSB0byBzYXkgdGhhdCBpcywgV2h5IGRvIGBTdGFnZWBzIGV4aXN0IGF0 IGFsbD8mbmJzcDsgSXQgZmVlbHMgbGlrZSB0aGVyZSBhcmUgYEZsb3dgcywgYEJpZGlGbG93YHMg YW5kIGBHcmFwaGBzLiZu