// This flow works fine
def zipSource(num: Int, diff: Int) = Source() { implicit b => import akka.stream.scaladsl.FlowGraph.Implicits._
val source0 = b.add(Source(1 to num)) val source1 = b.add(Source(1 to (num + diff))) val zip = b.add(Zip[Int, Int])
source0 ~> zip.in0 source1 ~> zip.in1
(zip.out) }
// This flow waits indefinitely when diff > 0 def zipDropSource(num: Int, diff: Int) = Source() { implicit b => import akka.stream.scaladsl.FlowGraph.Implicits._
val source = b.add(Source(1 to (num + diff))) val bcast = b.add(Broadcast[Int](2)) val drop = b.add(Flow[Int].drop(diff)) val zip = b.add(Zip[Int, Int])
source ~> bcast ~> zip.in0 bcast ~> drop ~> zip.in1
(zip.out) }
// PASS "Zip" should "complete with same length streams" in { val future: Future[Int] = zipSource(10, 10).runWith(Sink.fold(0)((s, i) => s + 1)) whenReady(future)(_ shouldBe 10) }
// PASS it should "complete with different length streams" in { val future: Future[Int] = zipSource(10, 20).runWith(Sink.fold(0)((s, i) => s + 1)) whenReady(future)(_ shouldBe 10) }
// PASS "Zip with drop" should "complete with same length streams" in { val future: Future[Int] = zipDropSource(10, 0).runWith(Sink.fold(0)((s, i) => s + 1)) whenReady(future)(_ shouldBe 10) }
// FAIL it should "complete with different length streams" in { val future: Future[Int] = zipDropSource(10, 10).runWith(Sink.fold(0)((s, i) => s + 1)) whenReady(future)(_ shouldBe 10) }
}
Hi-I've encountered an issue with processing a stream that I fan out via broadcast and fan in via zip.The broadcast splits the stream in two with one branch containing a drop element.According to my read of the docs, I would expect the terminating zip to complete when the shorter of the two streams (the one with the drop) completes.However, the flow hangs waiting indefinitely.Here's the relevant part of a test case I put together to reproduce the problem.Note that the flow without the drop (the first flow) works fine with different length streams.What am I doing wrong?
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
//Variance//Calculate the variance from a source of integers with a reusable flow ('vari')object Urlaub03 extends App { implicit val system = ActorSystem("sys") try { val size = math.pow(2, 6).toInt val settings = ActorMaterializerSettings(system).withInputBuffer(initialSize = 64, maxSize = size) implicit val materializer = ActorMaterializer(settings)
val src: Source[Int, Unit] = Source(() => new Iterator[Int] { val max = 500 var cnt = 0 def hasNext = cnt < max; def next = { cnt += 1; (math.random * 1000).toInt } })
// Utility flow class Fill[A]() extends StatefulStage[A, A] { override def initial: StageState[A, A] = new StageState[A, A] { override def onPush(elem: A, ctx: Context[A]): SyncDirective = { val iter = new Iterator[A] { def hasNext = true; def next = elem } emit(iter, ctx) } } }
// Utility flow val _cntFlow: Flow[Int, Int, Future[Int]] = { import FlowGraph.Implicits._ val cntSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)((cuml, _) => cuml + 1) Flow(cntSink) { implicit builder => fold => (fold.inlet, builder.materializedValue.mapAsync(4)(identity).outlet) } }
// Utility flow val _sumFlow: Flow[Int, Int, Future[Int]] = { import FlowGraph.Implicits._ val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)((cuml, elem) => cuml + elem) Flow(sumSink) { implicit builder => fold => (fold.inlet, builder.materializedValue.mapAsync(4)(identity).outlet) } }
// Reusable flow val vari: Flow[Int, Double, Unit] = { import FlowGraph.Implicits._
Flow() { implicit b => val bcast = b.add(Broadcast[Int](3)) val zip1 = b.add(Zip[Int, Int]()) val zip2 = b.add(Zip[Int, Double]()) // Transforms a stream of integers to their sum val sumFlow = b.add(_sumFlow) // Transforms a stream of integers to their amount of elements val cntFlow = b.add(_cntFlow) val mean = b.add(Flow[(Int, Int)].map { case (sum, cnt) => sum.toDouble / cnt }) // Takes the first element of a stream and transforms it into an endless stream of that element. val fill = b.add(Flow[Double].transform(() => new Fill[Double]())) val vari = b.add(Flow[(Int, Double)].map { case (value, mean) => value - mean })
bcast ~> zip2.in0 bcast ~> sumFlow ~> zip1.in0 bcast ~> cntFlow ~> zip1.in1 zip1.out ~> mean ~> fill ~> zip2.in1 zip2.out ~> vari
(bcast.in, vari.outlet) } } var cnt = 1 // Usage of the 'vari' flow val re = src.via(vari).runForeach { x => println("03 vari: %-6d %-8.4f" format (cnt, x)) cnt += 1 } Await.result(re, 3.second) println("03 complete") } finally { system.shutdown() }}
// Creates an unbounded source of random ints with a known seed (for repeatability)def randomSource(seed: Int) = Source(() => { val random = new Random(seed) Iterator.continually(random.nextInt) })
// Transform a source of integers into a normalized source of doubles where// each element emitted is in the range of 0 to 1// Note that the incoming source must be both finite and support multiple subscribersdef normalize(in: Source[Int, Unit]): Source[Double, Unit] = {
// Fold over the input source to create a new source that emits a single element // which is the range of integers over the entire stream val fold = in.fold((Int.MaxValue, Int.MinValue)) { (range, n) => range match { case (l, u) => (l.min(n), u.max(n)) } }
// Transform the single element range source into an unbounded source // that continually emits the same element val range = fold.map(r => Source.repeat(r)).flatten(FlattenStrategy.concat)
// Create a stage that normalizes each value val normalize = Flow[(Int, (Int, Int))].map { case (n, (min, max)) if (min == max) => 1.0 case (n, (min, max)) => (n.toDouble - min.toDouble) / (max.toDouble - min.toDouble) }
// Create the final source using a flow that combines the prior constructs Source(in, range, Zip[Int, (Int, Int)], normalize)((mat, _, _, _) => mat) { implicit b => (in, range, zip, normalize) =>
in ~> zip.in0 range ~> zip.in1 zip.out ~> normalize
normalize.outlet }
}
class NormalizeSpec extends FlatSpec with AkkaStreamsImplicits with Matchers with ScalaFutures { val seed = 42
"Normalize" should "properly calculate for constant stream" in {
val value = 5 val size = 100 val expected = Seq.fill(size)(1.0)
val constants = Source.repeat(value).take(size) val normalized = normalize(constants)
val future = normalized.runWith(Sink.fold(List[Double]())(_ :+ _))
whenReady(future) { result => //println(s"result: $result") result should have size expected.size result.zip(expected).foreach { case (actual, expected) => actual shouldBe expected } } }
it should "properly calculate for random stream" in {
val size = 100 val expected = Seq.fill(size)(1.0)
val randoms = randomSource(seed).take(size) val normalized = normalize(randoms)
val future = normalized.runWith(Sink.fold(List[Double]())(_ :+ _))
whenReady(future) { result => //println(s"result: $result") result should have size expected.size result should contain (0.0) result should contain (1.0) result.exists(_ < 0.0) shouldBe false result.exists(_ > 1.0) shouldBe false } }
}