Hi everyone!
I have implemented a reverse proxy like so:
val filteredResponseHeaders = Seq(CONTENT_LENGTH, CONTENT_TYPE)
def stream: BodyParser[Source[ByteString, _]] = BodyParser { _ =>
Accumulator.source[ByteString].map(Right.apply)
}
def connect(path: String) = Action.async(stream) { request =>
val upstreamQueryString = request.queryString.flattenValues
val upstreamHeaders = request.headers.headers
val baseClient =
ws.url(upstreamURL)
.withFollowRedirects(false)
.withMethod(request.method)
.withRequestTimeout(Duration.Inf)
.withQueryStringParameters(upstreamQueryString: _*)
.withHttpHeaders(upstreamHeaders: _*)
.withBody(SourceBody(request.body))
baseClient.stream.map { response =>
val receivedHeaders = response.headers.mapValues(_.head)
val responseHeaders = ResponseHeader(response.status, receivedHeaders.filterNot { case (k, _) => filteredResponseHeaders.contains(k) })
Result(responseHeaders, HttpEntity.Streamed(response.bodyAsSource, receivedHeaders.get(CONTENT_LENGTH).map(_.toLong),
receivedHeaders.get(CONTENT_TYPE)))
}.withOnFailure { t =>
Logger.error("Error proxying request.", t);
}
}This works fine most of the time. However, at completely random intervals I get the following error and streaming for that request is interrupted:
[warn] [ncHttpClient-2-7] [play.shaded.ahc.org.asynchttpclient.netty.request.body.NettyReactiveStreamsBody] Stream has already been consumed and cannot be reset
(no further stack trace, just that).
Can anyone please help me detect what I'm doing wrong?
Thank you!