Streams - Materialize and return certain values

Sean Callahan

Mar 27, 2017, 4:48:10 PM
to Akka User List
Hey all. I'm starting to figure out the graph DSL, and I got curious about something that I'm not sure is possible. In the graph below, I have a Source.queue set up, and it works great for programmatically adding new elements later.

What I'm curious about is whether I can offer new elements to the graph (say from user input, as opposed to the normal cron add) and get their materialized values without completing the graph.

Example situation: user input causes 5 elements to get dropped into the graph, and I want to get the 5 corresponding values for that input out the end without affecting the continuously added cron elements.

It seems like http://doc.akka.io/docs/akka/current/scala/stream/stream-graphs.html#Accessing_the_materialized_value_inside_the_Graph might be close to what I am thinking of, but I can't quite figure out if or how it applies.

private val alwaysOnFlow = {
  val parallelLevel = 5
  Flow.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    // Balance spreads incoming elements across the lanes; Merge collects the lane results.
    val dispatcher = builder.add(Balance[S3Time](parallelLevel))
    val merger = builder.add(Merge[ArchiveResult](parallelLevel))

    for (i <- 0 until parallelLevel) {
      // Each lane sends the same element through both flows and zips the two outputs.
      val bcast = builder.add(Broadcast[S3Time](2))
      val zip = builder.add(Zip[LogSet, LogSet])

      dispatcher.out(i) ~> bcast ~> imposterFlow ~> zip.in0
                           bcast ~> archiveFlow ~> zip.in1

      zip.out ~> resultsFlow ~> merger.in(i)
    }
    FlowShape(dispatcher.in, merger.out)
  })
}

// Materializes to a queue that new elements can be offered to later.
val alwaysOnGraph = Source.queue[S3Time](50, OverflowStrategy.fail)
  .map { time =>
    info(s"Starting on ${time.toString}")
    time
  }
  .via(alwaysOnFlow)
  .to(Sink.foreach { res =>
    info(s"Finished processing ${res.minute}")
  })


Sean Callahan

Mar 27, 2017, 7:36:15 PM
to Akka User List
After some more digging, I found that this is exactly what the BroadcastHub is for! So no need to respond. If anyone is interested, I can post my implementation here as well.
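
For readers who land on this thread, here is a minimal sketch of how a BroadcastHub could be attached to a queue-fed stream like the one above. It is not the implementation mentioned in this post. Job, Result, processing, and userIds are hypothetical stand-ins for S3Time, ArchiveResult, alwaysOnFlow, and the user-supplied input, and the sketch assumes results can be matched back to their inputs by an id. ActorMaterializer reflects the Akka version current at the time of this thread.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{BroadcastHub, Flow, Keep, Sink, Source}

import scala.concurrent.Future

object BroadcastHubSketch extends App {
  implicit val system: ActorSystem = ActorSystem("broadcast-hub-sketch")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  import system.dispatcher

  // Hypothetical stand-ins for S3Time and ArchiveResult.
  final case class Job(id: Int)
  final case class Result(jobId: Int, summary: String)

  // Hypothetical stand-in for alwaysOnFlow.
  val processing: Flow[Job, Result, NotUsed] =
    Flow[Job].map(job => Result(job.id, s"processed ${job.id}"))

  // Keep.both yields the queue (to offer elements) and a Source that any
  // number of consumers can attach to while the stream keeps running.
  val (queue, results) =
    Source.queue[Job](50, OverflowStrategy.fail)
      .via(processing)
      .toMat(BroadcastHub.sink[Result](bufferSize = 64))(Keep.both)
      .run()

  // Always-on consumer, playing the role of the original Sink.foreach.
  results.runWith(Sink.foreach(r => println(s"Finished processing ${r.jobId}")))

  // On-demand consumer: collects only the results for the user's elements and
  // then completes by itself, leaving the main stream untouched.
  // Caveat: a hub consumer only sees elements emitted after it has attached,
  // and attachment is asynchronous, so real code may need to make sure the
  // subscription is in place before offering.
  val userIds = Set(101, 102, 103)
  val userResults: Future[Seq[Result]] =
    results
      .filter(r => userIds(r.jobId))
      .take(userIds.size.toLong)
      .runWith(Sink.seq)

  userIds.foreach(id => queue.offer(Job(id)))

  userResults.onComplete { outcome =>
    println(s"User results: $outcome")
    system.terminate()
  }
}
```

The design choice here is that the graph is materialized once, and each request for "my 5 values" becomes a short-lived consumer of the hub's Source rather than a new run of the graph.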