Streams and CLOSE_WAIT connections

Andrey Kuznetsov

Jun 7, 2015, 12:03:01 AM
to akka...@googlegroups.com
I am using akka-streams to handle incoming TCP connections. After I put the application behind an AWS Elastic Load Balancer, it started running into the "too many open files" problem because of lots of connections stuck in the CLOSE_WAIT state. Is there a way to close such connections on the server side?

Andrey Kuznetsov

Jun 7, 2015, 12:17:44 AM
to akka...@googlegroups.com
Just found this issue: https://github.com/akka/akka/issues/17122
So it's an akka-io problem, and there is currently no workaround when using akka-streams, am I right?

Andrey Kuznetsov

Jun 7, 2015, 2:17:36 AM
to akka...@googlegroups.com
My application also has an HTTP (websocket) server on another port. It is behind the Elastic Load Balancer too, and it does not suffer from this problem: there are no CLOSE_WAIT connections to it.

Andrey Kuznetsov

Jun 7, 2015, 2:19:36 AM
to akka...@googlegroups.com
By HTTP (websocket) server I mean akka-http.

Akka Team

Jun 7, 2015, 7:34:49 AM
to Akka User List
Hi Andrey,

I am not sure your problem is related to that ticket (https://github.com/akka/akka/issues/17122). Do you have some code that represents what you are trying to do? The TCP flows implement proper half-close, so if they are not used carefully, a connection might end up half-closed instead of fully closed.

-Endre

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.



--
Akka Team
Typesafe - Reactive apps on the JVM
Blog: letitcrash.com
Twitter: @akkateam

Andrey Kuznetsov

Jun 7, 2015, 10:34:03 AM
to akka...@googlegroups.com
I am handling connections with the following code:
val connections = Tcp().bind(interface, port)

connections runForeach { conn ⇒
  log.info(s"Client connected from: ${conn.remoteAddress}")

  try {
    val flow = MTProto.flow(nextConnId(), maxBufferSize, sessionRegion)
    conn.handleWith(flow)
  } catch {
    case e: Exception ⇒
      log.error(e, "Failed to create connection flow")
  }
}

where flow is:

def flow(connId: String, sessionRegion: SessionRegion)(implicit db: Database, system: ActorSystem, timeout: Timeout) = {
  val authManager = system.actorOf(AuthorizationManager.props(db), s"authManager-${connId}")
  val authSource = Source(ActorPublisher[MTProto](authManager))

  val sessionClient = system.actorOf(SessionClient.props(sessionRegion), s"sessionClient-${connId}")
  val sessionClientSource = Source(ActorPublisher[MTProto](sessionClient))

  val mtprotoFlow = Flow[ByteString]
    .transform(() ⇒ new PackageParseStage)
    .transform(() ⇒ new PackageCheckStage)
    .transform(() ⇒ new PackageHandleStage(protoVersions, apiMajorVersions, authManager, sessionClient))

  val mapRespFlow: Flow[MTProto, ByteString, Unit] = Flow[MTProto]
    .transform(() ⇒ mapResponse(system))

  val completeSink = Sink.onComplete {
    case x ⇒
      system.log.debug("Completing {}", x)
  }

  Flow() { implicit builder ⇒
    import FlowGraph.Implicits._

    val bcast = builder.add(Broadcast[ByteString](2))
    val merge = builder.add(Merge[MTProto](3))

    val mtproto = builder.add(mtprotoFlow)
    val auth = builder.add(authSource)
    val session = builder.add(sessionClientSource)
    val mapResp = builder.add(mapRespFlow)
    val complete = builder.add(completeSink)

    // format: OFF

    mtproto ~> merge
    auth    ~> merge
    session ~> merge ~> mapResp ~> bcast ~> complete

    // format: ON

    (mtproto.inlet, bcast.out(1))
  }
}

  • PackageParseStage is a StatefulStage which parses ByteStrings into protocol entities
  • PackageCheckStage is a PushStage which runs some checks on protocol entities and calls ctx.fail if something is wrong
  • PackageHandleStage is a StatefulStage which forwards requests to some external services and generates packages to send to the client via emit(iterator, ctx)
  • auth and session are ActorPublishers which publish responses from external services to be sent to the client.


Andrey Kuznetsov

Jun 7, 2015, 10:44:50 AM
to akka...@googlegroups.com
and mapResponse is a PushStage:

def mapResponse(system: ActorSystem) = new PushStage[MTProto, ByteString] {
  private[this] var packageIndex: Int = -1

  override def onPush(elem: MTProto, ctx: Context[ByteString]) = {
    packageIndex += 1
    val pkg = TransportPackage(packageIndex, elem)

    val resBits = TransportPackageCodec.encode(pkg).require
    val res = ByteString(resBits.toByteBuffer)

    elem match {
      case _: Drop ⇒
        // Drop is the last package: emit it, then complete the stage
        ctx.pushAndFinish(res)
      case _ ⇒
        ctx.push(res)
    }
  }
}




Endre Varga

Jun 7, 2015, 11:07:13 AM
to akka...@googlegroups.com
Hi Andrey,

When do the auth and session sources finish? Do you see the onComplete log message from broadcast? Since the connections you see are in CLOSE_WAIT, your server has already received a close from the client, but it has not yet closed its own side (half-closed). I suspect that, since you have more than one input wired into your merge node, when the inbound mtproto flow completes, the merge itself keeps running as long as its other two inputs have not completed, and therefore keeps the connection open.

-Endre

Andrey Kuznetsov

Jun 7, 2015, 11:32:14 AM
to akka...@googlegroups.com
The auth and session actors call context.stop(self) in the Cancel handler of their receive methods. And they do see Cancel when the client closes the connection (at least when it is closed with "Connection reset by peer").
I tried sending PoisonPill to the auth and session actors from Sink.onComplete and still observed CLOSE_WAIT connections.
--
 
Andrey Kuznetsov

Endre Varga

Jun 7, 2015, 11:38:51 AM
to akka...@googlegroups.com
On Sun, Jun 7, 2015 at 5:32 PM, Andrey Kuznetsov <fe...@loathing.in> wrote:
auth and session makes context.stop(self) inside Cancel handler of their receive's.

umm, but they will not get a Cancel from merge if merge itself is not cancelled, which it will not be in this case. So mtproto gets the completion from the TCP flow, feeds it into merge, but merge will continue happily.
 
And they see it if client closes connection (at least if it is being closed by Connection reset by peer).

Yes, because then the TCP flow *cancels* rather than half-closes, i.e. you will get a cancel event travelling from the downstream to the upstreams, which in turn causes merge itself and all of its upstreams to cancel (there is also an onError coming from the upstream direction at the same time). A normal remote client on the other hand comes as a completion event from the

Anyway, it is easy to check this. Put a println somewhere in the auth or session actor where it handles Cancel, then try a half-closed client connection and see what gets printed.
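A minimal sketch of such a check, assuming the classic `ActorPublisher` API from `akka.stream.actor`. `LoggingPublisher` is a hypothetical stand-in for the auth and session actors, not code from the application in this thread:

```scala
import akka.actor.ActorLogging
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.{Cancel, Request}

// Hypothetical publisher that only logs the stream signals it receives.
class LoggingPublisher extends ActorPublisher[String] with ActorLogging {
  def receive = {
    case Request(n) =>
      // Demand signalled by the downstream stage (here that would be merge).
      log.info("Downstream requested {} element(s)", n)
    case Cancel =>
      // Reached only if the downstream stage cancels this source.
      log.info("Cancel received, stopping publisher")
      context.stop(self)
    case msg: String =>
      // Emit only while the stream is active and there is outstanding demand.
      if (isActive && totalDemand > 0) onNext(msg)
  }
}
```

If the "Cancel received" line never shows up for a half-closed client connection, the source is still attached to a running merge.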

-Endre

Andrey Kuznetsov

Jun 7, 2015, 2:48:57 PM
to akka...@googlegroups.com
On Sunday, June 7, 2015 at 6:38:51 PM UTC+3, drewhk wrote:


On Sun, Jun 7, 2015 at 5:32 PM, Andrey Kuznetsov <fe...@loathing.in> wrote:
auth and session makes context.stop(self) inside Cancel handler of their receive's.

umm, but they will not get a Cancel from merge if merge itself is not cancelled, which it will not be in this case. So mtproto gets the completion from the TCP flow, feeds it into merge, but merge will continue happily.

umm, I see, now I understand

I rewrote the flow, and now I observe only TIME_WAIT and no CLOSE_WAIT sockets, which is fine. The fixed flow:

val completeSink = Sink.onComplete {
  case x ⇒
    authManager ! PoisonPill
    sessionClient ! PoisonPill

    system.log.debug("Completing {}", x)
}

Flow() { implicit builder ⇒
  import FlowGraph.Implicits._

  val bcast = builder.add(Broadcast[ByteString](2))
  val merge = builder.add(Merge[MTProto](3))

  val mtproto = builder.add(mtprotoFlow)
  val auth = builder.add(authSource)
  val session = builder.add(sessionClientSource)
  val mapResp = builder.add(mapRespFlow)
  val complete = builder.add(completeSink)

  // format: OFF

  bcast ~> complete
  bcast ~> mtproto ~> merge

           auth    ~> merge
           session ~> merge ~> mapResp

  // format: ON

  (bcast.in, mapResp.outlet)
}

Thank you, both @Akka team and @drewhk!

Reza Samee

Nov 18, 2016, 2:33:19 AM
to Akka User List
Hi guys, I'm sorry for replying to this old thread, but I'm curious about this.
I had the same problem (CLOSE_WAIT connections). I searched and found this (old) thread, but the solution is weird and not possible in my case. Then I searched further and found another, better (in my opinion) solution: I just set the merge stage to complete as soon as one of the sources completes (instead of waiting for all of them): 'builder add Merge[Something](2, true)'
It worked for me. Now I have a question: why didn't you use that? Was this option available in the version you were using, or was there another reason?
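A minimal sketch of the difference, assuming an Akka 2.4-style `GraphDSL` API. The two sources are hypothetical stand-ins: a finite list plays the inbound TCP side and `Source.maybe` plays an actor-backed source that never completes on its own:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, SourceShape}
import akka.stream.scaladsl.{GraphDSL, Merge, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

object EagerMergeSketch extends App {
  implicit val system = ActorSystem("eager-merge-sketch")
  implicit val materializer = ActorMaterializer()

  // Merges a finite source with one that stays open until it is cancelled.
  def merged(eager: Boolean): Source[Int, NotUsed] =
    Source.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._
      val merge = builder.add(Merge[Int](2, eagerComplete = eager))
      val inbound = builder.add(Source(List(1, 2, 3)))
      val idle = builder.add(Source.maybe[Int])
      inbound ~> merge
      idle ~> merge
      SourceShape(merge.out)
    })

  // Default merge waits for every input to complete, so this Await times out.
  val lazyResult = Try(Await.result(merged(eager = false).runWith(Sink.seq[Int]), 1.second))
  println(s"default merge completed: ${lazyResult.isSuccess}")

  // Eager merge completes (and cancels the idle input) once one input completes.
  val eagerResult = Try(Await.result(merged(eager = true).runWith(Sink.seq[Int]), 1.second))
  println(s"eager merge completed: ${eagerResult.isSuccess}")

  system.terminate()
}
```

With the eager variant the downstream sees completion when the inbound side finishes, so the connection can be closed without stopping the other sources by hand.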

Андрей Кузнецов

Nov 18, 2016, 7:31:09 AM
to Akka User List
Hi Reza,

At the time of the discussion there was no eagerComplete option. Thanks for mentioning it, I've just got rid of completeSink and everything is working perfectly!