Streaming a WS response to another WS request body

Jan Algermissen

Oct 27, 2014, 11:24:10 AM
to play-fr...@googlegroups.com
Hi,

I need to chain a WS response into another WS request (GETing images from a web service and PUTing them to a Swift object store).

Can someone point me to some examples of how I would do that?

(Swift supports chunked transfer encoding, so if the upstream does not send a Content-Length in the GET response, I would also like to, or rather have to, send the stream to Swift in chunks.)

Either way, any starting point would help me a great deal.

Jan

Martin Grotzke

Oct 27, 2014, 6:51:08 PM
to play-fr...@googlegroups.com

Hi Jan,

According to the ScalaWS docs: "WS does not support streaming body upload. In this case, you should use the FeedableBodyGenerator provided by AsyncHttpClient."

This code sample from the "Play for Scala" book shows how to stream data to S3 from an iteratee. I am not sure whether it works with the current iteratee library, but it might be a starting point.
https://github.com/playforscala/sample-applications/blob/master/ch10-webservices-iteratees-and-websockets/bodyparsers/app/amazon/AmazonBodyParsers.scala

Cheers,
Martin

Will Sargent

Oct 27, 2014, 7:49:02 PM
to play-fr...@googlegroups.com
If you are working with the FeedableBodyGenerator, you may want to look at Yann Simon's work on it:


Will Sargent
Consultant, Professional Services
Typesafe, the company behind Play Framework, Akka and Scala

Yann Simon

Oct 29, 2014, 4:11:53 AM
to play-framework
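For information, I backported the FeedableBodyGenerator into the 1.8 branch of async-http-client:
https://github.com/AsyncHttpClient/async-http-client/pull/734

But there is no release so far.

What you can do is simply copy the class into your project:
https://github.com/AsyncHttpClient/async-http-client/blob/1.8.x/src/main/java/com/ning/http/client/providers/netty/FeedableBodyGenerator.java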

and then use it to build a streaming upload like this:
(disclaimer: I am writing the code directly in the email. It probably won't compile or run)

def uploadStream(url: String, contentType: String, ws: WSClient): Iteratee[Array[Byte], WSResponse] = {

  // set the feedable body generator
  val asyncHttpClient = ws.underlying[AsyncHttpClient]
  val bodyGenerator = new FeedableBodyGenerator
  var builder = new RequestBuilder("POST")
      .setUrl(url)
      .addHeader(HeaderNames.CONTENT_TYPE, contentType)
      .setBody(bodyGenerator)

  // construct the result
  val result = Promise[WSResponse]()
  asyncHttpClient.executeRequest(builder.build(), new AsyncCompletionHandler[AHCResponse]() {
    override def onCompleted(response: AHCResponse) = {
      result.success(NingWSResponse(response))
      response
    }

    override def onThrowable(t: Throwable) = {
      result.failure(t)
    }
  })


  // construct an Iteratee that uses the body generator
  def forward(generator: FeedableBodyGenerator): Iteratee[Array[Byte], WSResponse] = Cont {

    case Input.El(array) => {
      val isLast = false
      generator.feed(ByteBuffer.wrap(array), isLast)
      forward(generator)
    }

    case Input.Empty => forward(generator)

    case Input.EOF => {
      val isLast = true
      generator.feed(emptyBuffer, isLast)
    }
    Iteratee.flatten(result.future)
  }

  // begin streaming
  forward(bodyGenerator)
}

BIG WARNING: the streaming implementation in async-http-client does not handle back pressure. If the producer of Array[Byte] chunks is faster than the consumer, the pending content accumulates in memory.
If the streamed data is very large, this can lead to an OutOfMemoryError.
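
To illustrate what the missing back pressure means in practice, here is a minimal sketch using only JDK queues. It is not how async-http-client buffers internally, and BackPressureSketch and its methods are hypothetical names. An unbounded buffer accepts every chunk, while a bounded one starts refusing chunks once it is full, which is the signal a producer could use to pause:

```scala
import java.util.concurrent.{ArrayBlockingQueue, LinkedBlockingQueue}

object BackPressureSketch {
  private val chunk = new Array[Byte](8 * 1024)

  // Unbounded buffer: every offer succeeds, so with a slow (here: absent)
  // consumer the number of pending chunks grows with the input.
  def pendingUnbounded(chunks: Int): Int = {
    val queue = new LinkedBlockingQueue[Array[Byte]]()
    (1 to chunks).foreach(_ => queue.offer(chunk))
    queue.size
  }

  // Bounded buffer: offer returns false once `capacity` chunks are pending,
  // so the number of buffered chunks never exceeds the capacity.
  def pendingBounded(chunks: Int, capacity: Int): Int = {
    val queue = new ArrayBlockingQueue[Array[Byte]](capacity)
    (1 to chunks).foreach(_ => queue.offer(chunk))
    queue.size
  }
}
```

With no consumer draining the queues, pendingUnbounded(1000) holds on to all 1000 chunks, whereas pendingBounded(1000, 16) never holds more than 16.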

Jan Algermissen

Oct 29, 2014, 2:19:52 PM
to play-fr...@googlegroups.com
Yann,

On 29 Oct 2014, at 09:11, Yann Simon <yann.s...@gmail.com> wrote:

> For information, I backported the FeedableBodyGenerator into the 1.8 branch of async-http-client:
> https://github.com/AsyncHttpClient/async-http-client/pull/734
>
> But there is no release so far.
>
> What you can do is simply copy the class into your project:
> https://github.com/AsyncHttpClient/async-http-client/blob/1.8.x/src/main/java/com/ning/http/client/providers/netty/FeedableBodyGenerator.java
>
>
> and then use it to build a streaming upload like this:
> (disclaimer: I am writing the code directly in the email. It probably won't compile or run)

Yeah - thank you anyhow, I almost got it now, however…


> Iteratee.flatten(result.future)


expects an iteratee, but the result is just WSResponse.

Can you help with what

> val result = Promise[WSResponse]()

should actually be?

In addition, the

> Iteratee.flatten(result.future)

seems misplaced. Should it be in the EOF-case of the Continuation?

Jan

Jan Algermissen

Oct 29, 2014, 4:29:58 PM
to play-fr...@googlegroups.com
Yann,

see below:

On 29 Oct 2014, at 09:11, Yann Simon <yann.s...@gmail.com> wrote:

I changed this to

val f: Future[WSResponse] = result.future
Iteratee.fold1(f) { (e, a) => f }

Is that correct?
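
On the type mismatch itself: Iteratee.flatten takes a Future of an Iteratee, not a Future of the final value. A minimal sketch of one way to make the types line up, assuming the Play 2.3 iteratee library (FlattenSketch and fromFuture are hypothetical names, not part of Play), is to wrap the future value in a Done iteratee first:

```scala
import scala.concurrent.{ExecutionContext, Future}
import play.api.libs.iteratee.{Done, Input, Iteratee}

object FlattenSketch {
  // Turns a Future[A] into an Iteratee[E, A] that consumes no further input
  // and yields the future's value as its result.
  def fromFuture[E, A](result: Future[A])(implicit ec: ExecutionContext): Iteratee[E, A] =
    Iteratee.flatten(result.map(a => Done[E, A](a, Input.EOF)))
}
```

In the draft above, something like FlattenSketch.fromFuture[Array[Byte], WSResponse](result.future) would then be a candidate for the value returned from the Input.EOF case.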

Yann Simon

Oct 30, 2014, 12:01:40 PM
to play-framework
Hi Jan,

I took the time to write a version that compiles:

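  // Imports assumed for Play 2.3 with async-http-client 1.8.x; adjust the
  // FeedableBodyGenerator import to wherever you copied the class.
  import java.nio.ByteBuffer
  import scala.concurrent.Promise
  import com.ning.http.client.{AsyncCompletionHandler, AsyncHttpClient, RequestBuilder, Response => AHCResponse}
  import com.ning.http.client.providers.netty.FeedableBodyGenerator
  import play.api.http.HeaderNames
  import play.api.libs.concurrent.Execution.Implicits.defaultContext
  import play.api.libs.iteratee.Iteratee
  import play.api.libs.ws.{WSClient, WSResponse}
  import play.api.libs.ws.ning.NingWSResponse

  // Zero-length buffer fed as the final chunk (assumed definition)
  val emptyBuffer = ByteBuffer.wrap(Array.empty[Byte])

  def uploadStream(url: String, contentType: String, ws: WSClient): Iteratee[Array[Byte], WSResponse] = {

    // set the feedable body generator
    val asyncHttpClient = ws.underlying[AsyncHttpClient]
    val bodyGenerator = new FeedableBodyGenerator
    val builder = new RequestBuilder("POST")
      .setUrl(url)
      .addHeader(HeaderNames.CONTENT_TYPE, contentType)
      .setBody(bodyGenerator)

    // construct the result: the promise is completed by the AHC callbacks
    val result = Promise[WSResponse]()
    asyncHttpClient.executeRequest(builder.build(), new AsyncCompletionHandler[AHCResponse]() {
      override def onCompleted(response: AHCResponse): AHCResponse = {
        result.success(NingWSResponse(response))
        response
      }

      override def onThrowable(t: Throwable): Unit = {
        result.failure(t)
      }
    })

    // feed every incoming chunk to the body generator
    val bodyGeneratorIt = Iteratee.fold[Array[Byte], FeedableBodyGenerator](bodyGenerator) {
      (generator, bytes) =>
        val isLast = false
        generator.feed(ByteBuffer.wrap(bytes), isLast)
        generator
    }

    // on end of input, signal the last chunk and wait for the response
    bodyGeneratorIt.mapM { generator =>
      val isLast = true
      generator.feed(emptyBuffer, isLast)
      result.future
    }
  }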
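
To connect this back to the original question, here is a hedged sketch of how the body of a GET response might be piped into such an upload iteratee. It assumes the Play 2.3 WS API, where getStream() yields the response headers together with an Enumerator[Array[Byte]]; StreamPipe and pipe are hypothetical names, and the fallback content type is an assumption:

```scala
import scala.concurrent.{ExecutionContext, Future}
import play.api.libs.iteratee.Iteratee
import play.api.libs.ws.{WSClient, WSResponse}

object StreamPipe {
  // Streams the body of a GET on sourceUrl into the iteratee built by `upload`,
  // which receives the upstream content type.
  def pipe(sourceUrl: String, ws: WSClient)(
      upload: String => Iteratee[Array[Byte], WSResponse]
  )(implicit ec: ExecutionContext): Future[WSResponse] =
    ws.url(sourceUrl).getStream().flatMap { case (headers, body) =>
      // Reuse the upstream Content-Type if present (fallback value is an assumption)
      val contentType = headers.headers
        .get("Content-Type")
        .flatMap(_.headOption)
        .getOrElse("application/octet-stream")
      // |>>> feeds every chunk to the iteratee and returns its final result
      body |>>> upload(contentType)
    }
}
```

A call could then look like StreamPipe.pipe(imageUrl, ws)(ct => uploadStream(swiftUrl, ct, ws)), where imageUrl and swiftUrl are placeholders. The back pressure warning from earlier in the thread still applies to this path.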