I have what seems like a very simple problem, but I cannot come up with a solution for it.
First, I have a flow that does some accumulation and therefore only emits after it has received a number of elements. As a simple example:
    val accumulator =
      Flow[Int]
        .statefulMapConcat { () ⇒
          var total = 0
          i ⇒ {
            val newTotal = total + i
            if (newTotal >= 10) {
              total = 0
              List(newTotal)
            }
            else {
              total = newTotal
              List()
            }
          }
        }
This works as expected:
    val result = Source(List(2, 5, 6, 3, 8, 4, 1, 8, 7, 4))
      .via(accumulator)
      .runWith(Sink.fold(Seq.empty[Int])(_ :+ _))
    result.futureValue shouldBe Seq(13, 11, 13, 11)
Now, I want to record the index of the first element which started the accumulation. In my actual application this is a java.time.Instant, but Long suffices for this example. As a first naive attempt, using the Graph DSL:
    val flow =
      Flow[(Int, Long)]
        .via(Flow.fromGraph(GraphDSL.create() { implicit builder ⇒
          import GraphDSL.Implicits._
          val unzip = builder.add(Unzip[Int, Long]())
          val acc = builder.add(accumulator)
          val zip = builder.add(Zip[Int, Long]())
          unzip.out0 ~> acc ~> zip.in0
          unzip.out1 ~> zip.in1
          FlowShape[(Int, Long), (Int, Long)](unzip.in, zip.out)
        }))
    val result = Source(List(2, 5, 6, 3, 8, 4, 1, 8, 7, 4)).zipWithIndex
      .via(flow)
      .runWith(Sink.fold(Seq.empty[(Int, Long)])(_ :+ _))
    result.futureValue shouldBe Seq((13, 0), (11, 3), (13, 5), (11, 8))
This times out because no elements flow through. So I tried adding a buffer:
    val flow =
      Flow[(Int, Long)]
        .via(Flow.fromGraph(GraphDSL.create() { implicit builder ⇒
          import GraphDSL.Implicits._
          val unzip = builder.add(Unzip[Int, Long]())
          val acc = builder.add(accumulator)
          val zip = builder.add(Zip[Int, Long]())
          val buffer = builder.add(Flow[Long].buffer(1, OverflowStrategy.dropTail))
          unzip.out0 ~> acc ~> zip.in0
          unzip.out1 ~> buffer ~> zip.in1
          FlowShape[(Int, Long), (Int, Long)](unzip.in, zip.out)
        }))
With this I get output, but not what I need:
    List((13,0), (11,2), (13,4), (11,7)) was not equal to List((13,0), (11,3), (13,5), (11,8))
I think I know what is happening: the first Long element is buffered in the zip stage, so the element in the buffer is not the one I expect. But I don't know how to get around it. Using conflate ends up with exactly the same result.
Note that I can't easily rewrite the accumulator stage; if I could, I would just process the pairs together in a stateful stage.
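To make the expected pairing concrete, here is a minimal sketch using plain Scala collections, with no Akka involved. The names `expectedPairs` and `threshold` are hypothetical and exist only for illustration. This is not a replacement for the accumulator stage, just a statement of the output the stream should produce.

```scala
// Hypothetical helper, plain collections only (no Akka): pairs each emitted
// total with the index of the element that started that accumulation.
def expectedPairs(xs: Seq[Int], threshold: Int = 10): Seq[(Int, Long)] = {
  // Fold state: (running total, index that started the current run, emitted pairs)
  val (_, _, out) =
    xs.zipWithIndex.foldLeft((0, Option.empty[Long], Vector.empty[(Int, Long)])) {
      case ((total, start, acc), (x, i)) =>
        // The first element seen after a reset starts a new accumulation.
        val first = start.getOrElse(i.toLong)
        val newTotal = total + x
        if (newTotal >= threshold) (0, None, acc :+ (newTotal -> first))
        else (newTotal, Some(first), acc)
    }
  out
}
```

Called with the input from the tests above, `expectedPairs(List(2, 5, 6, 3, 8, 4, 1, 8, 7, 4))` pairs the totals 13, 11, 13, 11 with the start indices 0, 3, 5, 8.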
Thanks,
Julian