client-side flow control for streams

stephen....@gmail.com

Apr 18, 2016, 11:30:47 AM
to grpc.io
Hello,

I'm admittedly a grpc newbie, but AFAICT 0.14 snapshot has server-side streams' flow control exposed to the application, e.g. by casting a StreamObserver to CallStreamObserver, and checking isReady.

I'm just curious, how close is having this for client-side streams? E.g. to be able to cast to CallStreamObserver? Is it simple and just not done yet? Complex and so might be awhile?

(For context, I'm hacking on a lsyncd-style project that does two-way syncing, e.g. between a development tower and a programmer's laptop. I'm using grpc's BIDI streaming, which so far is really great. And right now I'm looking at how to avoid calling .onNext 1000s of times (one per new/changed file, e.g. on initial sync), before the stream has had a chance to flush previous files to the remote peer.)

Thanks!

- Stephen

Eric Anderson

Apr 18, 2016, 12:43:36 PM
to stephen....@gmail.com, grpc.io
On Mon, Apr 18, 2016 at 8:30 AM, <stephen....@gmail.com> wrote:
> I'm admittedly a grpc newbie, but AFAICT 0.14 snapshot has server-side streams' flow control exposed to the application, e.g. by casting a StreamObserver to CallStreamObserver, and checking isReady.

Yep, that sounds right.

> I'm just curious, how close is having this for client-side streams? E.g. to be able to cast to CallStreamObserver? Is it simple and just not done yet? Complex and so might be awhile?

It's not quite as simple, because of races involved with setting callbacks. There's also a much easier workaround for client-side: use Channel/ClientCall.

To use Channel/ClientCall directly, instead of doing:

stub.someAction(args)

You can do:
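// Needs io.grpc.ClientCall and io.grpc.Metadata.
ClientCall<Req, Resp> call = stub.getChannel().newCall(
    YourProtoGrpc.METHOD_SOME_ACTION, stub.getCallOptions());
// start() takes the response listener plus the request headers.
call.start(yourListener, new Metadata());

// Whenever you need it
call.isReady();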

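You are then responsible for managing inbound flow control yourself, so you need to call call.request(numMessages) at some point; until you do, no response messages are delivered to your listener.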

Note that it is possible to use ClientCalls on the call instead of calling start() and managing things yourself. However, that doesn't really solve the problem, because if you are observing isReady() you almost certainly also want to be notified of onReady().
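
As a rough illustration of why isReady() and onReady() go together, here is a minimal sketch of an onReady-driven send loop. It does not use grpc-java itself: ReadySink, OnReadyDrainer and FakeSink are hypothetical names invented for this example. ReadySink stands in for the sending half of a call (compare ClientCall.isReady() and ClientCall.sendMessage()), and OnReadyDrainer.onReady() is what you would invoke from ClientCall.Listener.onReady().

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical stand-in for the sending half of a call. */
interface ReadySink<T> {
  boolean isReady();
  void send(T message);
}

/** Queues outbound messages and writes them only while the sink reports ready. */
final class OnReadyDrainer<T> {
  private final ReadySink<T> sink;
  private final Queue<T> pending = new ArrayDeque<>();

  OnReadyDrainer(ReadySink<T> sink) {
    this.sink = sink;
  }

  /** Application side: enqueue a message, then flush as much as the sink accepts. */
  synchronized void offer(T message) {
    pending.add(message);
    drain();
  }

  /** Transport side: call this from the onReady() callback. */
  synchronized void onReady() {
    drain();
  }

  synchronized int backlog() {
    return pending.size();
  }

  private void drain() {
    while (sink.isReady() && !pending.isEmpty()) {
      sink.send(pending.remove());
    }
  }
}

/** Toy sink: accepts `capacity` messages, then reports not-ready until flushed. */
final class FakeSink implements ReadySink<String> {
  final List<String> sent = new ArrayList<>();
  private final int capacity;
  private int buffered;

  FakeSink(int capacity) {
    this.capacity = capacity;
  }

  @Override public boolean isReady() { return buffered < capacity; }
  @Override public void send(String message) { sent.add(message); buffered++; }
  void flush() { buffered = 0; }
}

final class OnReadyDrainerDemo {
  public static void main(String[] args) {
    FakeSink sink = new FakeSink(2);
    OnReadyDrainer<String> drainer = new OnReadyDrainer<>(sink);
    for (String file : new String[] {"a", "b", "c", "d", "e"}) {
      drainer.offer(file);
    }
    System.out.println(sink.sent + " backlog=" + drainer.backlog());
    sink.flush();        // the transport drained its buffer...
    drainer.onReady();   // ...and tells us we may send again
    System.out.println(sink.sent + " backlog=" + drainer.backlog());
  }
}
```

Polling isReady() alone tells you when to stop sending; without the onReady() notification nothing tells you when to resume. Note that the pending queue here is unbounded, so a real producer would also stop generating work while the backlog is non-empty.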

Stephen Haberman

Apr 18, 2016, 10:08:08 PM
to Eric Anderson, grpc.io
Hi Eric,

Thanks for the reply. Using the lower level ClientCall sounds like a good option, I'll start poking around at that.

Thanks!

- Stephen

Stephen Haberman

Apr 24, 2016, 11:42:51 AM
to Eric Anderson, grpc.io
So I naively started looking into adding this. I technically got it working by ignoring all of the threading/race conditions. :-)

However, in doing so, I see what you mean now; on the server-side, the server's method is given the responseObserver as an argument, so they can make calls on it (e.g. setOnReadyHandler) before the ServerCalls has started doing .request/etc. work.

But for the client-side, the client code is only given the requestObserver as a return argument, e.g. after ClientCalls has initiated the whole startCall/etc. workflow, so things are already in motion by the time the client might want to say "oh right, tell me when data is ready" or "disable auto flow control".

Does that sound right?

If so, it seems like a fundamental limitation of the "this looks like a method call" RPC convention, without something ugly like the client passing along a "configure request observer" function that ClientCalls could invoke before returning, e.g.:

    requestObserver = stub.someBidiMethod(responseObserver, requestObserver -> { /* call .setOnReadyHandler here */ });

...

Tangential question, but does Stream.request(numMessages) result in a wire call?

I had thought it would, in that the javadocs said "requests up to the given number of messages to be delivered"; I had thought "requests" == "ask the server", e.g. the server would say "I can send one message, okay sent it, now wait for the client to ack back (via request(N)) that it's ready for more". (And vice versa on the client.)

So basically there would only ever be one message (or numMessages) in flight at a time.

However, AFAICT, calling request(numMessages) just ends in incrementing MessageDeframer.pendingDeliveries, e.g. the server-side could be actively sending more messages, and they'll sit in some buffers on the client-side, until MessageDeframer calls processBody to drain it.

Does that sound right? 

Granted, having only one request/response message inflight at a time did seem less-than-ideal, but isn't some sort of "client calling back to the server"/vice versa what flow control is supposed to do?

(FWIW I did try to RTFM. :-))

Thanks!

- Stephen





Eric Anderson

Apr 26, 2016, 8:19:48 PM
to Stephen Haberman, grpc.io
On Sun, Apr 24, 2016 at 8:42 AM, Stephen Haberman <stephen....@gmail.com> wrote:
> But for the client-side, the client code is only given the requestObserver as a return argument, e.g. after ClientCalls has initiated the whole startCall/etc. workflow, so things are already in motion by the time the client might want to say "oh right, tell me when data is ready" or "disable auto flow control".
>
> Does that sound right?

Yep.

> If so, it seems like a fundamental limitation of the "this looks like a method call" RPC convention, without something ugly like the client passing along a "configure request observer" function that ClientCalls could invoke before returning, e.g.:
>
>     requestObserver = stub.someBidiMethod(responseObserver, requestObserver -> { /* call .setOnReadyHandler here */ });

Well, the only real way to solve it is to split out a start() method, like we've done for ClientCall.

Note that things are even more tricky than they seem, in that there is a race if the receiver needs to send.

requestObserver = stub.doMethod(responseObserver);

If responseObserver needs to send (say, in response to onReady, if it were added), then it needs access to requestObserver. To fix the race you must have the object-that-you-send-on before the call is started.
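
The ordering problem can be shown with a deliberately simplified model. Nothing here is grpc-java API: Sender, Receiver, startAndReturn, create and start are hypothetical names made up for the sketch. The callback is also invoked synchronously so that the outcome is deterministic; in a real call it would arrive on another thread and the result would depend on timing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sending handle (think: request observer or ClientCall). */
interface Sender {
  void send(String message);
}

/** Hypothetical receiving callback that wants to send once the call is ready. */
interface Receiver {
  void onReady();
}

final class StartOrderingDemo {
  /** Stub style: the call is started, and may call back, before the sender is returned. */
  static Sender startAndReturn(Receiver receiver, List<String> wire) {
    Sender sender = wire::add;
    receiver.onReady();   // fires while the caller still has no reference to the sender
    return sender;
  }

  /** Split style, step 1: create the sending handle without starting anything. */
  static Sender create(List<String> wire) {
    return wire::add;
  }

  /** Split style, step 2: start the call; callbacks may fire from here on. */
  static void start(Receiver receiver) {
    receiver.onReady();
  }

  static List<String> stubStyle() {
    List<String> wire = new ArrayList<>();
    AtomicReference<Sender> ref = new AtomicReference<>();
    Receiver receiver = () -> {
      Sender sender = ref.get();
      if (sender != null) {   // still null when the callback runs, so nothing is sent
        sender.send("hello");
      }
    };
    ref.set(startAndReturn(receiver, wire));
    return wire;
  }

  static List<String> splitStyle() {
    List<String> wire = new ArrayList<>();
    Sender sender = create(wire);
    start(() -> sender.send("hello"));   // the sender exists before any callback can run
    return wire;
  }

  public static void main(String[] args) {
    System.out.println("stub style sent:  " + stubStyle());
    System.out.println("split style sent: " + splitStyle());
  }
}
```

In the stub-style variant the callback observes a missing sender and the message is never written; in the split variant the sender is captured before start() and the message goes out.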

> Tangential question, but does Stream.request(numMessages) result in a wire call?

Yes, but not 1:1. There is a conversion between byte-based flow control and message-based flow control. Basically, we only send WINDOW_UPDATE frames in HTTP/2 when there are outstanding requests.

> I had thought it would, in that the javadocs said "requests up to the given number of messages to be delivered"; I had thought "requests" == "ask the server", e.g. the server would say "I can send one message, okay sent it, now wait for the client to ack back (via request(N)) that it's ready for more". (And vice versa on the client.)

No, it's not that precise, and it doesn't require that many round trips. Basically, the client can always assume the server is sending more, so it doesn't bother asking the server. The client can just inform the server when it's willing to receive more.

In HTTP/2, SETTINGS_INITIAL_WINDOW_SIZE means that the server is able to send some bytes initially (65,535 bytes, roughly 64k, by default). If we are really low on memory we may do something more precise (like only allowing 5 bytes initially, enough for the message's length prefix, to learn how large the response is), but that is still being designed (by myself and ctiller).

As an aside, flow control is identical between server-side and client-side, with the exception that there is additional flow control on the client for creating RPCs (MAX_CONCURRENT_STREAMS).

> So basically there would only ever be one message (or numMessages) in flight at a time.

We don't do that today. This is something we can fall back to if under memory pressure (and that we're designing).

> However, AFAICT, calling request(numMessages) just ends in incrementing MessageDeframer.pendingDeliveries, e.g. the server-side could be actively sending more messages, and they'll sit in some buffers on the client-side, until MessageDeframer calls processBody to drain it.
>
> Does that sound right?

Yep. This works really well for reducing latency, at the expense of memory.

> Granted, having only one request/response message inflight at a time did seem less-than-ideal, but isn't some sort of "client calling back to the server"/vice versa what flow control is supposed to do?

In the form you mentioned earlier, it favors large requests. For speed you still need the client to be able to request more than one at a time.

The method we're using here is very similar to TCP. It uses a buffer in order to reduce the latency involved. Once the buffer fills then the application receives push-back.
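
The buffer-then-push-back behaviour can be illustrated with a toy model. This is a sketch under simplifying assumptions, not gRPC's implementation: WindowModel and its methods are hypothetical names, messages are treated as indivisible (real HTTP/2 splits them into DATA frames and tracks both a per-stream and a per-connection window), and handing bytes back to the sender stands in for a WINDOW_UPDATE frame.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model: byte-based window on the wire, message-based request() at the receiver. */
final class WindowModel {
  private int sendWindow;                                      // bytes the sender may still write
  private final Deque<Integer> buffered = new ArrayDeque<>();  // sizes of received, undelivered messages
  private int pendingDeliveries;                               // messages the application has asked for
  private int delivered;

  WindowModel(int initialWindow) {
    this.sendWindow = initialWindow;
  }

  /** Sender side: succeeds while the window has room, whether or not request() was called. */
  boolean trySend(int sizeBytes) {
    if (sizeBytes > sendWindow) {
      return false;   // push-back: the receiver's buffer is full
    }
    sendWindow -= sizeBytes;
    buffered.add(sizeBytes);
    deliver();
    return true;
  }

  /** Receiver side: the application asks for up to numMessages more messages. */
  void request(int numMessages) {
    pendingDeliveries += numMessages;
    deliver();
  }

  int delivered() { return delivered; }
  int bufferedMessages() { return buffered.size(); }

  private void deliver() {
    while (pendingDeliveries > 0 && !buffered.isEmpty()) {
      int size = buffered.remove();
      pendingDeliveries--;
      delivered++;
      sendWindow += size;   // stands in for a WINDOW_UPDATE back to the sender
    }
  }
}

final class WindowModelDemo {
  public static void main(String[] args) {
    WindowModel model = new WindowModel(100);
    System.out.println(model.trySend(40));   // fits in the initial window
    System.out.println(model.trySend(40));   // still fits, sits in the buffer
    System.out.println(model.trySend(40));   // rejected until the receiver consumes something
    model.request(1);                        // the application takes one message
    System.out.println(model.trySend(40));   // window reopened by the delivery
    System.out.println("delivered=" + model.delivered()
        + " buffered=" + model.bufferedMessages());
  }
}
```

The sender gets two messages onto the wire before the receiver has requested anything, which is the latency win; the third attempt is where push-back appears. A single request(1) frees enough window for one more send, so request calls and window updates are related but not 1:1 with messages in flight.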

We've talked some about doing precise, per-message flow control on the wire. We're sort of split on it. We were worried about not having it earlier, but at this point we've made it work pretty well. There are still some use cases that can suffer (due to buffers in intermediaries and infinite streams), but there are other ways to address that.

> (FWIW I did try to RTFM. :-))

Ha! There will be a written test for you to take to prove it. A very short test.

Flow control can be represented in a lot of different ways, and we actually use quite a few of them between all the different gRPC languages. Flow control is part of gRPC, but the API for it is entirely language-specific. It only needs to push back on the sender. If using an async API for reading (like Java), then you need to also have a way to request more work.

We don't have any good documentation to say how it works on the wire. You have to read the HTTP/2 specification and our implementation for that, at present.

Stephen Haberman

May 8, 2016, 8:18:50 PM
to Eric Anderson, grpc.io
Hi Eric,

Sorry for the late response; I wanted to read your message a few times.

Thanks for explaining the byte- vs. message-based flow control; seeing request(message) in the API made me jump to the conclusion it was 100% message-based, and I didn't think of it being a mix of both, which makes more sense.

I really appreciate your time, so will only ask what is hopefully one more simple/quick clarification:

> The method we're using here is very similar to TCP. It uses a buffer in order to reduce the
> latency involved. Once the buffer fills then the application receives push-back.

Just for clarity, I assume you mean "push-back" as in call.isReady now returns false?

I was hoping, when I first read it, that you meant "push-back" as in "onNext blocks" (for bidi streams), because that is exactly what I'd like.

But given issue #1216, scanning the source, etc., I'm pretty convinced onNext does not block when buffers fill up.

So right now I'm naively trying to code a BlockingStreamObserver to wrap the client's request observer and server's response observer, and check the wrapped call/CallStreamObserver isReady and block/self-throttle inside of onNext, and it seems like more work than I should have to do...

It would be great if I'm just making this too hard. :-)

Thanks,
Stephen

Eric Anderson

May 9, 2016, 5:18:25 PM
to Stephen Haberman, grpc.io
On Sun, May 8, 2016 at 5:18 PM, Stephen Haberman <stephen....@gmail.com> wrote:
> The method we're using here is very similar to TCP. It uses a buffer in order to reduce the
> latency involved. Once the buffer fills then the application receives push-back.

> Just for clarity, I assume you mean "push-back" as in call.isReady now returns false?

Yes. I was speaking in a more general sense than actual APIs, but yes.

> I was hoping, when I first read it, that you meant "push-back" as in "onNext blocks" (for bidi streams), because that is exactly what I'd like.

That's very possible, but we don't do it. Since it is supposed to be an async API, blocking "randomly" based on the remote endpoint seemed dangerous without it being obvious it may happen.

Yes, now we have the problem of missing flow control, but the argument posed is that many applications don't need outbound flow control and those that do can use ClientCall.

> But given issue #1216, scanning the source, etc., I'm pretty convinced onNext does not block when buffers fill up.
>
> So right now I'm naively trying to code a BlockingStreamObserver to wrap the client's request observer and server's response observer, and check the wrapped call/CallStreamObserver isReady and block/self-throttle inside of onNext, and it seems like more work than I should have to do...
>
> It would be great if I'm just making this too hard. :-)

No, it would be necessary.

It might not be too hard to have an option to enable the blocking behavior and we add that complexity to ClientCalls/ServerCalls, but it would probably still be off by default.
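
For readers wondering what such a blocking wrapper might look like, here is a minimal sketch. It is not part of grpc-java and is not the option discussed above: ReadyObserver, BlockingObserver and FakeObserver are hypothetical names, with ReadyObserver modelling only the isReady(), setOnReadyHandler(Runnable) and onNext() methods that CallStreamObserver exposes. It also assumes the handler can be installed before any messages flow, which is exactly the part that is awkward on the client side.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical subset of a flow-control-aware observer. */
interface ReadyObserver<T> {
  boolean isReady();
  void setOnReadyHandler(Runnable handler);
  void onNext(T value);
}

/** Wrapper whose onNext blocks the calling thread until the delegate reports ready. */
final class BlockingObserver<T> {
  private final ReadyObserver<T> delegate;
  private final Object lock = new Object();

  BlockingObserver(ReadyObserver<T> delegate) {
    this.delegate = delegate;
    // Wake any blocked producer whenever the transport becomes ready again.
    delegate.setOnReadyHandler(() -> {
      synchronized (lock) {
        lock.notifyAll();
      }
    });
  }

  void onNext(T value) {
    synchronized (lock) {
      while (!delegate.isReady()) {   // loop guards against spurious wakeups
        try {
          lock.wait();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IllegalStateException("interrupted while waiting for readiness", e);
        }
      }
    }
    delegate.onNext(value);
  }
}

/** Toy delegate that starts out not ready. */
final class FakeObserver implements ReadyObserver<String> {
  final List<String> delivered = new CopyOnWriteArrayList<>();
  private volatile boolean ready;
  private volatile Runnable handler = () -> {};

  @Override public boolean isReady() { return ready; }
  @Override public void setOnReadyHandler(Runnable handler) { this.handler = handler; }
  @Override public void onNext(String value) { delivered.add(value); }

  void becomeReady() {
    ready = true;
    handler.run();
  }
}

final class BlockingObserverDemo {
  static List<String> demo() {
    FakeObserver fake = new FakeObserver();
    BlockingObserver<String> blocking = new BlockingObserver<>(fake);
    Thread producer = new Thread(() -> blocking.onNext("file-1"));
    producer.start();
    fake.becomeReady();   // unblocks the producer regardless of which thread ran first
    try {
      producer.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return fake.delivered;
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

A wrapper like this must never be called from the thread that delivers the ready notifications, or it will deadlock waiting for a callback that cannot run. That is one concrete form of the "blocking randomly" hazard mentioned above.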

Stephen Haberman

May 11, 2016, 10:10:37 AM
to Eric Anderson, grpc.io

> Since it is supposed to be an async API, blocking "randomly" based on the
> remote endpoint seemed dangerous without it being obvious it may happen.

Makes sense; I saw that rationale mentioned in one of the issues.

My only $0.02 would be what is more surprising: having your async call block due to back-pressure, or having your JVM OOME because the queue/buffers are filled up? Admittedly, neither seems great.

> It might not be too hard to have an option to enable the blocking behavior
> and we add that complexity to ClientCalls/ServerCalls, but it would probably
> still be off by default.

Cool, that makes sense. I'd personally find that useful.

I had been hacking around on a probably-wrong approach of adding CallStreamObserver to ClientCalls, but am not sure when/if I'll have a chance to turn it into a proper PR. It is pretty fun, but I'm behind on other things.

Thanks for your time!

- Stephen



Eric Anderson

May 11, 2016, 3:52:30 PM
to Stephen Haberman, grpc.io
On Wed, May 11, 2016 at 7:10 AM, Stephen Haberman <stephen....@gmail.com> wrote:

> Since it is supposed to be an async API, blocking "randomly" based on the
> remote endpoint seemed dangerous without it being obvious it may happen.

> Makes sense; I saw that rationale mentioned in one of the issues.
>
> My only $0.02 would be what is more surprising: having your async call block due to back-pressure, or having your JVM OOME because the queue/buffers are filled up? Admittedly, neither seems great.

We're going to try to spend some more time on this in https://github.com/grpc/grpc-java/issues/1549 