[akka-http] what is actually happening on request timeout?

132 views
Skip to first unread message

Kyrylo Stokoz

unread,
Aug 27, 2016, 4:25:31 PM8/27/16
to Akka User List
HI All,

I came across a strange issue happening with akka http on request timeout which i cannot understand, can some body help me with it?

Consider following code in akka 2.4.9:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.server.Directives._
import akka.stream.{ActorMaterializer, FlowShape}
import akka.stream.scaladsl.GraphDSL.Implicits._
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Zip}

import scala.concurrent.Future

object Test extends App {
implicit val actorSystem = ActorSystem()
implicit val ec = actorSystem.dispatcher
implicit val materializer = ActorMaterializer()

def routes =
(path("test1") & get) {
complete("result1")
} ~
(path("test2") & get) {
complete {
Future {
Thread.sleep(30000)
"result2"
}
}
}

def processRequest(route: Flow[HttpRequest, HttpResponse, NotUsed]): Flow[HttpRequest, HttpResponse, NotUsed] =
new ExtendedFlow(route).extend()

val serverSource = Http().bindAndHandle(processRequest(routes), "0.0.0.0", port = 11011)
}

final class ExtendedFlow[A, B](originalFlow: Flow[A, B, NotUsed]) {

def extend(): Flow[A, B, NotUsed] =
Flow.fromGraph {
GraphDSL.create() { implicit builder =>

val in = builder.add(Flow[A].map { e => println("in " + e); e })

val broadcast = builder.add(Broadcast[A](2))
val zip = builder.add(Zip[A, B]())

val out = builder.add(Flow[(A, B)].map { o => println("out " + o); o._2 })

in ~> broadcast; broadcast.out(0) ~> zip.in0
broadcast.out(1) ~> originalFlow ~> zip.in1; zip.out ~> out

FlowShape(in.in, out.out)
}
}
}

Now if i execute `curl -v "http://localhost:11011/test1"` i correctly see 'in' and 'out' print statements in console and "result1" sent to user.

My actual confusion is when i execute `curl -v "http://localhost:11011/test2"`.
In this case after 20s (default request timeout in akka http) HttpServerBluePrint sends 503 back to user with a message that server was not able to produce response in time.
Later, in 30s, future completes as well, result of it i guess is ignored as response from client was already handled. 
Question here is what actually happening to the extended flow in this situation? 

I don`t see any output from sent from zip.out, though i see 'in' statement printed for test2. Seeing this i would assume either:
1. Flow would stuck eventually or
2. Flow would produce wrong pairs in zip commit from next request elements.

From my observations flow keeps working without any problems/exceptions and produce correct pair in zip? 

Anyone can shed some light what is actually going on here?

Regards,
Kyrylo



Akka Team

unread,
Sep 8, 2016, 8:29:18 AM9/8/16
to Akka User List
Hi Kyrlyo,

The flow will be cancelled as the downstream is already completed. 

I find this little stage may be useful to help you reason about what is going on, you should be able to verify that it prints "onDownstreamFinish" when the route times out:
case class SnitchingStage[A](name: String) extends GraphStage[FlowShape[A, A]] {
val in = Inlet[A]("in")
val out = Outlet[A]("out")
override val shape: FlowShape[A, A] = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
setHandler(in, new InHandler {
override def onPush(): Unit = {
println(s"$name: onPush")
push(out, grab(in))
}
override def onUpstreamFinish(): Unit = {
println(s"$name: onUpstreamFinish")
super.onUpstreamFinish()
}
override def onUpstreamFailure(ex: Throwable): Unit = {
println(s"$name: onUpstreamFailure: ${ex.getMessage}")
super.onUpstreamFailure(ex)
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
println(s"$name: onPull")
pull(in)
}
override def onDownstreamFinish(): Unit = {
println(s"$name: onDownstreamFinish")
super.onDownstreamFinish()
}
})
}
}

--
Johan
Akka Team

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscribe@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.



--
Akka Team
Lightbend - Reactive apps on the JVM
Twitter: @akkateam
Reply all
Reply to author
Forward
0 new messages