how to define materialized value

Arun Sethia

Mar 5, 2016, 7:02:56 PM
to Akka User List
Hi,

Can someone explain what a materialized value is? I have seen the documentation at http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/scala/stream-quickstart.html#transforming-and-consuming-simple-streams

I am not sure how a Flow can define its materialized type. For example, the following code has input Tweet and output Int, but Mat is Unit. I would like to see how someone can define Mat as Int, or any example where a Flow or Source defines a Mat other than Unit.

  • val count: Flow[Tweet, Int, Unit] = Flow[Tweet].map(_ => 1)



I find it quite confusing to understand the difference between "out" and "Mat".


Thanks

As


Rafał Krzewski

Mar 5, 2016, 7:46:12 PM
to Akka User List
Hi,
there are a few ways of doing that. Probably the simplest one is using Flow.mapMaterializedValue. Suppose you'd like to create a Flow that counts the elements that pass through it and makes the current count available through a "side channel":

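  import java.util.concurrent.atomic.AtomicLong
  import akka.stream.scaladsl.Flow

  // Read-only view of the count, handed to the outside world.
  trait Counter {
    def get: Long
  }

  def counter[T]: Flow[T, T, Counter] = {
    // Created when counter[T] is called, so every materialization of the
    // returned Flow shares this one counter.
    val internalCounter = new AtomicLong(0)
    Flow[T].map { elem =>
      internalCounter.incrementAndGet()
      elem
    }.mapMaterializedValue(_ => new Counter {
      override def get: Long = internalCounter.get
    })
  }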

Another way is using a GraphStageWithMaterializedValue while building a custom Flow / Sink / Source. Instead of returning a GraphStageLogic, like an ordinary GraphStage, you return a pair of GraphStageLogic and the materialized value.
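As a rough sketch of that second approach, here is the same pass-through counter written as a custom stage. The class name CountingStage is made up for illustration, and the code assumes a reasonably recent Akka (2.5 or later); treat it as one possible shape rather than a reference implementation.

```scala
import java.util.concurrent.atomic.AtomicLong

import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue, InHandler, OutHandler}

// Read-only view of the count, same idea as the Counter trait above.
trait Counter {
  def get: Long
}

// Hypothetical stage: passes elements through unchanged and counts them.
final class CountingStage[T] extends GraphStageWithMaterializedValue[FlowShape[T, T], Counter] {
  val in: Inlet[T] = Inlet("CountingStage.in")
  val out: Outlet[T] = Outlet("CountingStage.out")
  override val shape: FlowShape[T, T] = FlowShape(in, out)

  override def createLogicAndMaterializedValue(
      inheritedAttributes: Attributes): (GraphStageLogic, Counter) = {
    // Created once per materialization, so each run gets its own counter.
    val count = new AtomicLong(0)

    val logic = new GraphStageLogic(shape) with InHandler with OutHandler {
      override def onPush(): Unit = {
        count.incrementAndGet()
        push(out, grab(in))
      }
      override def onPull(): Unit = pull(in)
      setHandlers(in, out, this)
    }

    // The pair returned here is (stage logic, materialized value).
    (logic, new Counter { override def get: Long = count.get })
  }
}
```

Wrapping it with Flow.fromGraph(new CountingStage[Int]) gives a Flow[Int, Int, Counter]. Unlike the mapMaterializedValue version, the counter here is allocated inside createLogicAndMaterializedValue, so two runs of the same Flow do not share state.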

Cheers,
Rafał

Arun Sethia

Mar 6, 2016, 2:43:10 AM
to Akka User List
Thanks Rafal.

Based on this, I tried to write sample code that counts the number of elements being processed and computes their sum:

val source = Source (1 to 5).filter(x=> x%2==0)

val sink:Sink[Int, Future[Int]]=Sink.fold[Int,Int](0)(_ + _)

val runnableGraph = source.via(counter[Int]).toMat(sink)(Keep.both)

val result=runnableGraph.run()



def counter[T]: Flow[T, T, Counter] = {
  val internalCounter = new AtomicLong(0)
  Flow[T].map { elem =>
    internalCounter.incrementAndGet()
    elem
  }.mapMaterializedValue(_ => new Counter {
    override def get = internalCounter.get
  })
}



1. Using Keep.both, result should be able to give me both the count and the sum, but it does not. Why?

2. How are materialized values different from "out"? I am not able to visualize the difference between the two.

Thanks
Arun

Rafał Krzewski

Mar 6, 2016, 8:20:43 AM
to Akka User List
Arun,

a little correction:

val runnableGraph = source.viaMat(counter[Int])(Keep.right).toMat(sink)(Keep.both)

And subsequently:

val (counter, futureSum) = runnableGraph.run()
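Put together, the corrected pipeline might look like the sketch below. It assumes Akka 2.6 or later, where an implicit ActorSystem is enough to supply the materializer (the 2016-era releases needed an explicit ActorMaterializer instead). The object name and the variable name count are chosen here for illustration.

```scala
import java.util.concurrent.atomic.AtomicLong

import scala.concurrent.Await
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

object CountAndSum extends App {
  // Assumption: Akka 2.6+, the implicit system provides the materializer.
  implicit val system: ActorSystem = ActorSystem("count-and-sum")

  trait Counter { def get: Long }

  def counter[T]: Flow[T, T, Counter] = {
    val internalCounter = new AtomicLong(0)
    Flow[T]
      .map { elem => internalCounter.incrementAndGet(); elem }
      .mapMaterializedValue(_ => new Counter { override def get: Long = internalCounter.get })
  }

  val source = Source(1 to 5).filter(_ % 2 == 0) // emits 2 and 4
  val sink = Sink.fold[Int, Int](0)(_ + _)

  // viaMat(...)(Keep.right) keeps the Counter instead of the source's value;
  // toMat(...)(Keep.both) then pairs it with the sink's Future[Int].
  val (count, futureSum) =
    source.viaMat(counter[Int])(Keep.right).toMat(sink)(Keep.both).run()

  // Wait for the stream to finish before reading the counter.
  val sum = Await.result(futureSum, 3.seconds)
  println(s"count = ${count.get}, sum = $sum") // prints "count = 2, sum = 6"

  system.terminate()
}
```

With plain via, the Flow's materialized value is dropped and Keep.both would pair the source's value with the Future, which is why the original attempt did not return the Counter.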

Graph outlets are always streams. You need to connect them to a Sink (through intervening Flows or more complex Graphs, as necessary) in order to create a RunnableGraph. Materialized values are the other things, the ones that are not streams, that connect the RunnableGraph to the outside world.

For example, Sink.fold creates a stream element that is (obviously) a Sink. It does not have any stream outlets. However, it provides a materialized value Future[U] that is completed when the Sink's inlet stream is exhausted. This is how a running stream can communicate its successful completion or failure to the outside world.
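To make the completion-or-failure point concrete, here is a small sketch that runs the same Sink.fold twice, once on a healthy stream and once on a stream that throws. It assumes Akka 2.6 or later and the default supervision strategy (stop on failure); the object name is illustrative.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

object FoldOutcome extends App {
  // Assumption: Akka 2.6+, the implicit system provides the materializer.
  implicit val system: ActorSystem = ActorSystem("fold-outcome")

  val sum = Sink.fold[Int, Int](0)(_ + _)

  // Completes normally: the Future carries the folded result.
  val ok: Future[Int] = Source(1 to 3).runWith(sum)

  // Fails midway: the Future carries the exception instead of a result.
  val broken: Future[Int] =
    Source(1 to 3)
      .map(n => if (n == 2) throw new IllegalStateException("boom") else n)
      .runWith(sum)

  println(Await.result(ok, 3.seconds)) // prints 6
  // broken.failed succeeds with the Throwable that failed the stream.
  println(Await.result(broken.failed, 3.seconds))

  system.terminate()
}
```

Neither Future is a stream element; both are handed back by run/runWith, which is what distinguishes Mat from "out".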

Another example is Source.actorPublisher: you provide it with Props for an Actor that implements the ActorPublisher contract. When materializing the stream, the Source will instantiate the Actor and return its ActorRef as a materialized value. The Actor is internal to the stream, but you can use the ActorRef as an interface from the outside world into the stream: send messages (using your own protocol) to be passed to the Source's outlet, according to demand from downstream. The tricky part is that such a gateway Actor must manage buffering and/or backpressure on its own!

Besides that, you can use materialized values to monitor stream execution from the outside, like in the Counter example above or https://github.com/akka/akka/pull/19836 or to interrupt a stream that would otherwise run for a long (or unlimited) time: https://github.com/rkrzewski/akka-cluster-etcd/blob/master/etcd-client/src/main/scala/pl/caltha/akka/streams/FlowBreaker.scala
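For the "interrupt a stream" case, Akka also ships a built-in alternative to a hand-written breaker: KillSwitches.single, whose materialized value is a UniqueKillSwitch. The sketch below uses that built-in rather than the linked FlowBreaker, and assumes Akka 2.6 or later; the object name and timings are illustrative.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

import akka.Done
import akka.actor.ActorSystem
import akka.stream.{KillSwitches, UniqueKillSwitch}
import akka.stream.scaladsl.{Keep, Sink, Source}

object StopFromOutside extends App {
  // Assumption: Akka 2.6+, the implicit system provides the materializer.
  implicit val system: ActorSystem = ActorSystem("kill-switch")

  // A tick source never completes on its own.
  val (killSwitch, done): (UniqueKillSwitch, Future[Done]) =
    Source.tick(0.seconds, 100.millis, "tick")
      .viaMat(KillSwitches.single)(Keep.right) // keep the switch, drop the Cancellable
      .toMat(Sink.ignore)(Keep.both)           // pair it with the sink's Future[Done]
      .run()

  Thread.sleep(500)     // let a few ticks through
  killSwitch.shutdown() // complete the stream from the outside

  Await.result(done, 3.seconds)
  println("stream completed")

  system.terminate()
}
```

The switch is a control handle into the running stream and the Future is a signal out of it; neither travels through an inlet or outlet.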

Cheers,
Rafał

Arun Sethia

Mar 7, 2016, 3:09:52 AM
to Akka User List
Awesome thanks a lot.

jurgi...@dataqube.de

Oct 31, 2018, 6:04:41 PM
to Akka User List
Hi Rafal,

I just stumbled upon your reply. Could you explain why you have to wrap the AtomicLong into the Counter trait? I tried returning the naked AtomicLong in the 'mapMaterializedValue' function, but the resulting value was always 0. Why does wrapping in a trait produce a different result?

Best,
Jurgis

Martynas Mickevičius

Nov 5, 2018, 1:19:17 AM
to akka...@googlegroups.com
Did you send any elements to the stream? internalCounter is incremented in the map operator for every incoming stream element.

The trait is only used to hide the implementation details of the counter.
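As a sketch of that point, the variant below returns the bare AtomicLong and still counts correctly. It assumes Akka 2.6 or later; rawCounter and the object name are made-up names. It cannot say what went wrong in any particular setup, but it shows two common ways to observe 0: reading the value before the stream has processed anything, or attaching the Flow with via so that its materialized value is dropped and a different object is read.

```scala
import java.util.concurrent.atomic.AtomicLong

import scala.concurrent.Await
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

object RawCounter extends App {
  // Assumption: Akka 2.6+, the implicit system provides the materializer.
  implicit val system: ActorSystem = ActorSystem("raw-counter")

  // Hypothetical variant: exposes the AtomicLong itself, no wrapper trait.
  def rawCounter[T]: Flow[T, T, AtomicLong] = {
    val internalCounter = new AtomicLong(0)
    Flow[T]
      .map { elem => internalCounter.incrementAndGet(); elem }
      .mapMaterializedValue(_ => internalCounter)
  }

  val (count, done) =
    Source(1 to 100)
      .viaMat(rawCounter[Int])(Keep.right) // plain via would discard the AtomicLong
      .toMat(Sink.ignore)(Keep.both)
      .run()

  // The stream runs asynchronously: reading count.get at this point may
  // still return 0 or any partial value.

  Await.result(done, 3.seconds)
  println(count.get) // prints 100 once the stream has completed

  system.terminate()
}
```

The trade-off of skipping the trait is that callers can now also mutate the counter, which is the implementation detail the wrapper hides.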


Martynas Mickevičius

Nov 5, 2018, 1:24:43 AM
to akka...@googlegroups.com
Here is a quick scalafiddle that runs the code from 2 years ago :)

jurgi...@dataqube.de

Nov 5, 2018, 11:35:42 AM
to Akka User List
I did send events to the stream, but the counter stayed at 0. Maybe the error was in a different place in my setup. Thanks for the working code example!