Akka Streams - Issue with processing different length streams through drop/zip

Lance Arlaus

May 9, 2015, 4:57:59 AM
to akka...@googlegroups.com
Hi-

I've encountered an issue with processing a stream that I fan out via broadcast and fan in via zip.
The broadcast splits the stream in two with one branch containing a drop element.
According to my read of the docs, I would expect the terminating zip to complete when the shorter of the two streams (the one with the drop) completes.
However, the flow hangs waiting indefinitely.

Here's the relevant part of a test case I put together to reproduce the problem.
Note that the flow without the drop (the first flow) works fine with different length streams.
What am I doing wrong?

Akka Stream Version: 1.0-RC2

Thanks,
Lance

  // This flow works fine
  def zipSource(num: Int, diff: Int) = Source() { implicit b =>
    import akka.stream.scaladsl.FlowGraph.Implicits._

    val source0 = b.add(Source(1 to num))
    val source1 = b.add(Source(1 to (num + diff)))
    val zip = b.add(Zip[Int, Int])

    source0 ~> zip.in0
    source1 ~> zip.in1

    (zip.out)
  }

  // This flow waits indefinitely when diff > 0
  def zipDropSource(num: Int, diff: Int) = Source() {  implicit b =>
    import akka.stream.scaladsl.FlowGraph.Implicits._

    val source = b.add(Source(1 to (num + diff)))
    val bcast = b.add(Broadcast[Int](2))
    val drop = b.add(Flow[Int].drop(diff))
    val zip = b.add(Zip[Int, Int])

    source ~> bcast ~>         zip.in0
              bcast ~> drop ~> zip.in1

    (zip.out)
  }

  // PASS
  "Zip" should "complete with same length streams" in {
    val future: Future[Int] = zipSource(10, 0).runWith(Sink.fold(0)((s, i) => s + 1))
    whenReady(future)(_ shouldBe 10)
  }

  // PASS
  it should "complete with different length streams" in {
    val future: Future[Int] = zipSource(10, 20).runWith(Sink.fold(0)((s, i) => s + 1))
    whenReady(future)(_ shouldBe 10)
  }

  // PASS
  "Zip with drop" should "complete with same length streams" in {
    val future: Future[Int] = zipDropSource(10, 0).runWith(Sink.fold(0)((s, i) => s + 1))
    whenReady(future)(_ shouldBe 10)
  }

  // FAIL
  it should "complete with different length streams" in {
    val future: Future[Int] = zipDropSource(10, 10).runWith(Sink.fold(0)((s, i) => s + 1))
    whenReady(future)(_ shouldBe 10)
  }

}

Endre Varga

May 9, 2015, 5:05:44 AM
to akka...@googlegroups.com
Hi Lance,

I don't think you are doing anything wrong. Btw, I suspect the bug is in Broadcast instead. Can you file a ticket, please?

-Endre
 


Lance Arlaus

May 9, 2015, 11:49:48 AM
to akka...@googlegroups.com
No problem.
Thanks for the quick response and here's the corresponding issue: https://github.com/akka/akka/issues/17435

Lance Arlaus

Jun 1, 2015, 6:10:34 PM
to akka...@googlegroups.com
Circling back on this, I created a blog post that explains the issue I encountered along with the solution of using a balancing buffer.


I hope it helps those who encounter the same issue.
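
To make the stall and the balancing buffer concrete, here is a toy model in plain Scala with no Akka dependency. It is a deliberate simplification and not how Akka Streams is implemented: it assumes each zip input holds at most `capacity` elements, that the broadcast only emits when both branches can accept an element, and that the drop stage always accepts while it is still discarding. The names `ZipDropModel`, `run`, and `capacity` are made up for this sketch.

```scala
import scala.collection.mutable

object ZipDropModel {
  // Simulates source ~> broadcast ~> zip.in0 and broadcast ~> drop ~> zip.in1.
  // Returns the number of pairs the zip emits before nothing can move any more.
  def run(total: Int, drop: Int, capacity: Int): Int = {
    var next = 1                        // next element the source would emit
    var dropped = 0                     // elements discarded so far by the drop stage
    val in0 = mutable.Queue.empty[Int]  // direct branch, bounded by capacity
    val in1 = mutable.Queue.empty[Int]  // branch behind the drop stage, bounded by capacity
    var pairs = 0
    var progressed = true

    while (progressed) {
      progressed = false

      // Zip emits only once both inputs hold an element.
      if (in0.nonEmpty && in1.nonEmpty) {
        in0.dequeue()
        in1.dequeue()
        pairs += 1
        progressed = true
      }

      // Broadcast emits only when BOTH branches can accept the element.
      val dropBranchAccepts = dropped < drop || in1.size < capacity
      if (next <= total && in0.size < capacity && dropBranchAccepts) {
        in0.enqueue(next)
        if (dropped < drop) dropped += 1 else in1.enqueue(next)
        next += 1
        progressed = true
      }
    }
    pairs
  }
}
```

In this model, `ZipDropModel.run(total = 20, drop = 10, capacity = 4)` returns 0: the direct branch fills up while the other branch is still dropping, so the broadcast can no longer emit and the zip never sees a pair. With `capacity = 16` the same call returns 10, which is the role the balancing buffer plays on the branch without the drop.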

Akka Team

Jun 2, 2015, 4:10:58 AM
to Akka User List
Wow!

What did you use for the animation? I want that :D

-Endre
--
Akka Team
Typesafe - Reactive apps on the JVM
Blog: letitcrash.com
Twitter: @akkateam

Akka Team

Jun 2, 2015, 4:21:00 AM
to Akka User List
Btw, small nitpick: The animation shows what I call the "synchronous" execution model, where the demand starts from the end of the stream and travels back and forth (branching out if necessary). In reality, these stages individually demand the first element when they are started, and they immediately request the next element once they have started working on a received element.

-Endre

wwagner4

Jul 27, 2015, 10:50:45 AM
to Akka User List, lance....@gmail.com
I have the same problem, but increasing the buffer size does not really help. In my case, all elements from one branch must be read before I read all the elements of the second branch. So the maximum difference is always the complete length of the stream, which I do not know in advance (and which can be very large).
In any case, it does not seem like a good idea to rely on a buffer size that depends on the number of elements provided by a stream. As the documentation says: "Adjusting buffer size is only for increasing performance".

Perhaps some kind of BroadcastPreferred, like MergePreferred, could be a solution.

Lance Arlaus

Jul 27, 2015, 11:38:02 AM
to Akka User List, akka.o...@gmail.com
Endre-

Apologies, but I just saw your response as a result of wwagner4's inquiry.

Thanks for reading the post and I'm glad you like the animation - it was done in Animatron. That's manual animation, brother :) No luck finding an easier solution.
I'm putting together a talk on Akka Streams / HTTP and wanted a good way to visualize streams.
Good point - I'll be sure to note the immediate demand signal in my talk, though I think the simplified synchronous model lends conceptual clarity to those approaching streams for the first time.
Thanks for the feedback!

-Lance

Lance Arlaus

Jul 27, 2015, 12:06:50 PM
to Akka User List, wwag...@gmail.com
Perhaps I've misunderstood, but it sounds like you need to process the stream twice i.e. sequential, not parallel, processing.
That really isn't a use case for broadcast, AFAIK.
A few questions:
  1. Can the underlying stream be opened multiple times? For example, are you reading elements from a file?
  2. Does the first stage in your processing produce an aggregate i.e. a result whose value is dependent upon the entire stream but whose size is independent of the size of the stream? For example, a stage that sums elements or computes a hash.
If the answers to both questions are yes, you should be able to easily construct a flow that opens the file, produces the aggregate, and feeds it downstream, zipping it with elements from the second reading of the file (a second subscriber on the same source), all within a single flow.

It would help to understand your requirements to suggest a more specific, correct solution.

-Lance

wwagner4

Jul 29, 2015, 12:42:39 PM
to Akka User List, lance....@gmail.com
As you expected, I want to aggregate the elements of a stream. Perhaps creating a second subscriber on the source (as you suggest) could be a solution, but I do not know what you mean by 'a second subscriber'. Here is my current attempt:

//Variance
//Calculate the variance from a source of integers with a reusable flow ('vari')
object Urlaub03 extends App {
  implicit val system = ActorSystem("sys")
  try {
    val size = math.pow(2, 6).toInt
    val settings = ActorMaterializerSettings(system).withInputBuffer(initialSize = 64, maxSize = size)
    implicit val materializer = ActorMaterializer(settings)

    val src: Source[Int, Unit] = Source(() => new Iterator[Int] {
      val max = 500
      var cnt = 0
      def hasNext = cnt < max;
      def next = { cnt += 1; (math.random * 1000).toInt }
    })

    // Utility flow
    class Fill[A]() extends StatefulStage[A, A] {
      override def initial: StageState[A, A] =
        new StageState[A, A] {
          override def onPush(elem: A, ctx: Context[A]): SyncDirective = {
            val iter = new Iterator[A] {
              def hasNext = true;
              def next = elem
            }
            emit(iter, ctx)
          }
        }
    }

    // Utility flow
    val _cntFlow: Flow[Int, Int, Future[Int]] = {
      import FlowGraph.Implicits._
      val cntSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)((cuml, _) => cuml + 1)
      Flow(cntSink) {
        implicit builder =>
          fold =>
            (fold.inlet, builder.materializedValue.mapAsync(4)(identity).outlet)
      }
    }

    // Utility flow
    val _sumFlow: Flow[Int, Int, Future[Int]] = {
      import FlowGraph.Implicits._
      val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)((cuml, elem) => cuml + elem)
      Flow(sumSink) {
        implicit builder =>
          fold =>
            (fold.inlet, builder.materializedValue.mapAsync(4)(identity).outlet)
      }
    }

    // Reusable flow
    val vari: Flow[Int, Double, Unit] = {
      import FlowGraph.Implicits._

      Flow() { implicit b =>
        val bcast = b.add(Broadcast[Int](3))
        val zip1 = b.add(Zip[Int, Int]())
        val zip2 = b.add(Zip[Int, Double]())
        // Transforms a stream of integers to their sum
        val sumFlow = b.add(_sumFlow)
        // Transforms a stream of integers to their amount of elements
        val cntFlow = b.add(_cntFlow)
        val mean = b.add(Flow[(Int, Int)].map { case (sum, cnt) => sum.toDouble / cnt })
        // Takes the first element of a stream and transforms it into an endless stream of that element.
        val fill = b.add(Flow[Double].transform(() => new Fill[Double]()))
        val vari = b.add(Flow[(Int, Double)].map { case (value, mean) => value - mean })

        bcast ~> zip2.in0
        bcast ~> sumFlow ~> zip1.in0
        bcast ~> cntFlow ~> zip1.in1
        zip1.out ~> mean ~> fill ~> zip2.in1
        zip2.out ~> vari

        (bcast.in, vari.outlet)
      }
    }
    var cnt = 1
    // Usage of the 'vari' flow
    val re = src.via(vari).runForeach { x =>
      println("03 vari: %-6d %-8.4f" format (cnt, x))
      cnt += 1
    }
    Await.result(re, 3.second)
    println("03 complete")
  } finally {
    system.shutdown()
  }
}


Lance Arlaus

Jul 30, 2015, 11:12:58 PM
to Akka User List, lance....@gmail.com, wwag...@gmail.com
Following is a running sample of what I was talking about.
Happy to help, however I'd ask that you please start a new thread with related questions since this conversation is not related to the original thread (you've hijacked the thread).

The sample code performs normalization of a stream of integers.
It's a simple example of a flow that requires sequential processing, doing a first pass over the stream of integers to determine the range before emitting the normalized version on the second pass.
It's the simplest version of a pattern that you can apply to your own problem at hand.
The key is in the fold, repeat, and flatten to create an unbounded source that can be zipped with the original numbers.

The full self-contained Gist can be found here: https://gist.github.com/lancearlaus/b43b7acb8a3aada51701

Also, if you're doing a variance calculation, there's a well known better way to do that incrementally in a single pass.
It just so happens I tackled this very problem. Here's a pointer to the relevant code in one of my repos.
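
One commonly cited single-pass method for this is Welford's online algorithm. The sketch below illustrates it in plain Scala; it is not the code from the repo mentioned above, and `OnlineVariance` and `Stats` are hypothetical names chosen for this example.

```scala
object OnlineVariance {
  // Running state: element count, running mean, and the sum of squared
  // deviations from the current mean (usually called M2).
  final case class Stats(count: Long, mean: Double, m2: Double) {
    // Welford update: fold one more value into the running state.
    def add(x: Double): Stats = {
      val n = count + 1
      val delta = x - mean
      val newMean = mean + delta / n
      Stats(n, newMean, m2 + delta * (x - newMean))
    }

    def populationVariance: Double = if (count > 0) m2 / count else Double.NaN
    def sampleVariance: Double = if (count > 1) m2 / (count - 1) else Double.NaN
  }

  val empty: Stats = Stats(0L, 0.0, 0.0)

  // Single pass over the values, no second read of the input required.
  def of(xs: Iterable[Double]): Stats = xs.foldLeft(empty)(_ add _)
}
```

Because the state is updated one element at a time, the same `add` step could serve as the function passed to a fold over a stream, so no broadcast or second subscriber would be needed for the variance itself.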


// Creates an unbounded source of random ints with a known seed (for repeatability)
def randomSource(seed: Int) = Source(() => {
  val random = new Random(seed)
  Iterator.continually(random.nextInt)  
})


// Transform a source of integers into a normalized source of doubles where
// each element emitted is in the range of 0 to 1
// Note that the incoming source must be both finite and support multiple subscribers
def normalize(in: Source[Int, Unit]): Source[Double, Unit] = {

  // Fold over the input source to create a new source that emits a single element
  // which is the range of integers over the entire stream
  val fold = in.fold((Int.MaxValue, Int.MinValue)) { 
    (range, n) => range match {
      case (l, u) => (l.min(n), u.max(n))
    }
  }

  // Transform the single element range source into an unbounded source
  // that continually emits the same element 
  val range = fold.map(r => Source.repeat(r)).flatten(FlattenStrategy.concat)

  // Create a stage that normalizes each value
  val normalize = Flow[(Int, (Int, Int))].map {
    case (n, (min, max)) if (min == max) => 1.0
    case (n, (min, max)) => (n.toDouble - min.toDouble) / (max.toDouble - min.toDouble)
  }

  // Create the final source using a flow that combines the prior constructs
  Source(in, range, Zip[Int, (Int, Int)], normalize)((mat, _, _, _) => mat) {
    implicit b => (in, range, zip, normalize) => 

    in    ~> zip.in0
    range ~> zip.in1
             zip.out ~> normalize

    normalize.outlet
  }

}

class NormalizeSpec extends FlatSpec with AkkaStreamsImplicits with Matchers with ScalaFutures {
 
  val seed = 42

  "Normalize" should "properly calculate for constant stream" in {

    val value = 5
    val size = 100
    val expected = Seq.fill(size)(1.0)

    val constants = Source.repeat(value).take(size)
    val normalized = normalize(constants)

    val future = normalized.runWith(Sink.fold(List[Double]())(_ :+ _))

    whenReady(future) { result =>
      //println(s"result: $result")
      result should have size expected.size
      result.zip(expected).foreach { case (actual, expected) =>
        actual shouldBe expected
      }
    }
  }

  it should "properly calculate for random stream" in {

    val size = 100
    val expected = Seq.fill(size)(1.0)

    val randoms = randomSource(seed).take(size)
    val normalized = normalize(randoms)

    val future = normalized.runWith(Sink.fold(List[Double]())(_ :+ _))

    whenReady(future) { result =>
      //println(s"result: $result")
      result should have size expected.size
      result should contain (0.0)
      result should contain (1.0)
      result.exists(_ < 0.0) shouldBe false
      result.exists(_ > 1.0) shouldBe false
    }
  }

}
