For information, I backported the FeedableBodyGenerator into the 1.8 branch of async-http-client:
But there is no release so far.
What you can do is simply copy the class into your project:
and then use it to build a streaming upload like this:
(disclaimer: I am writing the code directly in the email. It won't probably compile nor run)
def uploadStream(url: String, contentType: String, ws: WSClient): Iteratee[Array[Byte], WSResponse] = {
// set the feedable body generator
val asyncHttpClient = ws.underlying[AsyncHttpClient]
val bodyGenerator = new FeedableBodyGenerator
var builder = new RequestBuilder("POST")
.setUrl(url)
.addHeader(HeaderNames.CONTENT_TYPE, contentType)
.setBody(bodyGenerator)
// construct the result
val result = Promise[WSResponse]()
asyncHttpClient.executeRequest(builder.build(), new AsyncCompletionHandler[AHCResponse]() {
override def onCompleted(response: AHCResponse) = {
result.success(NingWSResponse(response))
response
}
override def onThrowable(t: Throwable) = {
result.failure(t)
}
})
// construct an Iteratee that uses the body generator
def forward(generator: FeedableBodyGenerator): Iteratee[Array[Byte], WSResponse] = Cont {
case Input.El(array) => {
val isLast = false
generator.feed(ByteBuffer.wrap(array), isLast)
forward(generator)
}
case Input.Empty => forward(generator)
case Input.EOF => {
val isLast = true
generator.feed(emptyBuffer, isLast)
}
Iteratee.flatten(result.future)
}
// begin streaming
forward(bodyGenerator)
}
BIG WARNING: the streaming implementation in the async http client does not handle back pressure. It means that if the producer of array[byte] is faster than the consumer, the content will land into memory.
If the size of the streaming data is very large, it can lead to OutOfMemoryException