Akka Stream : MergeHub and BroadcastHub


Christophe De Troyer

Apr 10, 2020, 1:20:31 PM
to Akka User List
Hi all,

I've been looking at MergeHub and BroadcastHub for Akka Stream and I am a bit confused.

In the beginning of the documentation the following is mentioned:

It is important to remember that even after constructing the RunnableGraph by connecting all the source, sink and different operators, no data will flow through it until it is materialized. Materialization is the process of allocating all resources needed to run the computation described by a Graph (in Akka Streams this will often involve starting up Actors).
...
After running (materializing) the RunnableGraph[T] we get back the materialized value of type T.
 
This makes perfect sense. But I'm having trouble reconciling this with the code sample from the BroadcastHub documentation.

// A simple producer that publishes a new "message" every second
val producer: Source[String, Cancellable] = Source.tick(1.second, 1.second, "New message")

// Attach a BroadcastHub Sink to the producer. This will materialize to a
// corresponding Source.
// (We need to use toMat and Keep.right since by default the materialized
// value to the left is used)
val runnableGraph: RunnableGraph[Source[String, NotUsed]] =
  producer.toMat(BroadcastHub.sink(bufferSize = 256))(Keep.right)

// By running/materializing the producer, we get back a Source, which
// gives us access to the elements published by the producer.
val fromProducer: Source[String, NotUsed] = runnableGraph.run()

// Print out messages from the producer in two independent consumers
fromProducer.runForeach(msg => println("consumer1: " + msg))
fromProducer.runForeach(msg => println("consumer2: " + msg))

In the above snippet a Source is created and used as the source of a RunnableGraph (meaning the graph has both a source and a sink). Conceptually I understand that the BroadcastHub is indeed a sink. What I do not understand is why running/materializing that RunnableGraph gives you back a Source.

The way I see it, running a graph should return a future of the types of values flowing through that graph. In this case Strings.

Can somebody shed some light on this, please?

 Thanks,
Christophe

Brian Maso

Apr 10, 2020, 2:11:40 PM
to akka...@googlegroups.com
I suggest you post your question on the gitter channel (gitter.im/akka/akka). There are a lot of knowledgeable people who can answer, and I think it is a much more active space than this (deprecated) list.

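But to answer your question: the materialized value is determined by the stages in the graph, not by the type of the elements flowing through it. Many sinks (Sink.head[T], for example) materialize a Future[T]. A BroadcastHub.sink[T] materializes a Source[T, NotUsed]. Not every materialized value is a Future[_].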
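To make the difference concrete, here is a minimal sketch of how the materialized type follows from the sink that is attached. It assumes Akka 2.6 or later (where an implicit ActorSystem supplies the materializer); the object name MaterializedValues and the helper run() are made up for illustration and are not part of the original sample.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{BroadcastHub, Keep, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object MaterializedValues {
  // Runs a few small graphs and returns what the two Future-based ones produced.
  def run(): (Int, Seq[Int]) = {
    // Assumption: Akka 2.6+, so the implicit ActorSystem acts as the materializer.
    implicit val system: ActorSystem = ActorSystem("materialized-values")
    try {
      val numbers: Source[Int, NotUsed] = Source(1 to 3)

      // Sink.head materializes a Future of the first element.
      val first: Future[Int] = numbers.toMat(Sink.head[Int])(Keep.right).run()

      // Sink.seq materializes a Future of all elements.
      val all: Future[Seq[Int]] = numbers.toMat(Sink.seq[Int])(Keep.right).run()

      // BroadcastHub.sink materializes a Source, not a Future.
      // (It is only constructed here to show its type; it is not consumed.)
      val hubSource: Source[Int, NotUsed] =
        numbers.toMat(BroadcastHub.sink[Int](bufferSize = 8))(Keep.right).run()

      // `to` keeps the left-hand value, i.e. the Source's own NotUsed.
      val left: NotUsed = numbers.to(Sink.ignore).run()

      (Await.result(first, 3.seconds), Await.result(all, 3.seconds))
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

In every case the value returned by run() on the graph is whatever the kept stage chose to materialize; the element type Int only shows up inside that value.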

So you have basically three parts:
1) Your original runnable graph into which the BroadcastHub.sink[T] is embedded
2) A single Source[T] materialized when (1) is run -- this is a re-usable "blueprint" which can be used to define multiple new runnable graphs.
3) 0 or more runnable graphs that receive messages through (2)

Each message sent to the BroadcastHub.sink[T] during the course of (1)'s run will be queued up and delivered (i.e. "broadcast") to all of the runnable graphs in (3) that are running. You can re-use the Source[T] from (2) multiple times, effectively allowing you to dynamically "tap" the flow of messages being sent to the BroadcastHub.sink[T] from (1).

(To be honest I'm not sure how messages are handled when there are no active Source[T] instances consuming messages sent to BroadcastHub.sink[T]... My impression is that they would be thrown away, and that the bufferSize parameter to BroadcastHub.sink[T] only comes into play when there are one or more active graphs from (3) consuming messages. Experimentation is probably necessary to confirm that.)

Best regards,
Brian Maso

--
*****************************************************************************************************
** New discussion forum: https://discuss.akka.io/ replacing akka-user google-group soon.
** This group will soon be put into read-only mode, and replaced by discuss.akka.io
** More details: https://akka.io/blog/news/2018/03/13/discuss.akka.io-announced
*****************************************************************************************************

Alexey Shuksto

Apr 13, 2020, 6:29:49 AM
to Akka User List
> To be honest I'm not sure how messages are handled when there are no active Source[T] instances consuming messages sent to BroadcastHub.sink[T]...
> My impression is that they would be thrown away, and that the bufferSize parameter to BroadcastHub.sink[T] only comes into play when there are one or more active graphs from (3) consuming messages.

A BroadcastHub is blocked (backpressured) until _all_ of the sinks connected to its materialized Source have signaled demand.

Thus, if you want the hub to discard all elements until there is some meaningful consumer, you need to attach a draining no-op sink to it right after materializing the BroadcastHub.sink[T].
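A rough sketch of that draining approach follows, again assuming Akka 2.6 or later. The throttled numeric producer, the object name DrainingHub and the helper firstSeenByLateConsumer() are hypothetical and chosen only for illustration; Sink.ignore plays the role of the no-op sink. Which element the late consumer sees first depends on timing and on the hub's buffer, so no particular output is claimed.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{BroadcastHub, Keep, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object DrainingHub {
  // Returns the first element observed by a consumer that attaches late.
  def firstSeenByLateConsumer(): Int = {
    // Assumption: Akka 2.6+, so the implicit ActorSystem acts as the materializer.
    implicit val system: ActorSystem = ActorSystem("draining-hub")
    try {
      // Hypothetical producer: 1..100 at roughly ten elements per second.
      val producer: Source[Int, NotUsed] =
        Source(1 to 100).throttle(1, 100.millis)

      val fromProducer: Source[Int, NotUsed] =
        producer.toMat(BroadcastHub.sink[Int](bufferSize = 8))(Keep.right).run()

      // No-op consumer attached right after materialization. It keeps
      // signaling demand, so the hub is not held up while nobody else listens.
      fromProducer.runWith(Sink.ignore)

      // Simulate a consumer that shows up later
      // (a blocking sleep is used for demonstration only).
      Thread.sleep(1000)
      Await.result(fromProducer.runWith(Sink.head[Int]), 3.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(s"late consumer started at element ${firstSeenByLateConsumer()}")
}
```

Without the Sink.ignore line, the producer would be held back once the hub's buffer is full, so comparing the two variants is one way to run the experiment suggested earlier in the thread.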


Brian Maso

Apr 13, 2020, 4:14:40 PM
to akka...@googlegroups.com
Ah, good factoid! Thanks!

Brian Maso
