Akka streams - tripped up by buffers

Julian Howarth

Nov 16, 2016, 4:18:27 AM
to Akka User List
I have what seems like a very simple problem but cannot come up with a solution for it.
Firstly, I have a flow that does some accumulation and therefore only emits after it has received a number of elements. As a simple example:

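import akka.stream.scaladsl.Flow

// Sums incoming Ints and emits the total once it reaches 10, then starts over.
val accumulator =
  Flow[Int]
    .statefulMapConcat { () ⇒
      var total = 0
      i ⇒ {
        val newTotal = total + i
        if (newTotal >= 10) {
          total = 0
          List(newTotal)
        } else {
          total = newTotal
          List()
        }
      }
    }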

This works as expected:

val result = Source(List(2, 5, 6, 3, 8, 4, 1, 8, 7, 4))
  .via(accumulator)
  .runWith(Sink.fold(Seq.empty[Int])(_ :+ _))

result.futureValue shouldBe Seq(13, 11, 13, 11)

Now, I want to record the index of the first element which started the accumulation. In my actual application this is a java.time.Instant, but Long suffices for this example. As a first naive attempt, using the Graph DSL:

import akka.stream.FlowShape
import akka.stream.scaladsl.{Flow, GraphDSL, Sink, Source, Unzip, Zip}

val flow =
  Flow[(Int, Long)]
    .via(Flow.fromGraph(GraphDSL.create() { implicit builder ⇒
      import GraphDSL.Implicits._
      val unzip = builder.add(Unzip[Int, Long]())
      val acc = builder.add(accumulator)
      val zip = builder.add(Zip[Int, Long]())

      // Values go through the accumulator; indices bypass it and are re-paired in zip.
      unzip.out0 ~> acc ~> zip.in0
      unzip.out1 ~> zip.in1

      FlowShape[(Int, Long), (Int, Long)](unzip.in, zip.out)
    }))

val result = Source(List(2, 5, 6, 3, 8, 4, 1, 8, 7, 4)).zipWithIndex
  .via(flow)
  .runWith(Sink.fold(Seq.empty[(Int, Long)])(_ :+ _))

result.futureValue shouldBe Seq((13, 0), (11, 3), (13, 5), (11, 8))

This times out because no elements flow through. So I tried adding a buffer:

import akka.stream.OverflowStrategy

val flow =
  Flow[(Int, Long)]
    .via(Flow.fromGraph(GraphDSL.create() { implicit builder ⇒
      import GraphDSL.Implicits._
      val unzip = builder.add(Unzip[Int, Long]())
      val acc = builder.add(accumulator)
      val zip = builder.add(Zip[Int, Long]())

      // One-slot buffer: when it is full, dropTail discards the youngest
      // buffered element to make room for the new one.
      val buffer = builder.add(Flow[Long].buffer(1, OverflowStrategy.dropTail))

      unzip.out0 ~> acc ~> zip.in0
      unzip.out1 ~> buffer ~> zip.in1

      FlowShape[(Int, Long), (Int, Long)](unzip.in, zip.out)
    }))

With the buffer I get output, but not what I need:

List((13,0), (11,2), (13,4), (11,7)) was not equal to List((13,0), (11,3), (13,5), (11,8))

I think I know what is happening: the first Long element is already held by the zip stage, so the element sitting in the buffer is not the one I expect. But I don't know how to get around it. Using conflate ends up with exactly the same result.
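
To make that hunch concrete, here is a toy model in plain Scala with no Akka involved. It assumes that zip takes the first index it is offered and holds it until the accumulator emits, and that the one-slot buffer always ends up holding only the most recent index. The names `ZipPrefetchModel` and `simulate` are made up for illustration, and the real stream runs asynchronously, so treat this as a sketch of the suspected behaviour rather than a description of what Akka does internally.

```scala
object ZipPrefetchModel {
  // Toy model (an assumption, not Akka internals): zip holds one index,
  // a single-slot buffer behind it keeps only the newest index.
  def simulate(xs: Seq[Int], threshold: Int = 10): Vector[(Int, Long)] = {
    var total = 0
    var heldByZip: Option[Long] = None // index already pulled by zip
    var buffered: Option[Long] = None  // one-slot buffer, newest index wins
    var out = Vector.empty[(Int, Long)]

    for ((x, i) <- xs.zipWithIndex) {
      val idx = i.toLong
      // The index goes straight to zip if zip is empty, otherwise it
      // overwrites whatever the buffer was holding.
      if (heldByZip.isEmpty) heldByZip = Some(idx) else buffered = Some(idx)

      total += x
      if (total >= threshold) {
        // The accumulator emits: zip pairs the total with the index it holds,
        // then refills itself from the buffer.
        out = out :+ (total -> heldByZip.get)
        total = 0
        heldByZip = buffered
        buffered = None
      }
    }
    out
  }
}
```

Under those assumptions, `ZipPrefetchModel.simulate(Seq(2, 5, 6, 3, 8, 4, 1, 8, 7, 4))` gives `Vector((13,0), (11,2), (13,4), (11,7))`, the same pairs as the failing assertion: after the first emission, zip refills with the index of the element that completed the previous total rather than the one that starts the next.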

Note that I can't easily rewrite the accumulator stage; if I could, I would just process the pairs together in a single stateful stage.

Thanks,

Julian

Viktor Klang

Nov 16, 2016, 5:05:27 AM
to Akka User List
val accumulator =
  Flow[Int]
    .statefulMapConcat { () ⇒
      var total = 0
      var curIdx = 0L
      var startIdx = curIdx
      i ⇒ {
        val newTotal = total + i
        if (newTotal >= 10) {
          val result = List((newTotal, startIdx))
          curIdx += 1
          startIdx = curIdx
          total = 0
          result
        } else {
          val result = List.empty[(Int, Long)]
          curIdx += 1
          total = newTotal
          result
        }
      }
    }
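
The bookkeeping above can also be checked without running a stream. The following is a minimal sketch in plain Scala that applies the same rule (remember the index at which the current run started, emit it together with the total, then start the next run at the following index) with a fold over a collection. `AccumulateWithStartIndex` and `accumulate` are illustrative names and not part of Akka.

```scala
object AccumulateWithStartIndex {
  // Folds over (value, index) pairs, carrying:
  //   the running total, the index where the current run started,
  //   and the results emitted so far.
  def accumulate(xs: Seq[Int], threshold: Int = 10): Vector[(Int, Long)] = {
    val initial = (0, 0L, Vector.empty[(Int, Long)])
    val (_, _, emitted) = xs.zipWithIndex.foldLeft(initial) {
      case ((total, start, acc), (x, idx)) =>
        val newTotal = total + x
        if (newTotal >= threshold)
          // Emit the total with the start index; the next run starts at idx + 1.
          (0, idx + 1L, acc :+ (newTotal -> start))
        else
          (newTotal, start, acc)
    }
    emitted
  }
}
```

For the input used in this thread, `AccumulateWithStartIndex.accumulate(Seq(2, 5, 6, 3, 8, 4, 1, 8, 7, 4))` returns `Vector((13,0), (11,3), (13,5), (11,8))`, which is the expected result from the original test.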


--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscribe@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.



--
Cheers,

Julian Howarth

Nov 16, 2016, 6:02:09 AM
to Akka User List
Thanks Viktor,

But as I said, I don't have access to change the accumulator flow - it's a black box that I otherwise would have to reimplement myself. That's why I'm trying to work around it with the Graph DSL.

Julian

Viktor Klang

Nov 16, 2016, 6:04:55 AM
to Akka User List
How would auxiliary stages know which index the accumulation started at?


Julian Howarth

Nov 16, 2016, 8:24:16 AM
to Akka User List

On Wednesday, November 16, 2016 at 11:04:55 AM UTC, √ wrote:
> How would auxiliary stages know which index the accumulation started at?

That's where I was hoping an appropriate buffer stage could help, by just keeping the one that corresponds to the initial packet after the accumulator has emitted. But if not ...

The actual use case is receiving chunked packets over a TCP stream, where I want to keep track of when the first chunk of each frame arrived. I was hoping to use the built-in Framing.lengthField flow, but will just implement it myself, keeping track of the timestamp. As an aside, is there any general value in extending that framing flow to allow storing additional state? For instance, in addition to tracking the timestamp, I am also extracting the original IP address using the PROXY protocol.
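
As a rough illustration of "framing while keeping track of the timestamp", here is a minimal sketch in plain Scala. It is not Akka's Framing.lengthField and makes several assumptions: a 4-byte big-endian length prefix followed by the payload, chunks delivered in order, and arrival times supplied by the caller. `TimestampedFramer` and `onChunk` are hypothetical names.

```scala
import java.nio.ByteBuffer
import java.time.Instant

// Hypothetical framer: reassembles length-prefixed frames from arbitrary chunks
// and tags each frame with the arrival time of the chunk holding its first byte.
final class TimestampedFramer {
  private var pending = Vector.empty[Byte]
  // Invariant: defined exactly when `pending` is non-empty.
  private var firstSeen: Option[Instant] = None

  def onChunk(chunk: Array[Byte], arrivedAt: Instant): Vector[(Instant, Vector[Byte])] = {
    if (chunk.nonEmpty && firstSeen.isEmpty) firstSeen = Some(arrivedAt)
    pending = pending ++ chunk

    var frames = Vector.empty[(Instant, Vector[Byte])]
    var more = true
    while (more) {
      if (pending.length < 4) more = false
      else {
        val len = ByteBuffer.wrap(pending.take(4).toArray).getInt()
        require(len >= 0, "negative frame length")
        if (pending.length < 4 + len) more = false
        else {
          frames = frames :+ (firstSeen.get -> pending.slice(4, 4 + len))
          pending = pending.drop(4 + len)
          // Any bytes left after a completed frame came from the current chunk.
          firstSeen = if (pending.isEmpty) None else Some(arrivedAt)
        }
      }
    }
    frames
  }
}
```

The same state (pending bytes plus the time the first of them arrived) could live inside a stateful stream stage; other per-frame metadata, such as a source address, could be carried the same way.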
 

Viktor Klang

Nov 16, 2016, 8:45:11 AM
to Akka User List
On Wed, Nov 16, 2016 at 2:24 PM, Julian Howarth <10.ho...@gmail.com> wrote:

>> On Wednesday, November 16, 2016 at 11:04:55 AM UTC, √ wrote:
>> How would auxiliary stages know which index the accumulation started at?
>
> That's where I was hoping an appropriate buffer stage could help, by just keeping the one that corresponds to the initial packet after the accumulator has emitted. But if not ...

Seems like a "guesswork stage" :S

> The actual use case is for receiving chunked packets over a TCP stream so that I could keep track of when the first chunk arrived. I was hoping to use the built in Framing.lengthField flow, but will just implement it myself, keeping track of the timestamp. As an aside, is there any general value in extending that framing flow to allow storing additional state? For instance, in addition to tracking the timestamp, I am also extracting the original ip address using proxy protocol.

If you come up with a low-to-no-overhead solution for that which is a drop-in replacement for the current one then I'd really like to have a look at it. :)
 

Julian Howarth

Nov 17, 2016, 12:53:37 PM
to Akka User List
Viktor,

Here's what I came up with: https://gist.github.com/julianhowarth/7287a6e6eaf665dd79307aaff6164cd8

Performance-wise, with some very casual testing I can't detect much, if any, overhead compared with the original. I checked both the vanilla case, where each inbound message corresponds to one frame, and the case where frames are randomly split across inbound packets. However, I assume you have a more comprehensive set of tests that could be run.

Julian

Julian Howarth

Nov 23, 2016, 6:34:24 AM
to Akka User List
Is this approach of any value to anyone else? I'm quite happy to raise an issue or a PR if so.

Thanks,

Julian

Viktor Klang

Nov 28, 2016, 6:54:25 AM
to Akka User List
Sorry for the late response,

sounds like you found a solution to your problem!
