Random "Stream has already been consumed and cannot be reset" in reverse proxy implemented with Play 2.6.7


Raul Piaggio

Dec 8, 2017, 2:53:33 PM
to Play Framework
Hi everyone!


I have implemented a reverse proxy like so:

  val filteredResponseHeaders = Seq(CONTENT_LENGTH, CONTENT_TYPE)

  def stream: BodyParser[Source[ByteString, _]] = BodyParser { _ =>
    Accumulator.source[ByteString].map(Right.apply)
  }

  def connect(path: String) = Action.async(stream) { request =>
    val upstreamQueryString = request.queryString.flattenValues
    val upstreamHeaders = request.headers.headers

    val baseClient =
      ws.url(upstreamURL)
        .withFollowRedirects(false)
        .withMethod(request.method)
        .withRequestTimeout(Duration.Inf)
        .withQueryStringParameters(upstreamQueryString: _*)
        .withHttpHeaders(upstreamHeaders: _*)
        .withBody(SourceBody(request.body))

    baseClient.stream.map { response =>
      val receivedHeaders = response.headers.mapValues(_.head)
      val responseHeaders = ResponseHeader(
        response.status,
        receivedHeaders.filterNot { case (k, _) => filteredResponseHeaders.contains(k) })

      Result(
        responseHeaders,
        HttpEntity.Streamed(
          response.bodyAsSource,
          receivedHeaders.get(CONTENT_LENGTH).map(_.toLong),
          receivedHeaders.get(CONTENT_TYPE)))
    }.withOnFailure { t =>
      Logger.error("Error proxying request.", t)
    }
  }


This works fine most of the time. However, at seemingly random intervals the following warning is logged and streaming for that request is interrupted:

[warn] [ncHttpClient-2-7] [play.shaded.ahc.org.asynchttpclient.netty.request.body.NettyReactiveStreamsBody] Stream has already been consumed and cannot be reset

(no further stack trace, just that).

Can anyone please help me figure out what I'm doing wrong?

Thank you!