Proper way to handle the Entity Stream Truncation Exception

955 views
Skip to first unread message

Ubaldo Taladríz

unread,
Jun 29, 2016, 7:33:47 AM6/29/16
to akka...@googlegroups.com
Hi, I'm not sure how to handle the entity stream truncation exception
I have a "Downloader" actor using akka http client side, and I tried everything, shutdown the pool, terminate the actor (The parent watches the child and creates a new one ).

This is the internal error
[ERROR] 2016-06-29 02:09:51.348 [local-akka.actor.default-dispatcher-3] FileSubscriber - Tearing down FileSink(tareas/2/16016/descarga/Resolucion_1467180465357.pdf) due to upstream error
akka.http.scaladsl.model.EntityStreamException: Entity stream truncation

then i catch this error in entity.dataBytes.runWith(FileIO.toFile(new File(dir, file.fileName.get))).onComplete({
r.wasSuccesful is false (See the code below)

[ERROR] 2016-06-29 02:09:51.350 [local-akka.actor.default-dispatcher-3] Downloader - !!!!!!Entity stream truncation 

Then i got an error for each child (The default is 4 Downloaders)
[ERROR] 2016-06-29 02:09:51.351 [local-akka.actor.default-dispatcher-3] ActorSystemImpl - Outgoing request stream error
akka.http.scaladsl.model.EntityStreamException: Entity stream truncation

Finally I got this error and the pool is shut down:
[ERROR] 2016-06-29 02:09:51.353 [local-akka.actor.default-dispatcher-5] PoolMasterActor - connection pool for PoolGateway(hcps = HostConnectionPoolSetup(qadnp.portalafp.cl,443,ConnectionPoolSetup(ConnectionPoolSettings(4,5,32,1,30 seconds,ClientConnectionSettings(Some(User-Agent: akka-http/2.4.7),10 seconds,1 minute,512,<function0>,List(),ParserSettings(2048,16,64,64,8192,64,8388608,256,1048576,Strict,RFC6265,true,Full,Map(If-Range -> 0, If-Modified-Since -> 0, If-Unmodified-Since -> 0, default -> 12, Content-MD5 -> 0, Date -> 0, If-Match -> 0, If-None-Match -> 0, User-Agent -> 32),false,<function1>,<function1>,<function2>))),akka.http.scaladsl.HttpsConnectionContext@5a726436,akka.event.BusLogging@68cb54fd))) has shut down unexpectedly


This is the code

 case JobStart(t: Task) =>
      task = t
      if (!t.task.parameter.get.getOrElse("local", "false").toBoolean) {
        val uri = t.file.parameters.get("downloadUri")
        bd.run(updateFile(t.file, "downloading")).onComplete({
          case Success(r) =>
            http.get.singleRequest(HttpRequest(uri = uri)).pipeTo(self)
          case Failure(e) =>
            throw e
        })(fixedThreadPoolExecutionContext)
      }
      else {
        endJob(t)
      }

case HttpResponse(OK, headers, entity, _) =>
      val content = headers.filter(_.name() == "Content-Disposition")
      if (content.size > 0) {
        val dir = new File(task.get.dir, "download")
        dir.mkdir()
        val file = task.get.file.copy(fileName = Some(headers.filter(_.name() == "Content-Disposition").head.value().split(";")(1).split("=")(1)))
        task = Some(task.get.copy(file = file))
        entity.dataBytes.runWith(FileIO.toFile(new File(dir, file.fileName.get))).onComplete({
          case Success(r) =>
            log.debug(s"Downloaded ${task.get.file.id.get}  actor ${self.path.name}")
            if (r.wasSuccessful)
              bd.run(updateFile(task.get.file, "downloadd")).onComplete({
                case Success(r) =>
                  endJob(task.get)
                case Failure(e) => throw e
              })(fixedThreadPoolExecutionContext)
            else {
              log.error(s"!!!!!!${r.getError.getMessage}",r.getError)
              bd.run(updateFile(task.get.file, "downloading", Some(s"Error  ${r.getError.getMessage}"), Some(Errors.downloadingError))).onComplete({
                case Success(t) =>
                  log.info("!!!!Finalizo Trabajo")
                  endJob(task.get.copy(retries = task.get.retries + 1, state = "start"))
                  context.stop(self) //I tried everything, http.shutdownAllConnectionPools()
                case Failure(e) => throw e
              })(fixedThreadPoolExecutionContext)

            }
          case Failure(e) =>
            log.error(e.getMessage,e)
            bd.run(updateFile(trask.get.file, "downloading", Some(s"Error  ${e.getMessage}"), Some(Errors.downloadingError))).onComplete({
              case Success(t) =>
                endJob(task.get.copy(retris = task.get.retries + 1, state = "start"))
              case Failure(e) => throw e
            })(fixedThreadPoolExecutionContext)
        })
      }
      else {
      // The server responds HTTP 200 OK but the content has no file attached
        entity.toStrict(2.second).map(s=>s.data.utf8String).onComplete({
          case Success(_) =>
          case Failure(_) =>
        }
        )(fixedThreadPoolExecutionContext)
        bd.run(updateFile(task.get.file, "downloading", Some(s"Error no  Content-Disposition header"), Some(Errors.downloadingError))).onComplete({
          case Success(t) =>
            endJob(task.get.copy(retries = task.get.retries + 1, state = "start"))
          case Failure(e) => throw e
        })(fixedThreadPoolExecutionContext)
      }


    case HttpResponse(code, headers, entity, _) =>
      //Server respond != HTTP 200 OK
      entity.toStrict(2.second).map(s=>s.data.utf8String).onComplete({
        case Success(_) =>
        case Failure(_) =>
      }
      )(fixedThreadPoolExecutionContext)
      bd.run(updateFile(task.get.file, "downloading", Some(s"Error http ${code.intValue()} ${code.defaultMessage()}"), Some(Errores.downloadingError))).onComplete({
        case Success(t) =>
          endJob(task.get.copy(retries = task.get.retries + 1, state = "start"))
        case Failure(e) => throw e
      })(fixedThreadPoolExecutionContext)

    case akka.actor.Status.Failure(e) =>
     //URI is wrong
      log.error("Failure Akka")
      log.error(e, e.getMessage)
      bd.run(updateFile(task.get.file, "downloading", Some(e.getMessage), Some(Errors.downloadingError))).onComplete({
        case Success(t) =>
          endJob(task.get.copy(retries = task.get.retries + 1, state = "start"))
        case Failure(e) => throw e
      })(fixedThreadPoolExecutionContext)


--

Ubaldo Taladriz Truan

vima...@gmail.com

unread,
Feb 2, 2018, 7:12:29 AM2/2/18
to Akka User List
Anyone got any updates on this issue. I am having similar problem where akka http client respond with a Success(OK) and then stream gets truncated while entity.dataBytes.runFold(ByteString(""))(_ ++ _). The size of the data received is ~20Mb.

Thanks Vimal.
Reply all
Reply to author
Forward
0 new messages