implicit val system = ActorSystem("test-streams")
implicit val materializer = ActorMaterializer()
implicit val context = system.dispatcher
val source: Source[List[Int], Unit] = Source(List(1, 2, 3, 4, 5).map(n => List.fill(n)(n)))
source
// produces a flow of ints
.mapConcat(identity)
// recreate initial grouping n => List.of(n)
.groupBy(identity)
// fold the grouped values into a list
.map {
case (key, list) => list.fold(List.empty[Int])(_ :+ _)
}
// flatten the multiple sources into the stream
.flatten(FlattenStrategy.concat)
// print them out
.runForeach(println)
// shutdown everything
.onComplete { result =>
// Failure(akka.stream.impl.StreamSubscriptionTimeoutSupport$$anon$2: Publisher (akka.stream.impl.MultiStreamOutputProcessor$SubstreamOutput@45963726) you are trying to subscribe to has been shut-down because exceeding it's subscription-timeout.)
println(result)
system.shutdown()
system.awaitTermination()
}