Java -server -> client blocking queue?


good...@gmail.com

Oct 22, 2016, 5:41:04 PM
to grpc.io
Hi,

I'm experimenting with a Java server/client implementation with gRPC and would like to have the server stream back chunks from an InputStream. The chunking using a bytes type seems fine, but now I'm wondering if there's a way to do some sort of blocking queue, so that if the client slows down reading, the server slows down too (I'm getting "java.lang.OutOfMemoryError: Direct buffer memory" in the server). Is this possible?

Thanks!

good...@gmail.com

Oct 23, 2016, 11:21:35 AM
to grpc.io, good...@gmail.com
After digging a little, it seems there is support for "flow control", although I can't find any examples of how to use it. I've seen references to ClientResponseObserver, but I'm not finding any way to integrate that either. Any pointers on how to work with gRPC's flow control - and more importantly, am I on the right track here? I just need to make sure that if the client isn't consuming quickly enough, the server doesn't keep pushing data into its buffer and run out of memory.

- Matt
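A minimal sketch of the semantics being asked for here, using only java.util.concurrent (this is not gRPC code): a bounded BlockingQueue caps the server's memory use because put() blocks the producer whenever the consumer falls behind. The class name and sizes are made up for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedChunkQueue {
    /**
     * Pushes totalChunks 1 KiB chunks through a queue of the given capacity.
     * The producer thread blocks in put() whenever the queue is full, so at
     * most `capacity` chunks are ever buffered, regardless of consumer speed.
     */
    public static int transfer(int totalChunks, int capacity) throws InterruptedException {
        BlockingQueue<byte[]> queue = new ArrayBlockingQueue<>(capacity);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < totalChunks; i++) {
                    queue.put(new byte[1024]); // blocks when the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int consumed = 0;
        while (consumed < totalChunks) {
            byte[] chunk = queue.poll(5, TimeUnit.SECONDS);
            if (chunk == null) break; // producer died or stalled
            consumed++;
        }
        producer.join();
        return consumed;
    }
}
```

gRPC's async StreamObserver does not expose this blocking behavior directly, which is what the rest of the thread is about.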

Avinash Dongre

Oct 25, 2016, 1:55:29 AM
to grpc.io, good...@gmail.com

Matt Mitchell

Oct 28, 2016, 11:32:04 AM
to grpc.io, good...@gmail.com
Thanks Avinash. I tried your example and it does seem to help. So if I'm emitting a stream from my server, should I be checking isReady() before each onNext() call? I'm currently reading an InputStream and chunking out to the response, so really what I'm doing is "waiting" (while/sleep) for isReady() to be true on each read from the InputStream. This does seem to prevent the server from throwing an OOM, but I'm not sure it's the best way?

I really wish there were a way to do a blocking stream write/read so that backpressure was driven by the client's reads. Having to wait/sleep or react to asynchronous messages in the server (for example, the client could send a "pause" message and the server could "sleep") feels complicated and brittle. Am I missing something in how I'm approaching this?
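For illustration, the while/sleep approach Matt describes might look roughly like this. ResponseSink is a hypothetical stand-in for the isReady()/onNext() surface of gRPC's ServerCallStreamObserver, used only so the sketch is self-contained:

```java
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical stand-in for the parts of ServerCallStreamObserver used here. */
interface ResponseSink {
    boolean isReady();
    void onNext(byte[] chunk);
}

public class PollingChunker {
    /**
     * Reads the stream in fixed-size chunks, spinning (sleep-polling) until
     * the sink reports it is ready before sending each chunk.
     */
    public static int stream(InputStream in, ResponseSink sink, int chunkSize)
            throws IOException, InterruptedException {
        byte[] buf = new byte[chunkSize];
        int chunks = 0;
        int n;
        while ((n = in.read(buf)) != -1) {
            while (!sink.isReady()) {
                Thread.sleep(1); // busy-wait: works, but ties up this thread
            }
            byte[] chunk = new byte[n];
            System.arraycopy(buf, 0, chunk, 0, n);
            sink.onNext(chunk);
            chunks++;
        }
        return chunks;
    }
}
```

This does bound memory, but the sleeping loop occupies a server thread per call for the duration of the stream, which is part of what makes it feel awkward.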

Eric Anderson

Oct 28, 2016, 6:11:47 PM
to Matt Mitchell, grpc.io
You can check out my "example" in issue 2247.

On Fri, Oct 28, 2016 at 8:32 AM, Matt Mitchell <good...@gmail.com> wrote:
> So if I'm emitting a stream from my server, should I be checking isReady() before each onNext() call?

You should check it occasionally; it doesn't have to be done for every message. I'd suggest checking it _after_ onNext(); if you already have a message, go ahead and send it; just delay creating more messages.

Java's flow control is advisory: gRPC tells you when it is no longer productive to queue more messages, and you stop when convenient. But anything you pass to onNext() will be queued regardless, so you have to be aware of how much memory you may be wasting.

> I'm currently reading an InputStream and chunking out to the response, so really what I'm doing is "waiting" (while/sleep) for isReady to be true on each read from the InputStream.

That's fair, although we provide a callback like I showed in the link above. Again, I'd suggest calling onNext() first and then if isReady() is false, just delay calling read().

> I really wish there were a way to do a blocking stream write/read so the backpressure handling was driven by the client reads.

That would be a blocking API, which StreamObserver is not. We would like to have blocking APIs server-side, but it wasn't clear what the idiomatic Java API would be. Streaming can also be hard (especially bi-di) because it is easy to deadlock if you aren't careful.

> Having to wait/sleep or react to asynchronous messages in the server (for example, client could send a "pause" message, server could "sleep") feels complicated and brittle.

The setOnReadyHandler makes it reasonable. It is still an async API, which has its advantages and downfalls.
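The send-first-then-check pattern Eric suggests, resumed from the on-ready callback, might be sketched like this. ReadySink is a hypothetical stand-in for ServerCallStreamObserver's flow-control surface so the example is self-contained; the real methods (isReady(), setOnReadyHandler(Runnable)) live on io.grpc.stub.ServerCallStreamObserver:

```java
import java.util.List;

/** Hypothetical stand-in for ServerCallStreamObserver's flow-control surface. */
interface ReadySink {
    boolean isReady();
    void onNext(String msg);
    void onCompleted();
    void setOnReadyHandler(Runnable handler);
}

/** Drains a list of messages into the sink, pausing while the transport is not ready. */
public class OnReadyDriver {
    private final List<String> pending;
    private final ReadySink sink;
    private int next = 0;

    public OnReadyDriver(List<String> pending, ReadySink sink) {
        this.pending = pending;
        this.sink = sink;
        // The same drain() runs whenever the sink becomes ready again,
        // so no thread ever sleeps waiting for readiness.
        sink.setOnReadyHandler(this::drain);
    }

    public void drain() {
        // Send first, then check readiness: a message we already have may as
        // well go out; we only stop producing new ones.
        while (next < pending.size()) {
            sink.onNext(pending.get(next++));
            if (!sink.isReady()) {
                return; // resumed later by the on-ready handler
            }
        }
        sink.onCompleted();
    }
}
```

Compared to the sleep-polling loop, this stays within the async model: the call's thread is released as soon as the transport backs up, and work resumes only when gRPC signals readiness.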