I have a proxy-like controller that calls wsClient.url("...").stream(). I'd like to stream the same ws response body:
wsClient.url("...").post(StreamedBody(source))
def tee: Action[AnyContent] = Action.async {
// We're going to do a GET and feed the response Source[ByteString] to two places.
wsClient.url(config.baseUrl)
.stream()
.map { case StreamedResponse(headers, source) =>
val simpleHeaders = headers.headers.mapValues(_.head)
val contentLength = simpleHeaders(CONTENT_LENGTH)
val contentType = simpleHeaders(CONTENT_TYPE)
// Stream it as the POST request body of another request.
wsClient.url(s"${config.baseUrl}/log/response/body")
.withMethod("POST")
.withHeaders(CONTENT_LENGTH -> contentLength, CONTENT_TYPE -> contentType)
.withBody(StreamedBody(source)) // (1)
.execute()
// Stream it to the caller in the Result.
Result(
header = ResponseHeader(
status = headers.status,
headers = simpleHeaders.filterKeys(!_.startsWith("Content-")) // shush the NettyModelConversion warning.
),
body = HttpEntity.Streamed(
data = source, // (2)
contentLength = Some(contentLength.toLong),
contentType = Some(contentType)
)
)
}
}
java.lang.IllegalStateException: This publisher only supports one subscriber
at com.typesafe.netty.HandlerPublisher.subscribe(HandlerPublisher.java:167)
val publisher = body.runWith(Sink.asPublisher(fanout = true)
val source1 = Source.fromPublisher(publisher)
val source2 = Source.fromPublisher(publisher)
... // ws.post(source1)
... // Result(..., source2)
To view this discussion on the web visit https://groups.google.com/d/msgid/play-framework/6edc54f0-1048-41b4-9f20-0d0b5ef08ddc%40googlegroups.com.--
You received this message because you are subscribed to the Google Groups "Play Framework" group.
To unsubscribe from this group and stop receiving emails from it, send an email to play-framework+unsubscribe@googlegroups.com.
val source = Source(1 to 10)
val producer = source.toMat(BroadcastHub.sink(bufferSize = 8))(Keep.right).run()
producer.runWith(Sink.seq).futureValue should contain theSameElementsInOrderAs (1 to 10) // consumes all elements
producer.runWith(Sink.seq).futureValue should contain theSameElementsInOrderAs (1 to 10) // before this one subscribes
// failing test 1
val source = Source(1 to 10)
val producer = source.toMat(BroadcastHub.sink(bufferSize = 16))(Keep.right).run()
producer.runWith(Sink.seq).futureValue should contain theSameElementsInOrderAs (1 to 10) // returns Vector() instead.
// failing test 2
To unsubscribe from this group and stop receiving emails from it, send an email to play-framewor...@googlegroups.com.
Thanks for the responses. That does indeed give me two Sources, though BroadcastHub seems to have the same race condition as the Sink.asPublisher(fanout = true) approach.The first subscriber can drain both the Source and BroadcastHub before the second one ever subscribes. The thread scheduler is cruel mistress. I can't sacrifice correctness and drop ByteStrings.
I've created a failing test for it here: https://github.com/htmldoug/play-example-broadcast-source/blob/master/test/controllers/BroadcastHubSpec.scalaval source = Source(1 to 10)
val producer = source.toMat(BroadcastHub.sink(bufferSize = 8))(Keep.right).run()
producer.runWith(Sink.seq).futureValue should contain theSameElementsInOrderAs (1 to 10) // consumes all elements
producer.runWith(Sink.seq).futureValue should contain theSameElementsInOrderAs (1 to 10) // before this one subscribes// failing test 1Is there some other building block I can use to prevent this from happening?
Only slightly related, but setting the BroadcastHub.sink(bufferSize = larger than the number of source elements) causes no elements to be returned. I'm very curious why that's the case.val source = Source(1 to 10)
val producer = source.toMat(BroadcastHub.sink(bufferSize = 16))(Keep.right).run()
producer.runWith(Sink.seq).futureValue should contain theSameElementsInOrderAs (1 to 10) // returns Vector() instead.// failing test 2
val futureBodyBytes = body.runWith(Sink.reduce[ByteString](_ ++ _))def source = Source.fromFuture(futureBodyBytes)
Result(..., Streamed(source))ws.post(StreamedBody(source))