Limiting number of pendingStreams (maxConcurrentCallsPerConnection)


R A

Jun 8, 2017, 5:35:43 PM
to grpc.io

Hello,

 

I am using a custom ServerProvider in which I am limiting the number of concurrent calls per connection:

NettyServerBuilder.forPort(port).maxConcurrentCallsPerConnection(maxConcurrentCallsPerConnection)….
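(For context, a minimal sketch of what the full builder call looks like; the service registration shown is an illustrative placeholder, not the actual code.)

import io.grpc.Server;
import io.grpc.netty.NettyServerBuilder;

Server server = NettyServerBuilder.forPort(port)
    .maxConcurrentCallsPerConnection(maxConcurrentCallsPerConnection)
    .addService(new MyServiceImpl())  // hypothetical service implementation
    .build()
    .start();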

 

On the client side, gRPC seems to wire in StreamBufferingEncoder, which starts buffering new streams once the maxConcurrentCallsPerConnection limit is reached, with no limit as far as I can see.  My questions are:

1. Is there a way to inject a custom encoder instead, such that I can limit the number of buffered/pending streams?  Or is there a way to get a handle to the StreamBufferingEncoder object such that numBufferedStreams() can be used to monitor the number of buffered streams?

2. If the above is not possible, is there any way to enforce max concurrent streams such that new streams beyond that limit are rejected rather than buffered?

 

Any other solution that may enable me to limit the number of buffered streams would be helpful as well.


Thanks.

Carl Mastrangelo

Jun 9, 2017, 6:08:42 PM
to grpc.io
I don't know if this works today, but we could make it work: IMO the correct way to check this is to call isReady() or setOnReadyHandler() on the CallStreamObserver.  This is normally for flow control, but connection limiting seems similar enough.

Actually, looking at the code, that should do the right thing (assuming you are using NettyChannelBuilder), since it checks whether the stream has been allocated yet.
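For reference, a rough sketch of what using the readiness API looks like on the client; the message types and the generated stub call here are hypothetical placeholders:

import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;

// "Req" and "Resp" stand in for your generated request/response message types.
ClientResponseObserver<Req, Resp> observer = new ClientResponseObserver<Req, Resp>() {
  @Override
  public void beforeStart(ClientCallStreamObserver<Req> requestStream) {
    // Runs whenever the transport becomes ready to accept more messages; a
    // flow-control-aware client would drain its queued requests here while
    // requestStream.isReady() keeps returning true.
    requestStream.setOnReadyHandler(() -> {
      // drain queued requests while requestStream.isReady()
    });
  }
  @Override public void onNext(Resp value) { /* handle response */ }
  @Override public void onError(Throwable t) { /* handle failure */ }
  @Override public void onCompleted() { /* stream finished */ }
};
// asyncStub.someStreamingCall(observer);  // hypothetical generated stub method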

R A

Jun 13, 2017, 6:50:29 PM
to grpc.io
Thanks for the response.  CallStreamObserver (ClientCallStreamObserver), as you pointed out, is for flow control once a stream has already been created.  I am trying to limit the creation (or buffering) of new streams on an existing connection when maxConcurrentCallsPerConnection is reached.  Can you elaborate on the connection between the two? It wasn't very evident to me from your response.

Eric Anderson

Jun 20, 2017, 1:03:59 PM
to R A, grpc.io
For unary calls, I don't think there is a pre-existing solution for you. Streaming RPCs that observe flow control (isReady()) will naturally avoid sending (except for headers) when MAX_CONCURRENT_STREAMS is reached. But unary calls already have their request at the time of the call, and in neither case would you know if/when it is appropriate to kill the RPC instead of waiting for RPC readiness.

We've thought about supporting flow-control-aware unary RPCs in the past, but that still wouldn't provide a direct way to know when to kill RPCs. There are times when MAX_CONCURRENT_STREAMS may be 0 temporarily as the server tries to avoid OOM, and we'd want to avoid that causing an error.

Could you explain a bit more about your use case and why you want to kill the RPCs? Are the RPCs long-lived and thus new RPCs never get started? Or are you trying to avoid the latency?




R A

Jun 20, 2017, 9:16:22 PM
to grpc.io, rua...@ebay.com

To clarify, I use unary calls.  I have a client that connects to multiple backend servers.  If any of these servers slows down, requests will start buffering in StreamBufferingEncoder indefinitely (once beyond MAX_CONCURRENT_STREAMS).  I want to be able to identify such a scenario and temporarily mark the downstream server unavailable, similar to circuit-breaker logic, if more than X streams are buffered for that particular server.  That way, slowness in one server doesn't affect requests being sent to other servers from the client.




Eric Anderson

Jun 21, 2017, 1:59:33 PM
to R A, grpc.io

I don't think depending on the buffering is best here; if a server doubled its MAX_CONCURRENT_STREAMS or used MAX_INT, then you would still want to abort RPCs after a point. Instead, it sounds like you should just limit the number of concurrent calls per Channel to something that you find "reasonable". That can be done via an interceptor.

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.internal.NoopClientCall;
import java.util.concurrent.atomic.AtomicInteger;

class LimitingInterceptor implements ClientInterceptor {
  private static final int LIMIT = 100;  // pick something you find "reasonable"
  private final AtomicInteger count = new AtomicInteger();

  @Override
  public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
      MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
    return new LimitingClientCall<ReqT, RespT>(next.newCall(method, callOptions));
  }

  private class LimitingClientCall<ReqT, RespT> extends ForwardingClientCall<ReqT, RespT> {
    private ClientCall<ReqT, RespT> delegate;
    LimitingClientCall(ClientCall<ReqT, RespT> delegate) { this.delegate = delegate; }
    @Override
    protected ClientCall<ReqT, RespT> delegate() { return delegate; }

    @Override
    public void start(Listener<RespT> listener, Metadata headers) {
      if (count.incrementAndGet() <= LIMIT) {
        // good case; easy case: forward the call and decrement when it completes
        super.start(new SimpleForwardingClientCallListener<RespT>(listener) {
          @Override
          public void onClose(Status status, Metadata trailers) {
            count.decrementAndGet();
            super.onClose(status, trailers);
          }
        }, headers);
        return;
      }
      // bad case: over the limit, fail the RPC locally
      count.decrementAndGet();
      // cancel not strictly necessary since the call wasn't started, but a good idea
      super.cancel("Exceeded limit", null);
      // Throw away all future method calls; delegate() returns this mutable field
      delegate = new NoopClientCall<ReqT, RespT>();
      // Doing this last since the listener can do anything, which may include throwing
      listener.onClose(Status.CANCELLED.withDescription("Exceeded limit"), new Metadata());
    }
  }
}
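A minimal sketch (not from the original reply) of attaching such an interceptor when building the channel; the target address and port are illustrative:

import io.grpc.ManagedChannel;
import io.grpc.netty.NettyChannelBuilder;

ManagedChannel channel =
    NettyChannelBuilder.forAddress("backend.example.com", 50051)  // illustrative target
        .intercept(new LimitingInterceptor())
        .build();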

R A

Jun 21, 2017, 5:09:02 PM
to grpc.io, rua...@ebay.com

Thanks for the suggestion.  To provide more context, I have full control over both the client and the server, and hence the MAX_CONCURRENT_STREAMS setting.  One of the factors in setting MAX_CONCURRENT_STREAMS is to ensure no single client overwhelms the service (to preserve quality of service).  I feel buffering is useful when the downstream service is under duress and not processing calls fast enough (because of spikes in traffic, etc.): instead of cancelling new streams on the client, buffer them until the downstream service recovers.  In my case, the downstream services can number in the several thousands, so maintaining a buffer and count at the application level would be redundant, as StreamBufferingEncoder already does this.  I was able to use AOP to tap into StreamBufferingEncoder and use StreamBufferingEncoder.numBufferedStreams() (along with a reasonable MAX_CONCURRENT_STREAMS value) to help solve this.
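(Not part of the original post: a rough AspectJ-style sketch of the kind of AOP tap described above. The pointcut, the registry, and the threshold handling are illustrative assumptions; whether this weaves cleanly depends on how Netty is packaged in your build.)

import io.netty.handler.codec.http2.StreamBufferingEncoder;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.Aspect;

@Aspect
public class BufferedStreamsMonitor {
  // Capture each StreamBufferingEncoder as it is constructed so that
  // numBufferedStreams() can be sampled later and compared against a threshold
  // to trip circuit-breaker logic for the corresponding server.
  @After("execution(io.netty.handler.codec.http2.StreamBufferingEncoder.new(..)) && this(encoder)")
  public void onEncoderCreated(StreamBufferingEncoder encoder) {
    EncoderRegistry.register(encoder);  // hypothetical registry keyed by connection
  }
}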

Jimit Shah

Mar 20, 2018, 6:27:06 PM
to grpc.io
What is the best symptom to watch for on the client side to know that requests are getting delayed because the connection has reached the maximum number of concurrent streams?