Hi all,
I'm using akka-http to upload large zip files (~1 GB), each containing a bunch of files to parse. My goal is to read the file on the fly rather than saving it to disk temporarily. To do this, we've written a `ZipInputStreamSource` that builds a `Source` from an `InputStream`. A zip `Source` has the following type:
`val zipSource: Source[(ZipEntryData, ByteString), Future[Long]]`
where `ZipEntryData` contains some info about the zip entry (e.g. its name) and the `ByteString` is a chunk of that entry's data.
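To make the element shape concrete, here is a minimal sketch of the kind of reading a zip source does. It is not our actual `ZipInputStreamSource`: it uses a plain `Iterator` and `java.util.zip` instead of Akka Streams, `Array[Byte]` instead of `ByteString`, and it does not model the `Future[Long]` materialized value. The `ZipEntryData` case class, `zipChunks` and `chunkSize` are hypothetical names for illustration only.

```scala
import java.io.InputStream
import java.util.zip.ZipInputStream

// Hypothetical stand-in for our ZipEntryData; only the entry name is modelled.
final case class ZipEntryData(name: String)

// Lazily walks the zip stream and yields (entry, chunk) pairs in stream order.
// Chunks hold at most `chunkSize` bytes; nothing is read until the iterator is pulled.
// Note: the ZipInputStream is not closed here; the caller owns `in`.
def zipChunks(in: InputStream, chunkSize: Int = 8192): Iterator[(ZipEntryData, Array[Byte])] = {
  val zis = new ZipInputStream(in)
  val buf = new Array[Byte](chunkSize)
  Iterator
    .continually(zis.getNextEntry) // returns null after the last entry
    .takeWhile(_ != null)
    .flatMap { entry =>
      val data = ZipEntryData(entry.getName)
      Iterator
        .continually {
          // read returns -1 at the end of the current entry
          val n = zis.read(buf)
          if (n < 0) None else Some(java.util.Arrays.copyOf(buf, n))
        }
        .takeWhile(_.isDefined)
        .collect { case Some(chunk) if chunk.nonEmpty => (data, chunk) }
    }
}
```

All chunks of one entry come out contiguously, one entry after another, because the zip stream is read sequentially.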
My code looks more or less like this:
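```
import akka.http.scaladsl.server.Directives._
import akka.stream.scaladsl.StreamConverters

// An implicit Materializer (or ActorSystem) must be in scope for runWith.
def route =
  pathPrefix("q") {
    post {
      fileUpload("part-name") {
        case (fileInfo, source) =>
          // Materializes a blocking InputStream fed by the upload stream.
          val sink = StreamConverters.asInputStream()
          val is = source.runWith(sink)
          val zipSource =
            ZipInputStreamSource(() => is)
              .map { case (zed, bs) => (zed.name, bs) }
              // One substream per entry name.
              .groupBy(10000, _._1)
              // Concatenate all chunks of an entry into a single ByteString.
              .reduce((t1, t2) => (t2._1, t1._2 ++ t2._2))
              .async
              .mergeSubstreams
          ....
          complete(...)
      }
    }
  }
```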
Basically, we need some kind of `reduceByKey` behaviour so that every zip entry is parsed as a whole (the entries are XML files, so each one has to be parsed completely and cannot be processed chunk by chunk).
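As an illustration of the `reduceByKey` behaviour I mean, here is a minimal sketch on plain iterators (not the Akka Streams code above). It assumes that all chunks of one entry are contiguous in the stream, which should hold for a zip read sequentially; `concatByKey` is a hypothetical name, and `Array[Byte]` stands in for `ByteString`.

```scala
import java.io.ByteArrayOutputStream

// Concatenates consecutive chunks that share a key, preserving order.
// Assumes chunks of one entry are contiguous, so only the entry
// currently being assembled is held in memory.
def concatByKey(chunks: Iterator[(String, Array[Byte])]): Iterator[(String, Array[Byte])] =
  new Iterator[(String, Array[Byte])] {
    private val in = chunks.buffered

    def hasNext: Boolean = in.hasNext

    def next(): (String, Array[Byte]) = {
      val (key, first) = in.next()
      val acc = new ByteArrayOutputStream()
      acc.write(first)
      // Keep appending while the upcoming chunk belongs to the same entry.
      while (in.hasNext && in.head._1 == key) acc.write(in.next()._2)
      (key, acc.toByteArray)
    }
  }
```

Each element produced this way is a complete entry, which is what the XML parser needs as input.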
So far so good. The problem is that we're not able to apply any back-pressure to the upload. When we upload the file with curl, the upload finishes quickly, but the service then blows up with an `OutOfMemoryError` as soon as the file is big enough.
How can I get the expected back-pressure behaviour?
Thanks in advance for your help.
Regards,
Juanjo.