akka streams graph is not fully drained when Future[Done] completes


sub...@gmail.com

Dec 16, 2016, 9:44:12 AM
to Akka User List
Hi,

I'm seeing an issue where the graph completes while there is still data in one of the flows. The last element emitted by the source enters a custom GraphStageLogic flow, where it is sent to a function that returns a Future. That Future has a callback which invokes getAsyncCallback and then push(out). For the last element, the Future callback fires (pushCallback.invoke(xml)) but the AsyncCallback is never invoked and the graph stops.

For more context, this is what I have going on inside the GraphStageLogic:

val s3ListBucket: Source[ByteString, NotUsed] =
  s3Client.listBucket(bucket, Some(currentPrefix), maxKeys, nextMarker)

val bucketListingXml: Future[String] = s3ListBucket
  .map(_.utf8String)
  .runWith(Sink.seq)(materializer)
  .map(_.mkString)(materializer.executionContext)

bucketListingXml.foreach { xml =>
  println(s"This gets called. prefix $currentPrefix")
  pushCallback.invoke(xml)
}(materializer.executionContext)

And the callback

val pushCallback = getAsyncCallback[String] { xml =>
  log.info(s"This is never called for last element in graph!")
  push(out, xml)
}

I don't see any errors and this issue consistently occurs on the last element. Thanks

Andrew

Viktor Klang

Dec 16, 2016, 10:03:45 AM
to Akka User List
Please submit a minimized reproducer so readers have a chance of running the code.

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscribe@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.



--
Cheers,

Konrad Malawski

Dec 16, 2016, 10:05:25 AM
to akka...@googlegroups.com, Viktor Klang
My crystal ball tells me you may be observing timing artifacts which originate from the fact that you println one thing, but log the other.
So there will be a timing difference when one or the other actually hits System.out.

Agree with Viktor though, reproducers please for such cases :)


-- 
Konrad `ktoso` Malawski
Akka @ Lightbend

Viktor Klang

Dec 16, 2016, 10:09:27 AM
to Konrad Malawski, Akka User List
Mine [crystal ball] has been in the repair shop the past 5 years and there's always excuse after excuse why they can't fix it. :(

Justin du coeur

Dec 16, 2016, 10:12:06 AM
to akka...@googlegroups.com, Viktor Klang
On Fri, Dec 16, 2016 at 10:05 AM, Konrad Malawski <konrad....@lightbend.com> wrote:
My crystal ball tells me you may be observing timing artifacts which originate from the fact that you println one thing, but log the other.
So there will be a timing difference when one or the other actually hits System.out.

Ah, good point.  In my experience, this is frequently a source of confusion -- I've wound up trying to be very rigorous about using a single logging mechanism, after too many false leads caused by this problem... 

Andrew

Dec 16, 2016, 2:14:19 PM
to akka...@googlegroups.com
ugh, my initial simplified flow does not reproduce the same behavior, so I'll need to dig in more on that. To clarify, the println statements were more of a debug aid; I'm using a Sink.seq and I don't see those values in the resulting Seq.


sub...@gmail.com

Dec 16, 2016, 5:09:18 PM
to Akka User List
Here's the code to reproduce. The issue only seems to occur when three things are combined: my custom Source, a Flow that uses an async callback, and parallelizing. If this code is run with .via(parallelizeFlow(parallelize = 1, asyncFlow)) it drops the last minute, but when run with .via(asyncFlow) it does not.

So with parallelize it prints all but the last minute:

seq is Vector(2016/12/14/23/50, 2016/12/14/23/51, 2016/12/14/23/52, 2016/12/14/23/53, 2016/12/14/23/54, 2016/12/14/23/55, 2016/12/14/23/56, 2016/12/14/23/57, 2016/12/14/23/58)

and without it prints the expected value, with all minutes flowing through the graph:

seq is Vector(2016/12/14/23/50, 2016/12/14/23/51, 2016/12/14/23/52, 2016/12/14/23/53, 2016/12/14/23/54, 2016/12/14/23/55, 2016/12/14/23/56, 2016/12/14/23/57, 2016/12/14/23/58, 2016/12/14/23/59)





package hack.streams

import akka.NotUsed
import akka.stream._
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge, Sink, Source}
import akka.stream.stage._
import org.joda.time.{DateTime, Duration, Interval}

import scala.collection.immutable.Seq
import scala.concurrent.{Await, Future}

object AsyncIssue {
  import StreamsMaterializer._

  def minuteSource(interval: Interval) = new GraphStage[SourceShape[DateTime]]() {
    val out = Outlet[DateTime]("keys")
    val shape = SourceShape(out)

    def zeroToMinute(date: DateTime) = date.withMillisOfSecond(0).withSecondOfMinute(0)

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with StageLogging {
      var isDone: Boolean = false
      var current: DateTime = new DateTime(0)

      override def preStart() = {
        current = zeroToMinute(interval.getStart)
      }

      setHandler(out,
        new OutHandler {
          override def onPull(): Unit = {
            if (!isDone) {
              push(out, current)
              current = current.plusMinutes(1)

              if (current.isEqual(zeroToMinute(interval.getEnd))) {
                isDone = true
              }
            } else {
              complete(out)
            }
          }
        })
    }
  }

  def futureCallbackFlow = new GraphStage[FlowShape[DateTime, String]]() {
    val in = Inlet[DateTime]("minute")
    val out = Outlet[String]("string")

    val formatter = org.joda.time.format.DateTimeFormat.forPattern("yyyy/MM/dd/HH/mm")

    val shape = FlowShape.of(in, out)

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with StageLogging {
      val pushCallback = getAsyncCallback[String] { seq =>
        push(out, seq)
      }

      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val minute = grab(in)

          val fMin: Future[DateTime] = Future {minute}

          fMin.foreach { min =>
            pushCallback.invoke(formatter.print(min))
          }
        }
      })

      setHandler(out,
        new OutHandler {
          override def onPull(): Unit = {
            pull(in)
          }
        }
      )
    }
  }

  def parallelizeFlow[In, Out](parallelize: Int, flow: Flow[In,Out,NotUsed]): Flow[In, Out, NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    val dispatcher = builder.add(Balance[In](parallelize))
    val merger = builder.add(Merge[Out](parallelize))

    for (i <- 0 to parallelize - 1) {
      dispatcher.out(i) ~> flow.async ~> merger.in(i)
    }

    FlowShape(dispatcher.in, merger.out)
  })

  def run() = {
    import StreamsMaterializer._

    val asyncFlow = Flow[DateTime].via(futureCallbackFlow)

    val source: Source[DateTime, NotUsed] = Source(11 to 20).via(Flow[Int].map { num =>
      val formatter = org.joda.time.format.DateTimeFormat.forPattern("yyyy/MM/dd/HH/mm")
      formatter.parseDateTime(s"2016/12/14/14/$num")
    })

    val mat: Future[Seq[String]] = Source.fromGraph(minuteSource(new Interval(Duration.standardMinutes(10), new DateTime().dayOfMonth().roundFloorCopy().minusDays(1))))
      .via(parallelizeFlow(parallelize = 1, asyncFlow))
      // .via(asyncFlow)
      .runWith(Sink.seq)

    val seq: Seq[String] = Await.result(mat, scala.concurrent.duration.Duration.Inf)
    println(s"seq is $seq")
  }
}

Viktor Klang

Dec 16, 2016, 5:31:37 PM
to Akka User List
Is this really a *minimized* reproducer?


sub...@gmail.com

Dec 16, 2016, 7:08:25 PM
to Akka User List
Not exactly sure what you were looking for. I'm relatively new to akka streams. This is the minimal amount of code that reproduces the problem, involving:

a custom source
a flow with AsyncCallback
and parallelizing

only deps are akka and joda time

Kyrylo Stokoz

Jan 13, 2017, 10:53:58 AM
to Akka User List
I would like to second this issue.

I'm experiencing similar behavior in my use case:

I'm trying to download a file from S3, and sometimes the HTTP entity length does not match the Content-Length header, so the subsequent JSON parsing fails.
The code I have (simplified):

final def responseEntityAsString(entity: ResponseEntity)(implicit executionContext: ExecutionContext, materializer: Materializer): Future[String] =
  entity.dataBytes.runWith(Sink.fold(ByteString.empty)(_ ++ _))
    .map(_.utf8String)

Http().singleRequest(HttpRequest(uri = uri))
  .flatMap { response =>
    responseEntityAsString(response.entity)
      .map { data =>
        if (!(data.endsWith("}") || data.endsWith("]")))
          println(s"Got data [${data.length}] contentLength:[${response.entity.contentLengthOption}] response: [${response.headers.mkString(",")}, $response]")
      }
  }

Got data [2088541] contentLength:[Some(2147123)] response: [x-amz-id-2: ImgIcWeKCF1Il45P9ugP6y8GPCwkb8BaoohGcLkXK1hI/7KrobtJmvbqMAxfI1QA6uAHKSW7k3c=,x-amz-request-id: C4629FE2B03865B2,Date: Fri, 13 Jan 2017 15:48:43 GMT,Last-Modified: Tue, 10 Jan 2017 10:53:04 GMT,ETag: "575b4d61d5f2be06d062b80d2f16b9e2",x-amz-version-id: qOmZXXwIOmTA9cdBPN79xRX045QVMQt0,Accept-Ranges: bytes,Server: AmazonS3, HttpResponse(200 OK,List(x-amz-id-2: ImgIcWeKCF1Il45P9ugP6y8GPCwkb8BaoohGcLkXK1hI/7KrobtJmvbqMAxfI1QA6uAHKSW7k3c=, x-amz-request-id: C4629FE2B03865B2, Date: Fri, 13 Jan 2017 15:48:43 GMT, Last-Modified: Tue, 10 Jan 2017 10:53:04 GMT, ETag: "575b4d61d5f2be06d062b80d2f16b9e2", x-amz-version-id: qOmZXXwIOmTA9cdBPN79xRX045QVMQt0, Accept-Ranges: bytes, Server: AmazonS3),HttpEntity.Default(application/json,2147123 bytes total),HttpProtocol(HTTP/1.1))]

Kyrylo Stokoz

Jan 13, 2017, 10:58:00 AM
to Akka User List
Just for info:
I tried with akka 2.4.16 + akka-http 10.0.2 and with akka 2.4.14.

In 2.4.14 I see truncation warnings logged for the One2OneBidi stage; in 2.4.16 I don't have such logs, but the issue is still reproducible.

sub...@gmail.com

Jan 20, 2017, 1:29:52 PM
to Akka User List
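To clarify, this is not a bug in Akka. The issue is that there are in-flight futures that must be accounted for: by default the stage completes as soon as upstream finishes, so an async callback invoked after that point never runs. To summarize, you need to keep track of your in-flight operations (futures) and override onUpstreamFinish in the InHandler, for example: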

override def onUpstreamFinish(): Unit = {
  if (inflight == 0) {
    complete(out)
  } else {
    // there are still items in progress, wait and we'll complete the out port in future callback
  }
}

Then in my getAsyncCallback

if (inflight == 0) {
  if (isClosed(in)) {
    complete(out)
  }
}
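
Putting the two fragments above together, a whole stage might look roughly like the sketch below. The thread only shows the two fragments, so the rest is assumption: the class name FutureCallbackFlow and the parameter f are hypothetical, the counter is incremented in onPush and decremented in the async callback, and a failed Future fails the stage. It runs the Future on materializer.executionContext, as in the first post.

```scala
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

import scala.concurrent.Future
import scala.util.{Failure, Success, Try}

// Hypothetical stage: `f` stands in for any Future-returning call (e.g. an S3 listing).
final class FutureCallbackFlow[A, B](f: A => Future[B]) extends GraphStage[FlowShape[A, B]] {
  val in: Inlet[A] = Inlet("FutureCallbackFlow.in")
  val out: Outlet[B] = Outlet("FutureCallbackFlow.out")
  override val shape: FlowShape[A, B] = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // Only read and written inside stage callbacks, so a plain var is enough.
      private var inflight = 0

      // Runs inside the stage once a Future has completed.
      private val resultCallback = getAsyncCallback[Try[B]] {
        case Success(value) =>
          inflight -= 1
          push(out, value)
          // Upstream finished while this Future was still running: finish now.
          if (inflight == 0 && isClosed(in)) complete(out)
        case Failure(ex) =>
          failStage(ex)
      }

      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          inflight += 1
          f(grab(in)).onComplete(resultCallback.invoke)(materializer.executionContext)
        }

        // Overrides the default, which would complete the stage immediately
        // and drop the result of any Future still in flight.
        override def onUpstreamFinish(): Unit =
          if (inflight == 0) complete(out)
      })

      setHandler(out, new OutHandler {
        // One pull per downstream demand, so at most one Future is in flight.
        override def onPull(): Unit = pull(in)
      })
    }
}
```

With a stage like this, something along the lines of Source(1 to 10).via(new FutureCallbackFlow[Int, String](i => Future(i.toString))).runWith(Sink.seq) should deliver the last element as well, whether or not the flow sits behind an async boundary.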

Andrew