akka-stream: Getting 'The connection closed with error Connection reset by peer' when running many simultaniuos connections

655 views
Skip to first unread message

Vanya Stanislavciuc

unread,
May 19, 2015, 6:29:40 AM5/19/15
to akka...@googlegroups.com
Hi All,

I'm having a problem when running many simultaneous clients that connect to same server binding. I covered this problem in a unit test. The source code can be found here PeerCloseSpec.scala This is what it does
  1. Creating a tcp server that accepts all incoming connection and for each of connection it sums the size of each incoming ByteString and finishes a promise onComplete.
  2. Creating 300 clients that create an InputStream of same specific size and stream it via tcp connection to the server.
  3. Asserting that server received all 300 streams of specified size.
Assertion is not passing for some of the clients and they report `The connection closed with error Connection reset by peer` in the middle of streaming. This problem only appears when number of clients is big. If setting number to a lower number, i.e 10, test passes.

Could you please have a look and suggest if it is the right way to use akka-stream library when multiple connection have to be established or it is possible a bug in the library itself?
Another interesting point is how fast the test fails depends on system. Using Fedora 3.19.7-200.fc21.x86_64 brings the error only when higher number of clients are started. When using mac, test may fail already with ~30 clients.

Thanks!

Endre Varga

unread,
May 19, 2015, 6:30:41 AM5/19/15
to akka...@googlegroups.com
Hi Vanya,

As a very first step, which version of Akka Streams are you using?

-Endre

--
You received this message because you are subscribed to the Google Groups "Akka Developer List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-dev+u...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Endre Varga

unread,
May 19, 2015, 6:31:42 AM5/19/15
to akka...@googlegroups.com
.. and these questions should go to akka-user. Akka-dev is reserved for discussions about developing Akka itself.

-Endre

Endre Varga

unread,
May 19, 2015, 6:36:10 AM5/19/15
to akka...@googlegroups.com
Also, you use the index variable from foreach on a stream, which is will be executed on a different thread. That should be volatile. I don't think it makes a difference here though, but closing over variables that will be modified in concurrent contexts is a problem.

Vanya Stanislavciuc

unread,
May 19, 2015, 7:13:24 AM5/19/15
to akka...@googlegroups.com
Hi Endre,

Sorry for posting in a wrong forum.

The unit test can be run by sbt and all versions are specified there:

Here it is
libraryDependencies += "com.typesafe.akka" %% "akka-http-scala-experimental" % "1.0-RC2"
libraryDependencies
+= "com.typesafe.akka" %% "akka-actor" % "2.3.10"



On Tuesday, May 19, 2015 at 12:30:41 PM UTC+2, drewhk wrote:
Hi Vanya,

As a very first step, which version of Akka Streams are you using?

-Endre
On Tue, May 19, 2015 at 12:17 PM, Vanya Stanislavciuc <stanis...@gmail.com> wrote:
Hi All,
iI'm having a problem when running many simultaneous clients that connect to same server binding. I covered this problem in a unit test. The source code can be found here PeerCloseSpec.scala This is what it does

  1. Creating a tcp server that accepts all incoming connection and for each of connection it sums the size of each incoming ByteString and finishes a promise onComplete.
  2. Creating 300 clients that create an InputStream of same specific size and stream it via tcp connection to the server.
  3. Asserting that server received all 300 streams of specified size.
Assertion is not passing for some of the clients and they report `The connection closed with error Connection reset by peer` in the middle of streaming. This problem only appears when number of clients is big. If setting number to a lower number, i.e 10, test passes.

Could you please have a look and suggest if it is the right way to use akka-stream library when multiple connection have to be established or it is possible a bug in the library itself?
Another interesting point is how fast the test fails depends on system. Using Fedora 3.19.7-200.fc21.x86_64 brings the error only when higher number of clients are started. When using mac, test may fail already with ~30 clients.

Thanks!

Endre Varga

unread,
May 19, 2015, 7:50:21 AM5/19/15
to akka...@googlegroups.com
Hi Vanya,

The Subscriber you created is not correct because it is not thread-safe. You should not create Reactive Streams SPI implementations yourself, that is dangerous. There are helpers to create actors that are safely do the same what you tried: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC2/scala/stream-integrations.html

As for the actual test, the reason why it fails is because you attached an empty source to the incoming connection stream which will therefore close the TCP connection early. Here is a modified example that works:

def startServer(promises: Seq[Promise[Long]]): Tcp.ServerBinding = {
  val port = Random.nextInt(1000) + 35823
  @volatile var index = 0

  val futureBinding = Tcp().bind(InetAddress.getLoopbackAddress.getHostAddress, port).to(Sink.foreach { connection ⇒
    connection.flow.join(sizeCounter(promises(index))).run()
    index += 1
  }).run()

  Await.result(futureBinding, 3.seconds)
}

def sizeCounter(promise: Promise[Long]): Flow[ByteString, ByteString, Unit] =
  Flow[ByteString].transform(() ⇒ new PushStage[ByteString, ByteString] {
    var size = 0L

    override def onPush(elem: ByteString, ctx: Context[ByteString]): SyncDirective = {
      size += elem.size
      ctx.pull()
    }

    override def onUpstreamFinish(ctx: Context[ByteString]): TerminationDirective = {
      promise.trySuccess(size)
      ctx.finish()
    }

    override def onDownstreamFinish(ctx: Context[ByteString]): TerminationDirective = {
      promise.trySuccess(size)
      ctx.finish()
    }
  })

-Endre

Vanya Stanislavciuc

unread,
May 19, 2015, 8:00:48 AM5/19/15
to akka...@googlegroups.com
Hi Endre,

Thanks for the code snippet and explanation. It works as well when a lazy empty source is attached to a tcp connection. But I'll follow your advise and use PushStage instead of a Subscriber.

Vanya
Reply all
Reply to author
Forward
0 new messages