Akka Streams: How do I know when a flow is finished?

1,381 views
Skip to first unread message

Eric Kolotyluk

unread,
Jun 11, 2015, 6:30:16 PM6/11/15
to akka...@googlegroups.com
I have some simple code

  logger.info( "Hello World!" )
 
 
implicit val system = ActorSystem("System")
 
import system.dispatcher

  logger
.info("Create a Source based on a simple Iterable[T]")
  val source
= Source(1 to 10)
 
  logger
.info("Create sink1 that can be connected to the Source" )
  val sink1
= Sink.foreach { int: Int => logger.info("sink1: " + int) }
   
  logger
.info("Connect the Source to the sink1, obtaining a runnableFlow1")
  val runnableFlow1
: RunnableFlow[Unit] = source.to(sink1)

  logger
.info("Create flowMaterializer1")
  val flowMaterializer1
= ActorFlowMaterializer()
 
  logger
.info("Materialize runnableFlow1 as materializedFlow1")
  val materializedFlow1
= runnableFlow1.run()(flowMaterializer1)

 
// How do I know when materializedFlow1 is finished so I can shut down the actor system?

Cheers, Eric

Viktor Klang

unread,
Jun 11, 2015, 6:40:48 PM6/11/15
to Akka User List
Hi Eric,

You'll need to instruct the connect of the Sink to keep its materialized value rather than the Flows:
val src = Source(immutable.Seq(1,2,3))
val flo = Flow[Int].map(_ * 2)
val sin = Sink.foreach(println)
val runFlow = (src via flo).toMat(sin)(Keep.right)
val fut = runFlow.run()

fut.onComplete(_ => sys.shutdown())

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.



--
Cheers,

Viktor Klang

unread,
Jun 11, 2015, 6:44:51 PM6/11/15
to Akka User List
I'd love to see some improvment in the handling of that.
--
Cheers,

Eric Kolotyluk

unread,
Jun 12, 2015, 3:49:32 PM6/12/15
to akka...@googlegroups.com
Thanks; so my new code is

  logger.info("Create a Source based on a simple Iterable[T]")
  val source
= Source(1 to 10)

   
  logger
.info("create redundant flow1 because of Akka Streams API design limitations")
  val flow1
= Flow[Int].map(int => int)

 
  logger
.info("Create sink1 that can be connected to the Source" )

 
//val sink1 = Sink.foreach { int: Int => logger.info("sink1: " + int) }
  val sink1
= Sink.foreach{ int: Int => logger.info("sink1: " + int) }
  
  logger
.info("Connect the Source to the sink1, obtaining a runnableFlow1")
 
//val runnableFlow1: RunnableFlow[Unit] = source.to(sink1)
  val runnableFlow1
= (source via flow1).toMat(sink1)(Keep.right)


  logger
.info("Create flowMaterializer1")
  val flowMaterializer1
= ActorFlowMaterializer()
   
  logger
.info("Materialize runnableFlow1 as materializedFlow1")
  val materializedFlow1
= runnableFlow1.run()(flowMaterializer1)


 
materializedFlow1.onComplete(result => system.shutdown())

I wish this did not have to be so complicated. While I understand this is still experimental, I hope some effort goes into simplifying the API design. In particular, in my original code, I don't understand why materializedFlow1 is Unit and not Future[Unit]. Also, why do explicitly need to create the Flow, why can't the FlowMaterializer do that implicitly if it needs to?

Sadly Keep is not defined in http://doc.akka.io/api/akka-stream-and-http-experimental/0.10/#akka.stream.scaladsl.package so there is no way to know what it does.

Also, is there some overriding reason that the method .toMat() cannot simply be called .toMaterializer()?

- Eric

Viktor Klang

unread,
Jun 12, 2015, 4:56:07 PM6/12/15
to Akka User List
On Fri, Jun 12, 2015 at 3:40 PM, Eric Kolotyluk <eric.ko...@gmail.com> wrote:
Thanks; so my new code is

  logger.info("Create a Source based on a simple Iterable[T]")
  val source
= Source(1 to 10)

   
  logger
.info("create redundant flow1 because of Akka Streams API design limitations")
  val flow1
= Flow[Int].map(int => int)
 
  logger
.info("Create sink1 that can be connected to the Source" )

 
//val sink1 = Sink.foreach { int: Int => logger.info("sink1: " + int) }
  val sink1
= Sink.foreach{ int: Int => logger.info("sink1: " + int) }
  
  logger
.info("Connect the Source to the sink1, obtaining a runnableFlow1")
 
//val runnableFlow1: RunnableFlow[Unit] = source.to(sink1)
  val runnableFlow1
= (source via flow1).toMat(sink1)(Keep.right)

  logger
.info("Create flowMaterializer1")
  val flowMaterializer1
= ActorFlowMaterializer()
   
  logger
.info("Materialize runnableFlow1 as materializedFlow1")
  val materializedFlow1
= runnableFlow1.run()(flowMaterializer1)


 
materializedFlow1.onComplete(result => system.shutdown())

I wish this did not have to be so complicated. While I understand this is still experimental, I hope some effort goes into simplifying the API design. In particular, in my original code, I don't understand why materializedFlow1 is Unit and not Future[Unit].

run() returns Unit because that is the MaterializedType of `flow1`, since Akka Streams preserve the leftmost MaterializedType when chaining operations (source via flow1 to sink1) in this case, if you wnat to override that, you'll need to be explicit about it, hence the "toMat" (terribly named method, I agree). "Keep" is a set of functions that is nice to reuse for clarity, you could also write `toMat(sink1)((a,b) => b)`
 
Also, why do explicitly need to create the Flow, why can't the FlowMaterializer do that implicitly if it needs to?

I have no idea what you mean here. If you want to run it immediately you can use `runWith(sink)`.
 

Sadly Keep is not defined in http://doc.akka.io/api/akka-stream-and-http-experimental/0.10/#akka.stream.scaladsl.package so there is no way to know what it does.


In your IDE, you can go to definition if you are unsure. Otherwise The ScalaDoc for `toMat` should be clear.
 
Also, is there some overriding reason that the method .toMat() cannot simply be called .toMaterializer()?

Because it doesn't create a materializer. But OTOH I agree that it is terribly named. I'd vote for fixing it. Perhaps by defining `to` as:

def to[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2], combine: (Mat, Mat2) ⇒ Mat3 = Keep.left): Sink[In, Mat3]



--
Cheers,

Eric Kolotyluk

unread,
Jun 15, 2015, 1:33:15 PM6/15/15
to akka...@googlegroups.com


On Friday, 12 June 2015 13:56:07 UTC-7, √ wrote:


On Fri, Jun 12, 2015 at 3:40 PM, Eric Kolotyluk <eric.ko...@gmail.com> wrote:
Thanks; so my new code is

  logger.info("Create a Source based on a simple Iterable[T]")
  val source
= Source(1 to 10)

   
  logger
.info("create redundant flow1 because of Akka Streams API design limitations")
  val flow1
= Flow[Int].map(int => int)
 
  logger
.info("Create sink1 that can be connected to the Source" )

 
//val sink1 = Sink.foreach { int: Int => logger.info("sink1: " + int) }
  val sink1
= Sink.foreach{ int: Int => logger.info("sink1: " + int) }
  
  logger
.info("Connect the Source to the sink1, obtaining a runnableFlow1")
 
//val runnableFlow1: RunnableFlow[Unit] = source.to(sink1)
  val runnableFlow1
= (source via flow1).toMat(sink1)(Keep.right)

  logger
.info("Create flowMaterializer1")
  val flowMaterializer1
= ActorFlowMaterializer()
   
  logger
.info("Materialize runnableFlow1 as materializedFlow1")
  val materializedFlow1
= runnableFlow1.run()(flowMaterializer1)


 
materializedFlow1.onComplete(result => system.shutdown())

I wish this did not have to be so complicated. While I understand this is still experimental, I hope some effort goes into simplifying the API design. In particular, in my original code, I don't understand why materializedFlow1 is Unit and not Future[Unit].

run() returns Unit because that is the MaterializedType of `flow1`, since Akka Streams preserve the leftmost MaterializedType when chaining operations (source via flow1 to sink1) in this case, if you wnat to override that, you'll need to be explicit about it, hence the "toMat" (terribly named method, I agree). "Keep" is a set of functions that is nice to reuse for clarity, you could also write `toMat(sink1)((a,b) => b)`
 
Also, why do explicitly need to create the Flow, why can't the FlowMaterializer do that implicitly if it needs to?

I have no idea what you mean here. If you want to run it immediately you can use `runWith(sink)`.

I am trying to understand the difference between my first case and my second case. I guess I was confused why I needed to create flow1. This is why I feel the API is too complicated, it is hard to reason about what is going on, and why you need to do things such as this to do what you want.
 
 

Sadly Keep is not defined in http://doc.akka.io/api/akka-stream-and-http-experimental/0.10/#akka.stream.scaladsl.package so there is no way to know what it does.


In your IDE, you can go to definition if you are unsure. Otherwise The ScalaDoc for `toMat` should be clear.

I am using Eclipse Mars with Scala IDE 4.1. Why I try to go to the definition, all I get is the class file unassembly. I cannot find any ScalaDoc for Keep.
 
 
Also, is there some overriding reason that the method .toMat() cannot simply be called .toMaterializer()?

Because it doesn't create a materializer. But OTOH I agree that it is terribly named. I'd vote for fixing it. Perhaps by defining `to` as:

def to[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2], combine: (Mat, Mat2) ⇒ Mat3 = Keep.left): Sink[In, Mat3]

.
 

Endre Varga

unread,
Jun 16, 2015, 4:01:00 AM6/16/15
to akka...@googlegroups.com
This is how it looked like originally. Unfortunately, this way you lose type inference of the function argumetns for combine -- hence we changed to curried form. Unfortunately that means that you can no longer override a non-curried version with a curried one, so you need to introduce a new name to the combine variant. You can name that toAndCombineMaterializedValue but honestly, I don't think that helps at all with understanding. It is not the name that causes the confusion but the concept of materialized values, which cannot be dropped if we want to have the lifted representation that we have now.

-Endre

Endre Varga

unread,
Jun 16, 2015, 4:01:59 AM6/16/15
to akka...@googlegroups.com
On Mon, Jun 15, 2015 at 7:33 PM, Eric Kolotyluk <eric.ko...@gmail.com> wrote:


On Friday, 12 June 2015 13:56:07 UTC-7, √ wrote:


On Fri, Jun 12, 2015 at 3:40 PM, Eric Kolotyluk <eric.ko...@gmail.com> wrote:
Thanks; so my new code is

  logger.info("Create a Source based on a simple Iterable[T]")
  val source
= Source(1 to 10)

   
  logger
.info("create redundant flow1 because of Akka Streams API design limitations")
  val flow1
= Flow[Int].map(int => int)
 
  logger
.info("Create sink1 that can be connected to the Source" )

 
//val sink1 = Sink.foreach { int: Int => logger.info("sink1: " + int) }
  val sink1
= Sink.foreach{ int: Int => logger.info("sink1: " + int) }
  
  logger
.info("Connect the Source to the sink1, obtaining a runnableFlow1")
 
//val runnableFlow1: RunnableFlow[Unit] = source.to(sink1)
  val runnableFlow1
= (source via flow1).toMat(sink1)(Keep.right)

  logger
.info("Create flowMaterializer1")
  val flowMaterializer1
= ActorFlowMaterializer()
   
  logger
.info("Materialize runnableFlow1 as materializedFlow1")
  val materializedFlow1
= runnableFlow1.run()(flowMaterializer1)


 
materializedFlow1.onComplete(result => system.shutdown())

I wish this did not have to be so complicated. While I understand this is still experimental, I hope some effort goes into simplifying the API design. In particular, in my original code, I don't understand why materializedFlow1 is Unit and not Future[Unit].

run() returns Unit because that is the MaterializedType of `flow1`, since Akka Streams preserve the leftmost MaterializedType when chaining operations (source via flow1 to sink1) in this case, if you wnat to override that, you'll need to be explicit about it, hence the "toMat" (terribly named method, I agree). "Keep" is a set of functions that is nice to reuse for clarity, you could also write `toMat(sink1)((a,b) => b)`
 
Also, why do explicitly need to create the Flow, why can't the FlowMaterializer do that implicitly if it needs to?

I have no idea what you mean here. If you want to run it immediately you can use `runWith(sink)`.

I am trying to understand the difference between my first case and my second case. I guess I was confused why I needed to create flow1.

You don't need to create a flow. Why do you think you need one?

Viktor Klang

unread,
Jun 16, 2015, 4:14:31 AM6/16/15
to Akka User List

Agreed, but 'toMat' is a terrible name. 'combineTo' would be better to be honest.

--
Cheers,

Endre Varga

unread,
Jun 16, 2015, 4:26:30 AM6/16/15
to akka...@googlegroups.com
On Tue, Jun 16, 2015 at 10:14 AM, Viktor Klang <viktor...@gmail.com> wrote:

Agreed, but 'toMat' is a terrible name. 'combineTo' would be better to be honest.

Well, but then that can be confused with combining elements. Currently, while "Mat" does not sound nice, at least it makes it clear that there is something you need to understand, while "combine" will make it easier to misunderstand the purpose.


In general, I still maintain that the source of confusion is not the name itself, but the feature, and the whole concept of the lifted representation. If the DSL would not be lifted but eager, these confusions would go away -- at the cost of not being able to introspect stream layouts anymore.

-Endre 

Viktor Klang

unread,
Jun 16, 2015, 5:15:07 AM6/16/15
to Akka User List


On 16 Jun 2015 10:26, "Endre Varga" <endre...@typesafe.com> wrote:
>
>
>
> On Tue, Jun 16, 2015 at 10:14 AM, Viktor Klang <viktor...@gmail.com> wrote:
>>
>> Agreed, but 'toMat' is a terrible name. 'combineTo' would be better to be honest.
>
> Well, but then that can be confused with combining elements. Currently, while "Mat" does not sound nice, at least it makes it clear that there is something you need to understand, while "combine" will make it easier to misunderstand the purpose.

Trust me, the current name is highly confusing too. :)

> The usage of it is explained with various examples here: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/stream-flows-and-basics.html#Stream_Materialization
>
> In general, I still maintain that the source of confusion is not the name itself, but the feature, and the whole concept of the lifted representation.

I realize I am not the typical user, but I find the name confusing, especially given that the 'to' method is only toMat but with a baked in Keep.left. Smells hacky from a naming pov.

If the DSL would not be lifted but eager, these confusions would go away -- at the cost of not being able to introspect stream layouts anymore.

Yup, fortunately this is not something that needs any changes :-)

Endre Varga

unread,
Jun 16, 2015, 5:18:38 AM6/16/15
to akka...@googlegroups.com
On Tue, Jun 16, 2015 at 11:15 AM, Viktor Klang <viktor...@gmail.com> wrote:


On 16 Jun 2015 10:26, "Endre Varga" <endre...@typesafe.com> wrote:
>
>
>
> On Tue, Jun 16, 2015 at 10:14 AM, Viktor Klang <viktor...@gmail.com> wrote:
>>
>> Agreed, but 'toMat' is a terrible name. 'combineTo' would be better to be honest.
>
> Well, but then that can be confused with combining elements. Currently, while "Mat" does not sound nice, at least it makes it clear that there is something you need to understand, while "combine" will make it easier to misunderstand the purpose.

Trust me, the current name is highly confusing too. :)

I am yet to see a naming proposal that is not confusing or horrible in different ways :)
 

> The usage of it is explained with various examples here: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/stream-flows-and-basics.html#Stream_Materialization
>
> In general, I still maintain that the source of confusion is not the name itself, but the feature, and the whole concept of the lifted representation.

I realize I am not the typical user, but I find the name confusing, especially given that the 'to' method is only toMat but with a baked in Keep.left. Smells hacky from a naming pov.

What is your proposal then?

Viktor Klang

unread,
Jun 16, 2015, 5:51:05 AM6/16/15
to Akka User List

On 16 Jun 2015 11:18, "Endre Varga" <endre...@typesafe.com> wrote:
>
>
>
> On Tue, Jun 16, 2015 at 11:15 AM, Viktor Klang <viktor...@gmail.com> wrote:
>>
>>
>> On 16 Jun 2015 10:26, "Endre Varga" <endre...@typesafe.com> wrote:
>> >
>> >
>> >
>> > On Tue, Jun 16, 2015 at 10:14 AM, Viktor Klang <viktor...@gmail.com> wrote:
>> >>
>> >> Agreed, but 'toMat' is a terrible name. 'combineTo' would be better to be honest.
>> >
>> > Well, but then that can be confused with combining elements. Currently, while "Mat" does not sound nice, at least it makes it clear that there is something you need to understand, while "combine" will make it easier to misunderstand the purpose.
>>
>> Trust me, the current name is highly confusing too. :)
>
> I am yet to see a naming proposal that is not confusing or horrible in different ways :)

One alternative is to have 'to' always require a function to combine. Makes it apparent that the user needs to care about it. Or make that function an implicit?

>  
>>
>> > The usage of it is explained with various examples here: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/stream-flows-and-basics.html#Stream_Materialization
>> >
>> > In general, I still maintain that the source of confusion is not the name itself, but the feature, and the whole concept of the lifted representation.
>>
>> I realize I am not the typical user, but I find the name confusing, especially given that the 'to' method is only toMat but with a baked in Keep.left. Smells hacky from a naming pov.
>
> What is your proposal then?

See above

Endre Varga

unread,
Jun 16, 2015, 5:53:43 AM6/16/15
to akka...@googlegroups.com
On Tue, Jun 16, 2015 at 11:50 AM, Viktor Klang <viktor...@gmail.com> wrote:

On 16 Jun 2015 11:18, "Endre Varga" <endre...@typesafe.com> wrote:
>
>
>
> On Tue, Jun 16, 2015 at 11:15 AM, Viktor Klang <viktor...@gmail.com> wrote:
>>
>>
>> On 16 Jun 2015 10:26, "Endre Varga" <endre...@typesafe.com> wrote:
>> >
>> >
>> >
>> > On Tue, Jun 16, 2015 at 10:14 AM, Viktor Klang <viktor...@gmail.com> wrote:
>> >>
>> >> Agreed, but 'toMat' is a terrible name. 'combineTo' would be better to be honest.
>> >
>> > Well, but then that can be confused with combining elements. Currently, while "Mat" does not sound nice, at least it makes it clear that there is something you need to understand, while "combine" will make it easier to misunderstand the purpose.
>>
>> Trust me, the current name is highly confusing too. :)
>
> I am yet to see a naming proposal that is not confusing or horrible in different ways :)

One alternative is to have 'to' always require a function to combine.

That is very cumbersome. Try to rewrite the code examples in the documentation and see how it looks like.

Makes it apparent that the user needs to care about it. Or make that function an implicit?

Implicits won't work well, because there are different types involved. Again, I propose to try it out first on the documentation examples, or our test suite.

-Endre

Viktor Klang

unread,
Jun 16, 2015, 8:02:45 AM6/16/15
to Akka User List


On 16 Jun 2015 11:53, "Endre Varga" <endre...@typesafe.com> wrote:
>
>
>
> On Tue, Jun 16, 2015 at 11:50 AM, Viktor Klang <viktor...@gmail.com> wrote:
>>
>> On 16 Jun 2015 11:18, "Endre Varga" <endre...@typesafe.com> wrote:
>> >
>> >
>> >
>> > On Tue, Jun 16, 2015 at 11:15 AM, Viktor Klang <viktor...@gmail.com> wrote:
>> >>
>> >>
>> >> On 16 Jun 2015 10:26, "Endre Varga" <endre...@typesafe.com> wrote:
>> >> >
>> >> >
>> >> >
>> >> > On Tue, Jun 16, 2015 at 10:14 AM, Viktor Klang <viktor...@gmail.com> wrote:
>> >> >>
>> >> >> Agreed, but 'toMat' is a terrible name. 'combineTo' would be better to be honest.
>> >> >
>> >> > Well, but then that can be confused with combining elements. Currently, while "Mat" does not sound nice, at least it makes it clear that there is something you need to understand, while "combine" will make it easier to misunderstand the purpose.
>> >>
>> >> Trust me, the current name is highly confusing too. :)
>> >
>> > I am yet to see a naming proposal that is not confusing or horrible in different ways :)
>>
>> One alternative is to have 'to' always require a function to combine.
>
> That is very cumbersome. Try to rewrite the code examples in the documentation and see how it looks like.
>
>> Makes it apparent that the user needs to care about it. Or make that function an implicit?
>
> Implicits won't work well, because there are different types involved. Again, I propose to try it out first on the documentation examples, or our test suite.

I am currently testing different approaches, will report back!

Eric Kolotyluk

unread,
Jul 15, 2015, 2:54:05 PM7/15/15
to akka...@googlegroups.com
Interesting discussion. While I have been using Scala for almost a decade now, I still find many of the APIs confusing and counter intuitive, and generally not very readable. In particular, naming of things is very important. My sense is often when people are playing with a new technology, such as reactive streams they pick quick names that are convenient at the time, and more focused on getting stuff published than taking the time to test how well others can comprehend and reason about it.

It was good to see some discussing about how to improve the reactive streams design.

Cheers, Eric
You received this message because you are subscribed to a topic in the Google Groups "Akka User List" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/akka-user/v72ZkxRZQe4/unsubscribe.
To unsubscribe from this group and all its topics, send an email to akka-user+...@googlegroups.com.

Viktor Klang

unread,
Jul 15, 2015, 7:10:20 PM7/15/15
to Akka User List

thanks for that, Eric!

--
Cheers,

Reply all
Reply to author
Forward
0 new messages