bidi streaming RPC auto flow control and thread model

y

Dec 16, 2022, 7:23:58 PM
to grpc.io
Hi, 

I am trying to use bidi streaming RPC for multiple large chunks of data. 

I was wondering how flow control is done for bidi streaming in Java. 

If I want to apply backpressure from the server side, can I just block in the onNext() call? My understanding is that onNext() runs as a gRPC executor task, so I shouldn't block it. 

Does this mean I have to do manual flow control to apply backpressure, and there is no default option? 

Thanks! 

sanjay...@google.com

Dec 18, 2022, 7:11:59 PM
to grpc.io
Take a look at https://github.com/grpc/grpc-java/tree/master/examples/src/main/java/io/grpc/examples/manualflowcontrol and disableAutoRequest() for manual flow control.

I think it is okay to block on onNext() call for automatic flow control. Check this https://grpc.github.io/grpc-java/javadoc/io/grpc/stub/CallStreamObserver.html#disableAutoInboundFlowControl--  which says "automatic flow control where a token is returned to the peer after a call to the 'inbound' StreamObserver.onNext(Object) has completed."
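For reference, the server-side shape of that manual flow control pattern looks roughly like the sketch below. This is only a sketch: RouteNote and the process() helper are placeholder names for illustration, not identifiers from the linked example.

```java
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;

public StreamObserver<RouteNote> routeChat(StreamObserver<RouteNote> responseObserver) {
    final ServerCallStreamObserver<RouteNote> serverObserver =
        (ServerCallStreamObserver<RouteNote>) responseObserver;
    serverObserver.disableAutoRequest(); // stop returning tokens automatically
    serverObserver.request(1);           // explicitly ask for the first message

    return new StreamObserver<RouteNote>() {
        @Override
        public void onNext(RouteNote note) {
            // Hand the message to your own worker so this executor thread
            // returns quickly; request the next message only when ready.
            // process() is a hypothetical application-side helper.
            process(note, () -> serverObserver.request(1));
        }
        @Override public void onError(Throwable t) { /* clean up */ }
        @Override public void onCompleted() { responseObserver.onCompleted(); }
    };
}
```

The key point is that no new message is delivered until the application calls request(1) again, so backpressure propagates to the client without blocking an executor thread.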

y

Dec 19, 2022, 12:34:44 AM
to grpc.io
Thanks for answering the question! 

I have one concern about blocking the onNext() call. 
I assume a blocking onNext() call ties up a thread in the gRPC executor pool. 

Say I have high QPS for a streaming RPC, and one rpc sends many more messages than the others. Will this starve the gRPC executor pool and block the other RPCs as well? 

Yes, I did see the manual flow control example. Can you comment on the pros and cons of using manual vs. automatic? 

Thanks again for helping out! 

Sanjay Pujare

Dec 19, 2022, 2:32:24 AM
to y, grpc.io
On Sun, Dec 18, 2022 at 9:34 PM y <liuya...@gmail.com> wrote:
Thanks for answering the question! 

I have one concern about blocking the onNext() call. 
I assume a blocking onNext() call ties up a thread in the gRPC executor pool. 

Say I have high QPS for a streaming RPC, and one rpc sends many more messages than the others. Will this starve the gRPC executor pool and block the other RPCs as well? 

Yes, that one blocking onNext() will use one thread which won't be available for other RPCs.
 

Yes, I did see the manual flow control example. Can you comment on the pros and cons of using manual vs. automatic? 

Automatic flow control, being automatic, means you don't have to do much, but it has the potential problem of pool starvation; of course, the blocked RPC doesn't need a thread for anything else anyway.

Manual flow control gives you more control (using serverCallStreamObserver.request(1) to request the next message, etc.), and you can potentially hand the work to your own thread so that onNext() returns and the pool stays available for other RPCs.
 

Thanks again for helping out! 




--
You received this message because you are subscribed to the Google Groups "grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email to grpc-io+u...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/grpc-io/bb149d94-de2f-4b79-b641-b4c03c3ae7fcn%40googlegroups.com.

y

Dec 20, 2022, 2:56:13 PM
to Sanjay Pujare, grpc.io
Thanks for the help! 

Some extra questions. 

Wondering if there is a limited number of tokens by default for auto flow control? 

If not, say I have 20 threads: will one rpc with 20 streams and blocking onNext() block 20 threads? 

sanjay...@google.com

Dec 22, 2022, 2:14:59 PM
to grpc.io

Wondering if there is a limited number of tokens by default for auto flow control? 

I guess there has to be a limit for backpressure to work. Are you asking if it's possible to change the token limit? 
 

If not, say I have 20 threads: will one rpc with 20 streams and blocking onNext() block 20 threads? 

One rpc is one stream. Unless you are saying there are 20 incoming rpc instances of the same method in which case there will be 20 streams. You are right: it will take up 20 threads.
 

y

Dec 22, 2022, 3:33:17 PM
to sanjay...@google.com, grpc.io
On Thu, Dec 22, 2022 at 11:15 AM 'sanjay...@google.com' via grpc.io <grp...@googlegroups.com> wrote:

Wondering if there is a limited number of tokens by default for auto flow control? 

I guess there has to be a limit for backpressure to work. Are you asking if it's possible to change the token limit? 
Yeah. I am asking what the limit is by default. If I want to change it, I have to do manual flow control I suppose? 
 
 

If not, say I have 20 threads: will one rpc with 20 streams and blocking onNext() block 20 threads? 

One rpc is one stream. Unless you are saying there are 20 incoming rpc instances of the same method in which case there will be 20 streams. You are right: it will take up 20 threads.
 
Sorry for the confusion. I meant for a streaming RPC. Say one rpc client (one rpc method) calls onNext() 20 times, and each onNext() on the rpc server side is blocking. How many threads will this rpc block? 

To be more specific with pseudo code.

Service Proto: 

service RouteGuide {
  rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
}

Server side:

public class RouteGuideServer extends RouteGuideGrpc.RouteGuideImplBase {
    @Override
    public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
        return new StreamObserver<RouteNote>() {
            @Override
            public void onNext(RouteNote note) {
                // Blocking the thread here
            }
            // onError()/onCompleted() omitted
        };
    }
}

Client side:
 
RouteGuideStub asyncStub = RouteGuideGrpc.newStub(channel);
StreamObserver<RouteNote> requestObserver =
    asyncStub.routeChat(new StreamObserver<RouteNote>() { // called once
        // ...
    });

for (int i = 0; i < 20; ++i) {
    requestObserver.onNext(request); // called 20 times
}
How many threads will this block? 

I guess I can do some testing in code as well on my end. Let me do that as well. :) 


sanjay...@google.com

Dec 22, 2022, 4:14:48 PM
to grpc.io

Wondering if there is a limited number of tokens by default for auto flow control? 

I guess there has to be a limit for backpressure to work. Are you asking if it's possible to change the token limit? 
Yeah. I am asking what the limit is by default. If I want to change it, I have to do manual flow control I suppose? 

It is 1: it asks for one more request message after receiving a message; see https://github.com/grpc/grpc-java/blob/master/stub/src/main/java/io/grpc/stub/ServerCalls.java#L266 

 
 

If not, say I have 20 threads: will one rpc with 20 streams and blocking onNext() block 20 threads? 

One rpc is one stream. Unless you are saying there are 20 incoming rpc instances of the same method in which case there will be 20 streams. You are right: it will take up 20 threads.
 
Sorry for the confusion. I meant for a streaming RPC. Say one rpc client (one rpc method) calls onNext() 20 times, and each onNext() on the rpc server side is blocking. How many threads will this rpc block? 

All messages in a streaming RPC are strictly sequential, so it will take only one thread on the server side. The next message in the stream is not accepted for processing until the current message has been completely processed.
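That per-call sequencing can be illustrated with a plain-JDK sketch (no gRPC involved, and deliberately simplified): a single-threaded executor plays a role similar to the serializing executor grpc-java uses per call, and we measure how many simulated onNext() callbacks ever run at once.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-JDK illustration (no gRPC): callbacks for a single call are
// serialized, so even slow/blocking "onNext" work occupies one thread.
public class SequentialDelivery {
    // Runs `messages` simulated onNext() callbacks on a serializing
    // executor and returns the maximum number that were active at once.
    static int maxConcurrentOnNext(int messages) throws InterruptedException {
        ExecutorService oneCall = Executors.newSingleThreadExecutor();
        AtomicInteger active = new AtomicInteger();
        AtomicInteger maxActive = new AtomicInteger();
        for (int i = 0; i < messages; i++) {
            oneCall.execute(() -> {
                maxActive.accumulateAndGet(active.incrementAndGet(), Math::max);
                try {
                    Thread.sleep(5); // simulated blocking work inside onNext()
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                active.decrementAndGet();
            });
        }
        oneCall.shutdown();
        oneCall.awaitTermination(30, TimeUnit.SECONDS);
        return maxActive.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // With one serializing executor, callbacks never overlap.
        System.out.println(maxConcurrentOnNext(20)); // prints 1
    }
}
```

A 20-thread gRPC executor behaves the same way for a single stream; the extra threads only matter when there are multiple concurrent calls.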

 

y

Dec 22, 2022, 4:47:57 PM
to sanjay...@google.com, grpc.io
Sorry for one more follow up. 😅

Is the grpc executor thread pool shared across different rpc services?

Say I have 

service a {
  rpc test()
}

service b{
  rpc test()
}

If service a and service b are implemented on the same server, do they share the same thread pool? 


Sanjay Pujare

Dec 22, 2022, 5:04:15 PM
to y, grpc.io
On Thu, Dec 22, 2022 at 1:47 PM y <liuya...@gmail.com> wrote:


If service a and service b are implemented on the same server, do they share the same thread pool? 

Yes. But you can provide a thread pool/executor per server: ServerBuilder.executor()

y

Dec 22, 2022, 5:30:00 PM
to Sanjay Pujare, grpc.io
On Thu, Dec 22, 2022 at 2:04 PM Sanjay Pujare <sanjay...@google.com> wrote:


On Thu, Dec 22, 2022 at 1:47 PM y <liuya...@gmail.com> wrote:


If service a and service b are implemented on the same server, do they share the same thread pool? 

Yes. But you can provide a threadpool/executor per server : ServerBuilder.executor()

I see. This means that if I create two gRPC servers on the same physical machine, they can have different thread pools?  

Sanjay Pujare

Dec 22, 2022, 5:34:17 PM
to y, grpc.io
Yes, if you call ServerBuilder.executor()
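As a sketch, two servers in the same process with independent pools could look like this; the ports, pool sizes, and the AImpl/BImpl service classes are made up for illustration.

```java
import io.grpc.Server;
import io.grpc.ServerBuilder;
import java.util.concurrent.Executors;

// Two gRPC servers in one process, each with its own executor pool.
// (start() throws IOException; error handling omitted in this sketch.)
Server serverA = ServerBuilder.forPort(50051)
    .executor(Executors.newFixedThreadPool(8))  // callbacks for services on serverA
    .addService(new AImpl())
    .build()
    .start();

Server serverB = ServerBuilder.forPort(50052)
    .executor(Executors.newFixedThreadPool(4))  // independent pool for serverB
    .addService(new BImpl())
    .build()
    .start();
```

Without executor(), grpc-java falls back to a shared cached thread pool, so supplying fixed-size pools like this is also the usual way to bound thread usage per server.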