Akka Streams fanout Question

382 views
Skip to first unread message

Christian Schmitt

unread,
Aug 22, 2015, 6:42:59 AM8/22/15
to Akka User List
Hello, currently I have an actor which gets a Publisher,
this publisher has either one or multiple elements.

Based on that I want to run through this data and send the values to multiple Subscribers (fan-out)
One should process it and get additional data from a database (which returns a future)
Another should do the same but without the additional data 

both will index the data to elasticsearch Currently I tried to have a Sink.fanoutPublisher, however it will still only run once either with getting data from the database or without demo code:

val pub = Source(a.data).runWith(Sink.fanoutPublisher(4, 4))
Source(pub).runForeach{ _ => log.debug("1")}
Source(pub).runForeach{ _ => log.debug("2")}


Currently a.data is a publisher which holds a case class, however my output will always be:

[debug] w.IndexWorker - 1

Is there a way to attach multiple sources? or better just splitting the first source and send it to multiple?

Konrad Malawski

unread,
Aug 22, 2015, 6:56:27 AM8/22/15
to akka...@googlegroups.com, Christian Schmitt
What you're looking for is a Broadcast operation.

-- 
Cheers,
Konrad Malawski
Akka @ Typesafe
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
Message has been deleted

Christian Schmitt

unread,
Aug 22, 2015, 7:03:07 AM8/22/15
to Akka User List, c.sc...@briefdomain.de
Thanks somehow I always visited the wrong documentation...

Konrad Malawski

unread,
Aug 22, 2015, 8:35:51 AM8/22/15
to akka...@googlegroups.com, Christian Schmitt, c.sc...@briefdomain.de
What do you mean by "the wrong documentation"?
We'd like to improve the docs (maybe add links etc), so it would help if you could explain what was confusing for you here.

You don't need a Merge, just don't put one in your graph.
and just remote the Merge from there :-)


-- 
Cheers,
Konrad Malawski
Akka @ Typesafe

Christian Schmitt

unread,
Aug 22, 2015, 9:53:07 AM8/22/15
to Konrad Malawski, akka...@googlegroups.com
Somehow Google points to the M2 release which only has FlowGraph {implicit b =>} which of course doesn’t work. 

The 1.0 documentation is really good (if you find it..)

Especially:


Also there is an example without a merge:


The sharedDoubler Example however whats missing on some examples is that you need to call run() or you make a via(FlowGraph) call, thats what I did:
val s1 = Sink.publisher[Article]
val s2 = Sink.publisher[Article]
val g = FlowGraph.closed(s1, s2)((_, _)) { implicit builder =>
(s1, s2) =>
import FlowGraph.Implicits._
val bcast = builder.add(Broadcast[Article](2))

Source(a.data) ~> bcast.in
bcast.out(0) ~> s1.inlet
bcast.out(1) ~> s2.inlet

}
val (pub1, pub2) = g.run()

After that I attach the Publisher, to two sources, one pulls data and puts it into elastic search the other just puts it into it, what i still missing is to generalize that.
I mean generalize the Article which is easy, just make a generic, however the number of the outgoing publishers isn’t easily done. (or do I miss something)?
I mean something like:

def flow[T](pub: Publisher[T], out: Int) = {
FlowGraph.closed() { implicit builder => 
            import FlowGraph.Implicits._
           val bcast = builder.add(Broadcast[T](out))
           Source(pub) ~> bcast
        }
}

(pseudocode)

Konrad Malawski

unread,
Aug 22, 2015, 12:42:55 PM8/22/15
to Christian Schmitt, akka...@googlegroups.com

Somehow Google points to the M2 release which only has FlowGraph {implicit b =>} which of course doesn’t work. 

The 1.0 documentation is really good (if you find it..)

Ah I see, yes that's something we should work on with our SEO optimisation I think...

I also thought of putting up a warning and links (fades out automatically) if browsing outdated docs: https://github.com/akka/akka/pull/18295 I hope that'll be useful :-)


I'll give the rest of your code a skim in a bit :-)


-- Konrad

Christian Schmitt

unread,
Aug 24, 2015, 5:53:54 AM8/24/15
to Akka User List, c.sc...@briefdomain.de
Hey yeah the PR will greatly help.

Also any chance about the second question? How to generalize my Broadcaster? So that I have one Publisher and get a multiple publishers out? (specifiable by the number)
Reply all
Reply to author
Forward
0 new messages