Infinite ConnectionAttemptFailedException thrown by spray-client

82 views
Skip to first unread message

Daniel Le Clere

unread,
Sep 3, 2015, 8:56:45 AM9/3/15
to spray.io User List
Hello all,

I've recently implemented a little file downloader mechanism using spray-client and I've run into a bit of bother. The mechanism will work just fine for about 2-4 days and then suddenly start producing thousands of ConnectionAttemptFailedExceptions. This results in all downloads failing and can only be rectified by restarting the server.

This has been going on for about two weeks now. I've done some snooping around and so far I've managed to discern it somehow relates to Akka.io and the TCP buffer not being ready, but I'm really at a loss beyond that.

I've included all the relevant information below.

Code: 
object FileDownloader {

  private case object RetryDownload

  def props(implicit basePath: Path): Props =
    Props(classOf[FileDownloader], basePath)

  val downloaderSystem = ActorSystem("FileDownloader", config = ConfigFactory.load.atPath("downloader-system"))

}

class FileDownloader(implicit val basePath: Path) extends Actor with ActorLogging with Stash {

  import FileDownloader._
  import downloaderSystem.dispatcher
  
  implicit val timeout = 30.seconds

  val parent = context.parent

  val backoff = 1.second

  val maxAttempts = 3

  def io = IO(Http)(downloaderSystem)

  override val receive: Receive = {

    case AcquireResource(source) if source.localPath.toFile.exists =>
      sender() ! ResourceFetched(source)

    // TODO add handling for S3 Resource Downloads
    //case AcquireResource(source: S3ResourceLocation) =>

    case AcquireResource(source) =>
      attemptDownload(source, Seq(sender()), 1)

    case _ => ()


  }

  def awaitingDownload(source: ResourceLocation, requestors: Seq[ActorRef], attempt: Int): Receive = {

    case AcquireResource(`source`) if !(requestors contains sender()) =>
      context become awaitingDownload(source, requestors :+ sender(), attempt)

    case AcquireResource(`source`) => ()

    case _: AcquireResource =>
      stash()

    case HttpResponse(StatusCodes.OK, entity, _, _) =>
      try {
        val dest = source.localPath
        dest.getParent.toFile.mkdirs()
        val tempFile = Files.createTempFile("temp-" + source.filename, ".temp")
        FileUtils.writeByteArrayToFile(tempFile.toFile, entity.data.toByteArray)
        Files.move(tempFile, dest)
        success(source, requestors)
      } catch {

        case e: Throwable =>
          self ! Failure(e)

      }

    case RetryDownload =>
      attemptDownload(source, requestors, attempt + 1)

    case _: Failure | _: HttpResponse if attempt <= maxAttempts =>
      context.system.scheduler.scheduleOnce(backoff, self, RetryDownload)

    case HttpResponse(statusCode, entity, _, _) =>
      fail(source, requestors, new Error(s"Failed to download $source (${source.toURL}); reason: $statusCode, ${entity.asString}"))

    case Failure(throwable) =>
      log.error(throwable, s"Failed to download $source (${source.toURL})")
      fail(source, requestors, throwable)

  }

  def fail(source: ResourceLocation, requestors: Seq[ActorRef], reason: Throwable): Unit = {
    val evt = ResourceFetchFailed(source, reason)
    (requestors :+ parent).foreach(_ ! evt)
    complete()
  }

  def success(source: ResourceLocation, requestors: Seq[ActorRef]): Unit = {
    val evt = ResourceFetched(source)
    (requestors :+ parent).foreach(_ ! evt)
    complete()
  }

  def complete(): Unit = {
    unstashAll()
    context become receive
  }

  def attemptDownload(source: ResourceLocation, requestors: Seq[ActorRef], count: Int): Unit = {
    io ! HttpRequest(HttpMethods.GET, Uri(source.toURL.toString))
    context become awaitingDownload(source, requestors, count)
  }

}


Config:
downloader-system {
  spray.can {
    host-connector {
      max-retries = 3
      max-connections = 4
      pipelining = off
    }
    client {
      idle-timeout = 60 s
      request-timeout = 30 s
    }
  }
}

Log:

spray.can.Http$ConnectionAttemptFailedException: Connection attempt to transform.newsnow.io:80 failed at spray.can.client.HttpHostConnectionSlot$$anonfun$connecting$1.applyOrElse(HttpHostConnectionSlot.scala:87) ~[spray-can_2.11-1.3.3.jar:na] at akka.actor.Actor$class.aroundReceive(Actor.scala:465) ~[akka-actor_2.11-2.3.6.jar:na] at spray.can.client.HttpHostConnectionSlot.aroundReceive(HttpHostConnectionSlot.scala:33) ~[spray-can_2.11-1.3.3.jar:na] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) [akka-actor_2.11-2.3.6.jar:na] at akka.actor.ActorCell.invoke(ActorCell.scala:487) [akka-actor_2.11-2.3.6.jar:na] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) [akka-actor_2.11-2.3.6.jar:na] at akka.dispatch.Mailbox.run(Mailbox.scala:220) [akka-actor_2.11-2.3.6.jar:na] at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) [akka-actor_2.11-2.3.6.jar:na] at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [scala-library-2.11.6.jar:na] at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [scala-library-2.11.6.jar:na] at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.11.6.jar:na] at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.11.6.jar:na]




The information contained in this e-mail message and any accompanying files is or may be confidential. If you are not the intended recipient, any use, dissemination, reliance, forwarding, printing or copying of this e-mail or any attached files is unauthorised. This e-mail is subject to copyright. No part of it should be reproduced, adapted or communicated without the written consent of the copyright owner. If you have received this e-mail in error please advise the sender immediately by return e-mail or telephone and delete all copies. Fairfax Media does not guarantee the accuracy or completeness of any information contained in this e-mail or attached files. Internet communications are not secure, therefore Fairfax Media does not accept legal responsibility for the contents of this message or attached files.
Reply all
Reply to author
Forward
0 new messages