Smells like a leak - putting akka http under heavy load throws an exception


AL

Sep 6, 2017, 6:57:19 AM
to Akka User List
Hi,
I tried to run a simple test.
I created a simple outgoing request in a loop, simulating users sending 200k requests:
  def apiConnectionFlow: Flow[HttpRequest, HttpResponse, Any] =
    Http().outgoingConnection(host, port)
  def apiRequest(request: HttpRequest): Future[HttpResponse] = Source.single(request).via(apiConnectionFlow).runWith(Sink.head)
  val numRequests = 200000
  for (i <- 1 to numRequests){
    apiRequest(RequestBuilding.Put("/foo", Foo(i)))
  }

and a server that accepts these requests:
val routes: Route = logRequestResult("foo-service") {
    pathPrefix("foo") {
      pathEndOrSingleSlash {
        put {
          entity(as[Foo]) { foo =>
            complete{
              Future(foo).map(r => s"Update - $r ")
            }
          }
        }
      }
    }
  }
  Http().bindAndHandle(routes, "0.0.0.0", 9000)

However, when I count the open sockets with lsof -i tcp:9000 | wc -l, the number keeps increasing (more than 6000!). It seems to be leaking somewhere,
and eventually I get the following error:
akka.actor.default-dispatcher-31] [akka://in-test/system/IO-TCP/selectors/$a/0] Accept error: could not accept new connection
java.io.IOException: Too many open files in system
at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
at akka.io.TcpListener.acceptAllPending(TcpListener.scala:107)
at akka.io.TcpListener$$anonfun$bound$1.applyOrElse(TcpListener.scala:82)
at akka.actor.Actor$class.aroundReceive(Actor.scala:482)
at akka.io.TcpListener.aroundReceive(TcpListener.scala:32)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Any ideas or suggestions would be greatly appreciated.

Martynas Mickevičius

Sep 7, 2017, 4:15:42 AM
to Akka User List
You are opening a new connection for every request. Is that intentional? You should use Http().singleRequest() to send requests via a shared connection pool.
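
For illustration, a minimal sketch of the pooled approach, assuming Akka HTTP 10.0.x. The object name, the actor system name and the target URI are hypothetical and not taken from the original code. Note that singleRequest needs an absolute URI, and that the response entity has to be consumed or discarded so the pooled connection can be reused:

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpMethods, HttpRequest, HttpResponse}
import akka.stream.ActorMaterializer
import scala.concurrent.Future

object PooledClient {
  // Hypothetical system name; any ActorSystem works.
  implicit val system: ActorSystem = ActorSystem("client")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  import system.dispatcher

  // Goes through the connection pool that Http() keeps per target host and port,
  // instead of opening a fresh connection for each request.
  def apiRequest(request: HttpRequest): Future[HttpResponse] =
    Http().singleRequest(request)

  def main(args: Array[String]): Unit = {
    // Assumption: the server from the question is listening on localhost:9000.
    val request = HttpRequest(HttpMethods.PUT, uri = "http://localhost:9000/foo")
    apiRequest(request).foreach { response =>
      // Discard the entity so the pooled connection is released for reuse.
      response.discardEntityBytes()
      println(response.status)
    }
  }
}
```

Firing 200k requests at once this way can still overflow the pool's request buffer (see akka.http.host-connection-pool.max-open-requests), so some form of throttling or backpressure is probably needed as well.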

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

AL

Sep 7, 2017, 6:42:59 AM
to Akka User List
This is the sender code:
  def apiRequest(request: HttpRequest): Future[HttpResponse] = Source.single(request).via(apiConnectionFlow).runWith(Sink.head)
I thought that using Source.single already did that. Doesn't it?
But I guess you are right.
I finally changed my code to:
  def apiRequest(requests: Seq[HttpRequest]): Future[HttpResponse] =
    Source.fromIterator(() => requests.toIterator)
      .via(apiConnectionFlow)
      .runWith(Sink.head)
which did the trick.

Thanks
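
One caveat on the snippet above: Sink.head completes with the first response only and then cancels the stream, so the remaining requests may never be sent. A rough sketch of a variant that collects a result for every request over one connection, assuming Akka HTTP 10.0.x; the object name, the apiRequests name, the List parameter and the host and port values are assumptions made for this example:

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCode}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Future

object SingleConnectionClient {
  implicit val system: ActorSystem = ActorSystem("client")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // Assumed values; the original post does not show host and port.
  val host = "localhost"
  val port = 9000

  def apiConnectionFlow: Flow[HttpRequest, HttpResponse, Any] =
    Http().outgoingConnection(host, port)

  // Sends all requests over one connection and collects one status per response.
  def apiRequests(requests: List[HttpRequest]): Future[Seq[StatusCode]] =
    Source(requests)
      .via(apiConnectionFlow)
      .map { response =>
        // Drain the entity so the connection can move on to the next response.
        response.discardEntityBytes()
        response.status
      }
      .runWith(Sink.seq)
}
```

If the server closes the connection partway through, the stream ends early, so for long runs the pooled client is likely the more robust choice.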