/**
 * Graph-DSL fragment that spreads incoming `S3Time` elements over `parallelLevel` identical
 * lanes. In every lane the element is broadcast to both `imposterFlow` and `archiveFlow`,
 * and the two `LogSet` results are zipped back together into a pair.
 *
 * NOTE(review): as written in this view the graph is not closed:
 *  - no `zip.out` is ever connected, so nothing feeds `merger`;
 *  - `merger` is created but never wired or exposed;
 *  - the builder function ends on the `for` loop instead of returning a shape such as
 *    `FlowShape(dispatcher.in, merger.out)`.
 * A step turning the zipped `(LogSet, LogSet)` pair into an `ArchiveResult` appears to be
 * missing or elided — confirm against the full file before relying on this value.
 */
private val alwaysOnFlow = {
// Number of parallel lanes; also the port count of both the Balance and the Merge stage.
val parallelLevel = 5
Flow.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
// Fan-out stage: hands each incoming S3Time to one of the `parallelLevel` lanes.
val dispatcher = builder.add(Balance[S3Time](parallelLevel))
// Fan-in stage intended to collect the per-lane ArchiveResult values (currently unconnected).
val merger = builder.add(Merge[ArchiveResult](parallelLevel))
for (i <- 0 until parallelLevel) {
// Per lane: duplicate the element so both flows receive the same S3Time.
val bcast = builder.add(Broadcast[S3Time](2))
// Pairs up one result of each flow.
val zip = builder.add(Zip[LogSet, LogSet])
// First broadcast output -> imposterFlow -> left side of the pair.
dispatcher.out(i) ~> bcast ~> imposterFlow ~> zip.in0
// Second broadcast output -> archiveFlow -> right side of the pair.
bcast ~> archiveFlow ~> zip.in1
// NOTE(review): zip.out is left dangling here; presumably it should lead into merger.in(i).
}
})
}
/**
 * Runnable pipeline fed through a queue: each offered `S3Time` is logged, pushed through
 * `alwaysOnFlow`, and logged again once its result reaches the sink.
 *
 * Because the sink is attached with `.to`, the materialized value is the one of the queue
 * source; the sink's own materialized value is discarded.
 */
val alwaysOnGraph = {
  // Queue-backed entry point with a buffer size of 50; overflow uses OverflowStrategy.fail.
  val queuedTimes = Source.queue[S3Time](50, OverflowStrategy.fail)

  // Log every element as it enters the pipeline and pass it on unchanged.
  val announcedTimes = queuedTimes.map { time =>
    info(s"Starting on ${time.toString}")
    time
  }

  // Run the processing flow and log each finished result at the sink.
  announcedTimes
    .via(alwaysOnFlow)
    .to(Sink.foreach { res => info(s"Finished processing ${res.minute}") })
}