How to download a large file with Akka-http V1.0-RC4?


alain marcel

Jul 2, 2015, 4:21:31 AM
to akka...@googlegroups.com
On Windows 7, a Play Framework 2.4.1 application serves a large file (about 650 MB) on localhost:8080.
I can download it with the Chrome web browser.
When I try the same download with an Akka HTTP based client, I get the error shown below.
If I replace the big file with a small one (5 MB), everything works.

The problem occurs whether or not I set the following configuration:
akka.http.client.parsing.max-content-length = 800m

Thanks for any help.

All details are below.

Returned error
==============
[ERROR] [07/02/2015 09:27:33.125] [MySys-akka.actor.default-dispatcher-2] [ActorSystem(MySys)] Outgoing request stream error
akka.http.scaladsl.model.IllegalResponseException: Response Content-Length 731408384 exceeds the configured limit of 8388608
at akka.http.impl.engine.client.OutgoingConnectionBlueprint$$anonfun$2.applyOrElse(OutgoingConnectionBlueprint.scala:76)
at akka.http.impl.engine.client.OutgoingConnectionBlueprint$$anonfun$2.applyOrElse(OutgoingConnectionBlueprint.scala:73)
at akka.stream.impl.fusing.Collect.onPush(Ops.scala:82)
at akka.stream.impl.fusing.Collect.onPush(Ops.scala:77)
at akka.stream.impl.fusing.OneBoundedInterpreter$$anon$1.run(Interpreter.scala:436)
at akka.stream.impl.fusing.OneBoundedInterpreter$State$class.progress(Interpreter.scala:245)
at akka.stream.impl.fusing.OneBoundedInterpreter$$anon$1.progress(Interpreter.scala:434)
at akka.stream.impl.fusing.OneBoundedInterpreter.akka$stream$impl$fusing$OneBoundedInterpreter$$execute(Interpreter.scala:580)
at akka.stream.impl.fusing.OneBoundedInterpreter$State$class.execute(Interpreter.scala:241)
at akka.stream.impl.fusing.OneBoundedInterpreter$EntryState.execute(Interpreter.scala:666)
at akka.stream.stage.AbstractStage.enterAndPush(Stage.scala:65)
at akka.stream.impl.fusing.BatchingActorInputBoundary$$anonfun$upstreamRunning$1.applyOrElse(ActorInterpreter.scala:157)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at akka.stream.impl.SubReceive.apply(Transfer.scala:16)
at akka.stream.impl.SubReceive.apply(Transfer.scala:12)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.stream.impl.SubReceive.applyOrElse(Transfer.scala:12)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at akka.stream.impl.fusing.ActorInterpreter.aroundReceive(ActorInterpreter.scala:366)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Play framework
==============
routes :
GET / controllers.Application.url_index

application.scala :
class Application extends Controller {
  def url_index = Action { Ok.sendFile(new File("c:/tmp/big_file.avi")) }
}

Downloader.scala : client side application
==========================================
import java.io.File
import java.io.FileInputStream
import java.io.FileOutputStream
import scala.util.Failure
import scala.util.Success
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.impl.util._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.util.ByteString

import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory

object Downloader extends App {

  val config: Config = ConfigFactory.parseString(
    """
     akka.http.client.parsing.max-content-length = 800m
    """)

  implicit val system = ActorSystem("MySys", config)
  implicit val materializer = ActorMaterializer()
  import system.dispatcher

  val host = "localhost"
  val port = 8080

  val uri = "http://%s:%s".format(host, port)
  println("URI=" + uri)
  val result = Http().singleRequest(HttpRequest(uri = uri))

  result.map(_.header[headers.Server]) onComplete {
    case Success(res) =>
      val responseOpt = result.value
      val resp = responseOpt.get.get
      val sourceDataBytes: Source[ByteString, Any] = resp.entity.dataBytes

      sourceDataBytes.runForeach(
        x => {
          val fos = new FileOutputStream("c:/tmp1/z/tutu", true)
          fos.write(x.toArray)
          fos.close()
        }
      )
      //system.shutdown()

    case Failure(error) =>
      println("ERROR FAILURE")
      println(error)
      system.shutdown()
  }
}

alain marcel

Jul 2, 2015, 4:39:27 AM
to akka...@googlegroups.com
If I replace the Play server with an Akka HTTP server (see the source below), then I can download the large file (about 650 MB).
So, where is the problem with the Play / Akka HTTP combination?
Thanks


Server.scala
============
package app

import java.io.FileInputStream
import java.nio.channels.FileChannel
import java.nio.file.{Path, Paths, StandardOpenOption}
import java.nio.{ByteBuffer, MappedByteBuffer}

import scala.util.Failure
import scala.util.Success
import akka.actor.ActorSystem

import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpEntity.ChunkStreamPart
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.util.{ByteString, Timeout}

import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Try
import scala.util.control.NonFatal

import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory

class ByteBufferIterator(buffer: ByteBuffer, chunkSize: Int) extends Iterator[ByteString] {
  require(buffer.isReadOnly)
  require(chunkSize > 0)

  override def hasNext = buffer.hasRemaining

  override def next(): ByteString = {
    val size = chunkSize min buffer.remaining()
    val temp = buffer.slice()
    temp.limit(size)
    buffer.position(buffer.position() + size)
    ByteString(temp)
  }
}

object Server extends App {

  // def map(path: Path) : MappedByteBuffer = {
  //   val channel = FileChannel.open(path, StandardOpenOption.READ)
  //   val result = channel.map(FileChannel.MapMode.READ_ONLY, 0L, channel.size())
  //   channel.close()
  //   result
  // }

  def map1(path: String): MappedByteBuffer = {
    val inputStream = new FileInputStream(path)
    val channel: FileChannel = inputStream.getChannel()
    val result = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size)
    channel.close()
    result
  }

  implicit val system = ActorSystem()

  implicit val materializer = ActorMaterializer()
  implicit val askTimeout: Timeout = 500.millis

  import HttpMethods._

  val requestHandler: HttpRequest => HttpResponse = {
    case HttpRequest(GET, uri, headers, _, _) =>
      // val path = "c:/tmp1/toto.txt"
      // val path = "c:/tmp1/jfxrt.jar"
      val path = "c:/tmp1/garcons.avi"
      println("=========== path=" + path)
      val result = Try {
        val mappedByteBuffer = map1(path)
        val iterator = new ByteBufferIterator(mappedByteBuffer, 4096)
        var alainCnt = 0
        val chunks = Source(() => iterator).map { x =>
          if (alainCnt % 10000 == 0)
            println("Chunk of size " + x.size + "  cnt=" + alainCnt)
          alainCnt += 1
          ChunkStreamPart(x)
        }
        HttpResponse(entity = HttpEntity.Chunked(MediaTypes.`application/octet-stream`, chunks))
      } recover {
        case NonFatal(cause) =>
          HttpResponse(StatusCodes.InternalServerError, entity = cause.getMessage)
      }
      result.get

    case _: HttpRequest =>
      HttpResponse(StatusCodes.NotFound, entity = "Unknown resource!")
  }

  // OK:
  val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] = Http(system).bind(interface = "localhost", port = 8080)

  val bindingFuture: Future[Http.ServerBinding] = serverSource.to(Sink.foreach { connection =>
    // foreach materializes the source
    println("Accepted new connection from " + connection.remoteAddress)
    // ... and then actually handle the connection
    connection.handleWithSyncHandler(requestHandler)
  }).run()

  System.in.read()
  system.shutdown()
  system.awaitTermination()
}

Martynas Mickevičius

Jul 3, 2015, 2:59:00 AM
to akka...@googlegroups.com
Hi Alain,

to see how your two implementations differ, try downloading the file from each server with curl and compare the generated responses.

In your sample server implementation you are streaming the contents when using akka-http. You can do the same in Play Framework as well.
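
For readers without curl at hand, the comparison suggested above can be approximated from the JVM. The following is only a sketch: the object name InspectResponse and the helper framing are made up for illustration, the URL is the one used in this thread, and it assumes that HttpURLConnection reports the Transfer-Encoding header unchanged. It prints whether a server frames its response with chunked transfer encoding or with a fixed Content-Length.

```scala
import java.net.{HttpURLConnection, URL}

// Hypothetical helper, not part of Akka HTTP or Play.
object InspectResponse {

  // Classifies a response by how the length of its body is announced.
  def framing(transferEncoding: Option[String], contentLength: Option[Long]): String =
    (transferEncoding.map(_.toLowerCase), contentLength) match {
      case (Some(te), _) if te.contains("chunked") => "chunked, no Content-Length"
      case (_, Some(len))                          => "fixed length, Content-Length = " + len
      case _                                       => "no length information"
    }

  def main(args: Array[String]): Unit = {
    val address = if (args.nonEmpty) args(0) else "http://localhost:8080"
    val conn = new URL(address).openConnection().asInstanceOf[HttpURLConnection]
    try {
      // Reading a header field sends the request; the body is never read.
      val te  = Option(conn.getHeaderField("Transfer-Encoding"))
      val len = Option(conn.getHeaderField("Content-Length")).map(_.trim.toLong)
      println(framing(te, len))
    } finally conn.disconnect()
  }
}
```

Running it once against the Play server and once against the Akka HTTP server should show which of the two sends a Content-Length header.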

--
Martynas Mickevičius
Typesafe
Reactive Apps on the JVM

Johannes Rudolph

Jul 10, 2015, 5:15:45 AM
to akka...@googlegroups.com
Hi Alain,


On Thursday, July 2, 2015 at 10:39:27 AM UTC+2, alain marcel wrote:
If I replace the Play server with an Akka HTTP server (see the source below), then I can download the large file (about 650 MB).

That's probably because your Akka HTTP server uses chunked transfer encoding, for which max-content-length is not enforced (yet! see https://github.com/akka/akka/issues/16472). The problem with Play rather seems to be that your setting isn't active for some reason. The "configured limit of 8388608" is the default one, so you need to check that your configuration change really takes effect. First try an invalid value and see whether startup fails; if it doesn't, make it fail before changing the value to something reasonable.

HTH
Johannes
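
One way to carry out the check suggested above is to print the values the running ActorSystem actually sees. This is a minimal sketch, not a diagnosis: the object name CheckLimit and the helper toBytes are made up for illustration, and the second path in the list is an assumption about where the connection pool behind singleRequest might keep its own copy of the setting (it is guarded with hasPath because it may not exist in this version).

```scala
import akka.actor.ActorSystem
import com.typesafe.config.{Config, ConfigFactory}

// Hypothetical helper object, not part of Akka HTTP.
object CheckLimit {

  // Parses a size such as "800m" the way Typesafe Config does (single-letter units are powers of two).
  def toBytes(setting: String): Long =
    ConfigFactory.parseString("v = " + setting).getBytes("v")

  val paths = Seq(
    "akka.http.client.parsing.max-content-length",
    // Assumption: the host connection pool may read its own copy of the client settings here.
    "akka.http.host-connection-pool.client.parsing.max-content-length")

  def main(args: Array[String]): Unit = {
    val overrides: Config = ConfigFactory.parseString(
      "akka.http.client.parsing.max-content-length = 800m")

    // Same construction as in Downloader.scala.
    val system = ActorSystem("MySys", overrides)
    val effective: Config = system.settings.config

    paths.foreach { p =>
      if (effective.hasPath(p)) println(p + " = " + effective.getBytes(p) + " bytes")
      else println(p + " is not defined")
    }

    // 800m is 838860800 bytes, which is above the 731408384 bytes of the file in this thread.
    println("requested limit = " + toBytes("800m") + " bytes")

    system.shutdown()
  }
}
```

If any printed value is still 8388608, the override has not reached that part of the configuration.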

alain marcel

Jul 20, 2015, 12:18:46 PM
to akka...@googlegroups.com
Hi, and thanks for your attention.
I am now using akka-stream 1.0 and Scala 2.11.7, with the same results as before.

New attempt: on the server side, I replaced the iterator with SynchronousFileSource, and now I get an exception on the client side:
     "IllegalResponseException: Response Content-Length 731408384 exceeds the configured limit of 8388608"
The exception is shown at the end of this mail.

Question: with SynchronousFileSource (and the existing server-side code), can I avoid this exception?

    val f = new File(path)
    val responseEntity = HttpEntity(
      MediaTypes.`application/octet-stream`,
      f.length,
      SynchronousFileSource(f, chunkSize = 262144))
    HttpResponse(entity = responseEntity)

The full server-side code is at the end of this mail.

Thanks for any help.
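
A possible server-side variation, sketched here under assumptions rather than offered as a confirmed fix: going by the earlier reply in this thread, the client limit is not (yet) enforced for chunked responses, so the file source could be wrapped in a chunked entity the same way the iterator version was. The object name ChunkedFileResponse is made up for illustration; the calls mirror those already used in the Server.scala listings. The trade-off is that the client no longer receives a Content-Length header, and the behavior may change once the linked issue is resolved.

```scala
import java.io.File

import akka.http.scaladsl.model.HttpEntity.ChunkStreamPart
import akka.http.scaladsl.model.{HttpEntity, HttpResponse, MediaTypes}
import akka.stream.io.SynchronousFileSource

// Hypothetical helper object, not part of Akka HTTP.
object ChunkedFileResponse {

  val chunkSize = 262144

  // Builds a response that streams the file with chunked transfer encoding
  // instead of announcing its total length up front.
  def apply(path: String): HttpResponse = {
    val chunks = SynchronousFileSource(new File(path), chunkSize = chunkSize)
      .filter(_.nonEmpty)                   // a chunk must not be empty
      .map(bytes => ChunkStreamPart(bytes)) // same conversion as in the iterator version
    HttpResponse(entity = HttpEntity.Chunked(MediaTypes.`application/octet-stream`, chunks))
  }
}
```

In the request handler, ChunkedFileResponse(path) would take the place of the HttpEntity built with f.length. The alternative is to keep the fixed-length entity and get the larger max-content-length to take effect on the client.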


Exception at client side
==================
  akka.http.scaladsl.model.IllegalResponseException: Response Content-Length 731408384 exceeds the configured limit of 8388608
   at akka.http.impl.engine.client.OutgoingConnectionBlueprint$$anonfun$2.applyOrElse(OutgoingConnectionBlueprint.scala:76)
   at akka.http.impl.engine.client.OutgoingConnectionBlueprint$$anonfun$2.applyOrElse(OutgoingConnectionBlueprint.scala:73)
   at akka.stream.impl.fusing.Collect.onPush(Ops.scala:83)
   at akka.stream.impl.fusing.Collect.onPush(Ops.scala:78)
   at akka.stream.impl.fusing.OneBoundedInterpreter$$anon$1.run(Interpreter.scala:436)
   at akka.stream.impl.fusing.OneBoundedInterpreter$State$class.progress(Interpreter.scala:245)
   at akka.stream.impl.fusing.OneBoundedInterpreter$$anon$1.progress(Interpreter.scala:434)
   at akka.stream.impl.fusing.OneBoundedInterpreter.akka$stream$impl$fusing$OneBoundedInterpreter$$execute(Interpreter.scala:580)
   at akka.stream.impl.fusing.OneBoundedInterpreter$State$class.execute(Interpreter.scala:241)
   at akka.stream.impl.fusing.OneBoundedInterpreter$EntryState.execute(Interpreter.scala:666)
   at akka.stream.stage.AbstractStage.enterAndPush(Stage.scala:66)
   at akka.stream.impl.fusing.BatchingActorInputBoundary$$anonfun$upstreamRunning$1.applyOrElse(ActorInterpreter.scala:157)
   at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
   at akka.stream.impl.SubReceive.apply(Transfer.scala:16)
   at akka.stream.impl.SubReceive.apply(Transfer.scala:12)
   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
...

Server.scala
===========
package app

import java.io.File
import java.io.FileInputStream
import java.nio.channels.FileChannel
import java.nio.file.{Path, Paths, StandardOpenOption}
import java.nio.{ByteBuffer, MappedByteBuffer}
import scala.util.Failure
import scala.util.Success
import akka.actor.ActorSystem

import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpEntity.ChunkStreamPart
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.io.SynchronousFileSource
import akka.util.{ByteString, Timeout}

import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Try
import scala.util.control.NonFatal

import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory

class ByteBufferIterator(buffer: ByteBuffer, chunkSize: Int) extends Iterator[ByteString] {
  require(buffer.isReadOnly)
  require(chunkSize > 0)

  override def hasNext = buffer.hasRemaining

  override def next(): ByteString = {
    val size = chunkSize min buffer.remaining()
    val temp = buffer.slice()
    temp.limit(size)
    buffer.position(buffer.position() + size)
    ByteString(temp)
  }
}

object Server extends App {

  def map1(path: String): MappedByteBuffer = {
    val inputStream = new FileInputStream(path)
    val channel: FileChannel = inputStream.getChannel()
    val result = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size)
    channel.close()
    result
  }

  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()
  implicit val askTimeout: Timeout = 500.millis

  import HttpMethods._

  val requestHandler: HttpRequest => HttpResponse = {
    case HttpRequest(GET, uri, headers, _, _) =>
      val path = "c:/tmp/movie.avi"
      println("=========== path=" + path)
      val result = Try {
        // //---- Solution 1 with iterator
        // // Set chunks
        // val mappedByteBuffer = map1(path)
        // val iterator = new ByteBufferIterator(mappedByteBuffer, 262144) //4096)
        // var cnt = 0
        // val chunks = Source(() => iterator).map { x =>
        //   if (cnt % 10000 == 0)
        //     this.koaLogger.info("Chunk of size " + x.size + " cnt=" + cnt)
        //   cnt += 1
        //   HttpEntity.ChunkStreamPart(x)
        // }
        // // Set response
        // val responseEntity = HttpEntity.Chunked(MediaTypes.`application/octet-stream`, chunks)
        // HttpResponse(entity = responseEntity)

        //---- Solution 2 with SynchronousFileSource
        val f = new File(path)
        val responseEntity = HttpEntity(
          MediaTypes.`application/octet-stream`,
          f.length,
          SynchronousFileSource(f, chunkSize = 262144))
        HttpResponse(entity = responseEntity)

      } recover {
        case NonFatal(cause) =>
          HttpResponse(StatusCodes.InternalServerError, entity = cause.getMessage)
      }
      result.get

    case _: HttpRequest =>
      HttpResponse(StatusCodes.NotFound, entity = "Unknown resource!")
  }

