Play 2.5 Broadcast a single Source[ByteString] to two consumers of Source[ByteString]?

Doug Roper

Apr 13, 2017, 8:02:43 AM
to Play Framework

I have a proxy-like controller that calls wsClient.url("...").stream(). I'd like to stream the same ws response body:

  1. from the play action: Result(..., Streamed(source))
  2. to analytics team's REST server: wsClient.url("...").post(StreamedBody(source))
The following example, naively sharing the same Source instance, more fully illustrates my goal:

def tee: Action[AnyContent] = Action.async {
  // We're going to do a GET and feed the response Source[ByteString] to two places.
  wsClient.url(config.baseUrl)
    .stream()
    .map { case StreamedResponse(headers, source) =>
      val simpleHeaders = headers.headers.mapValues(_.head)
      val contentLength = simpleHeaders(CONTENT_LENGTH)
      val contentType = simpleHeaders(CONTENT_TYPE)

      // Stream it as the POST request body of another request.
      wsClient.url(s"${config.baseUrl}/log/response/body")
        .withMethod("POST")
        .withHeaders(CONTENT_LENGTH -> contentLength, CONTENT_TYPE -> contentType)
        .withBody(StreamedBody(source)) // (1)
        .execute()

      // Stream it to the caller in the Result.
      Result(
        header = ResponseHeader(
          status = headers.status,
          headers = simpleHeaders.filterKeys(!_.startsWith("Content-")) // shush the NettyModelConversion warning.
        ),
        body = HttpEntity.Streamed(
          data = source, // (2)
          contentLength = Some(contentLength.toLong),
          contentType = Some(contentType)
        )
      )
    }
}

This very clearly doesn't work, and throws:
java.lang.IllegalStateException: This publisher only supports one subscriber
at com.typesafe.netty.HandlerPublisher.subscribe(HandlerPublisher.java:167)

And yet, HttpEntity.Streamed and StreamedBody both require a Source[ByteString], so I have no wiggle room there.

So then, how to broadcast the Source[ByteString] to two Source[ByteString] instances?

GraphDSL + Broadcast[ByteString] seems like it should have a solution, but we only have Outlets to work with inside of GraphDSL.create(), and I don't see how to easily pass out multiple Sources using a SourceShape.
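
For reference, here is a minimal sketch of what Broadcast does give you inside a closed graph: one Source fanned out to two Sinks. It only helps when both consumers can be expressed as Sinks, which is not the case for HttpEntity.Streamed and StreamedBody. The object name, the run helper, and the fold-based sinks are illustrative assumptions, and ActorMaterializer is used on the assumption of the Akka 2.4 line that ships with Play 2.5.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Broadcast, GraphDSL, RunnableGraph, Sink, Source}
import akka.util.ByteString
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical helper object, not part of Play or Akka.
object BroadcastToSinks {
  def run(chunks: List[String]): (String, String) = {
    implicit val system: ActorSystem = ActorSystem("broadcast-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Stand-in for the ws response body.
      val body = Source(chunks.map(s => ByteString(s)))
      // Each call builds a sink that concatenates everything it receives.
      def collect: Sink[ByteString, Future[ByteString]] =
        Sink.fold[ByteString, ByteString](ByteString.empty)(_ ++ _)

      val graph = RunnableGraph.fromGraph(
        GraphDSL.create(collect, collect)((_, _)) { implicit builder => (sinkA, sinkB) =>
          import GraphDSL.Implicits._
          val bcast = builder.add(Broadcast[ByteString](2))
          body ~> bcast.in
          bcast.out(0) ~> sinkA.in
          bcast.out(1) ~> sinkB.in
          ClosedShape
        })

      // Both sinks are wired before the stream starts, so neither misses elements.
      val (doneA, doneB) = graph.run()
      (Await.result(doneA, 5.seconds).utf8String, Await.result(doneB, 5.seconds).utf8String)
    } finally Await.result(system.terminate(), 5.seconds)
  }
}
```

Because both outlets are connected before materialization, there is no late-subscriber race here; the difficulty in the question is that the two consumers want Sources, not Sinks.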

Sink.asPublisher(fanout = true) also looked promising, but seems to introduce a race condition:

val publisher = body.runWith(Sink.asPublisher(fanout = true))
val source1 = Source.fromPublisher(publisher)
val source2 = Source.fromPublisher(publisher)
... // ws.post(source1)
... // Result(..., source2) 

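Creating a Source with Source.fromPublisher doesn't .subscribe() it to the publisher immediately; the subscription only happens when the stream is materialized. The stream running source1 can therefore consume the publisher's entire content before source2 even subscribes, in which case source2 would get only part of the response, if anything.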

I found a relevant Stack Overflow question, but it is unanswered: http://stackoverflow.com/questions/38438983/split-akka-stream-source-into-two

I've pushed up a minimal project with failing tests: https://github.com/htmldoug/play-example-broadcast-source

There must be a trick that I'm missing. Would appreciate any guidance.

Thanks,
--Doug

Christian Schmitt

Apr 13, 2017, 9:11:07 AM
to Play Framework
You can use BroadcastHub, which was introduced in a later Akka 2.4 release: http://doc.akka.io/docs/akka/2.4/scala/stream/stream-dynamic.html#Using_the_BroadcastHub

val producer = source.toMat(BroadcastHub.sink(bufferSize = 256))(Keep.right).run()

Now you can pass the producer to both ws.post and Result. You should also look at buffer/backpressureTimeout; the best thing is to just read the help page ;)
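
A minimal sketch of the BroadcastHub wiring described above, assuming the Akka 2.4 line that ships with Play 2.5. The upstream is deliberately an endless placeholder source (Source.repeat), so the result does not depend on when each consumer attaches; with a finite body, a consumer that attaches late can miss elements. The object name, the run helper, and the "chunk" payload are illustrative assumptions.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{BroadcastHub, Keep, Sink, Source}
import akka.util.ByteString
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper object, not part of Play or Akka.
object HubSketch {
  def run(): (Seq[String], Seq[String]) = {
    implicit val system: ActorSystem = ActorSystem("hub-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Endless placeholder upstream; a real response body is finite.
      val endless: Source[ByteString, NotUsed] = Source.repeat(ByteString("chunk"))

      // Running the hub yields a Source that can be materialized any number of times.
      val producer: Source[ByteString, NotUsed] =
        endless.toMat(BroadcastHub.sink[ByteString](bufferSize = 256))(Keep.right).run()

      // Two independent consumers, each taking three elements from the hub.
      val first = producer.map(_.utf8String).take(3).runWith(Sink.seq)
      val second = producer.map(_.utf8String).take(3).runWith(Sink.seq)

      (Await.result(first, 5.seconds), Await.result(second, 5.seconds))
    } finally Await.result(system.terminate(), 5.seconds)
  }
}
```

The hub suits long-lived streams where consumers come and go; it does not by itself guarantee that every consumer sees a finite stream from its first element.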

Will Sargent

Apr 13, 2017, 2:14:11 PM
to play-fr...@googlegroups.com

--
Will Sargent
Engineer, Lightbend, Inc.


Doug Roper

Apr 14, 2017, 6:46:28 AM
to Play Framework
Thanks for the responses. That does indeed give me two Sources, though BroadcastHub seems to have the same race condition as the Sink.asPublisher(fanout = true) approach.

The first subscriber can drain both the Source and BroadcastHub before the second one ever subscribes. The thread scheduler is a cruel mistress. I can't sacrifice correctness and drop ByteStrings.


val source = Source(1 to 10)
val producer = source.toMat(BroadcastHub.sink(bufferSize = 8))(Keep.right).run()

producer.runWith(Sink.seq).futureValue should contain theSameElementsInOrderAs (1 to 10) // consumes all elements
producer.runWith(Sink.seq).futureValue should contain theSameElementsInOrderAs (1 to 10) // before this one subscribes
// failing test 1 

Is there some other building block I can use to prevent this from happening?

Only slightly related, but setting the BroadcastHub.sink(bufferSize = larger than the number of source elements) causes no elements to be returned. I'm very curious why that's the case.

val source = Source(1 to 10)
val producer = source.toMat(BroadcastHub.sink(bufferSize = 16))(Keep.right).run()

producer.runWith(Sink.seq).futureValue should contain theSameElementsInOrderAs (1 to 10) // returns Vector() instead.
// failing test 2 

--Doug

Martynas Mickevičius

Apr 17, 2017, 4:45:37 AM
to Play Framework
Hi Doug,


On Friday, 14 April 2017 13:46:28 UTC+3, Doug Roper wrote:
Thanks for the responses. That does indeed give me two Sources, though BroadcastHub seems to have the same race condition as the Sink.asPublisher(fanout = true) approach.

The first subscriber can drain both the Source and BroadcastHub before the second one ever subscribes. The thread scheduler is a cruel mistress. I can't sacrifice correctness and drop ByteStrings.
 


val source = Source(1 to 10)
val producer = source.toMat(BroadcastHub.sink(bufferSize = 8))(Keep.right).run()

producer.runWith(Sink.seq).futureValue should contain theSameElementsInOrderAs (1 to 10) // consumes all elements
producer.runWith(Sink.seq).futureValue should contain theSameElementsInOrderAs (1 to 10) // before this one subscribes
// failing test 1 

Is there some other building block I can use to prevent this from happening?

I cannot think of anything that would be simple to implement and would handle all of the different timings of the stream. I think it would be best to improve BroadcastHub with an option that hints at the minimum number of sinks to expect before the broadcast starts.
 

Only slightly related, but setting the BroadcastHub.sink(bufferSize = larger than the number of source elements) causes no elements to be returned. I'm very curious why that's the case.

val source = Source(1 to 10)
val producer = source.toMat(BroadcastHub.sink(bufferSize = 16))(Keep.right).run()

producer.runWith(Sink.seq).futureValue should contain theSameElementsInOrderAs (1 to 10) // returns Vector() instead.
// failing test 2 

This might be a bug (or unintuitive behaviour) in BroadcastHub. What happens here is that the hub consumes all elements and receives a completion signal from the upstream. When a new sink then subscribes, the hub has already seen the upstream complete, so it signals completion to the newly subscribed sink.

Doug Roper

Apr 18, 2017, 9:22:47 PM
to Play Framework
Martynas,

Cool. I appreciate the confirmation that I'm not overlooking an easy solution. I feel better about looking into more complicated ones now.

Adding a size hint to BroadcastHub seems like it'd do the trick. Unfortunately, making that change is beyond my current comfort level with akka streams.

For now, I'm just buffering the whole thing into memory. :( It negates all benefits of streaming, but at least it doesn't drop bytes.

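// Sink.fold with an empty seed also handles an empty body; Sink.reduce fails on an empty stream.
val futureBodyBytes = body.runWith(Sink.fold[ByteString, ByteString](ByteString.empty)(_ ++ _))
// A def, so each consumer gets its own Source over the same buffered bytes.
def source = Source.fromFuture(futureBodyBytes)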

Result(..., Streamed(source))
ws.post(StreamedBody(source))
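
A self-contained sketch of this buffer-and-replay workaround, assuming the Akka 2.4 line that ships with Play 2.5. The Play and WS calls are replaced by two plain consumers so the snippet runs on its own; the object name, the run helper, and the stand-in body are illustrative assumptions.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical helper object, not part of Play or Akka.
object BufferAndReplay {
  def run(chunks: List[String]): (String, String) = {
    implicit val system: ActorSystem = ActorSystem("buffer-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Stand-in for the single-subscriber ws response body.
      val body = Source(chunks.map(s => ByteString(s)))

      // Materialize the body exactly once and hold all of it in memory.
      val futureBodyBytes: Future[ByteString] =
        body.runWith(Sink.fold[ByteString, ByteString](ByteString.empty)(_ ++ _))

      // Every call yields a fresh Source that emits the buffered bytes as one element.
      def replay = Source.fromFuture(futureBodyBytes)

      // Two independent consumers standing in for Result(...) and ws.post(...).
      def collect = Sink.fold[ByteString, ByteString](ByteString.empty)(_ ++ _)
      val forResult = replay.runWith(collect)
      val forPost = replay.runWith(collect)

      (Await.result(forResult, 5.seconds).utf8String, Await.result(forPost, 5.seconds).utf8String)
    } finally Await.result(system.terminate(), 5.seconds)
  }
}
```

The trade-off is the one already noted: memory use grows with the body size, but both consumers are guaranteed to see every byte regardless of when they subscribe.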

I'm attempting to convince my org to upgrade from Play 2.3 to Play 2.5. Would have been nice to showcase the streaming as a selling point.

If someone comes up with a solution, I'll happily adopt it.

Thanks
--Doug

Will Sargent

Oct 14, 2017, 11:00:14 PM
to Play Framework