[java] - gRPC ThreadPoolExecutor and bounded queue

56 views
Skip to first unread message

in...@olivierboucher.com

unread,
Nov 29, 2018, 11:43:04 AM11/29/18
to grpc.io

Hi everyone,

Our gRPC server runs on a ThreadPoolExecutor with a corePoolSize of 4 and a maximumPoolSize of 16. In order to have the pool size increase, we provide a BlockingQueue with a bounded size of 20. 

Sometimes short bursts happen and we're perfectly fine with dropping requests at this moment, we provided a custom RejectionExecutionHandler that increases a counter we are monitoring. However, this rejection handler is not aware of the request itself, it only sees a Runnable.

My question is: are the requests automatically canceled if they could not get queued? Do I need to cancel them manually somehow?

Thanks

Carl Mastrangelo

unread,
Nov 29, 2018, 1:57:36 PM11/29/18
to grpc.io
You *really* don't want to limit the queue size.  The queue is not per RPC, but per RPC callback event.   If the enqueue'd callback (like headers received, data received, cancellation, etc.) gets dropped, the RPC will be in a zombie state and never able to finish or die.  Additionally, if you block on attempting to add callbacks (instead of just failing them), you run the risk of deadlocking, because the net thread will be blocked on the application thread.    

The BlockingQueue in the executor is not a good fit for async callbacks.   It would be much better to install an Interceptor that keeps track of the number of active calls, and simply fails RPCs (instead of callbacks) if the number gets too high.   

in...@olivierboucher.com

unread,
Nov 29, 2018, 3:56:00 PM11/29/18
to grpc.io
Thank you Carl.

Is something like this what you meant? I striped out the resizing logic... activeCalls is an AtomicInteger

@Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
        resizePool(activeCalls.incrementAndGet());
        ServerCall.Listener<ReqT> delegate = next.startCall(call, headers);
        return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(delegate) {
            @Override
            public void onCancel() {
                super.onCancel();
                activeCalls.decrementAndGet();
            }

            @Override
            public void onComplete() {
                super.onComplete();
                activeCalls.decrementAndGet();
            }
        };
    }


Carl Mastrangelo

unread,
Nov 29, 2018, 4:07:33 PM11/29/18
to grpc.io
I would modify the counters before calling super (in case they throw an exception).  Also, I (personally), would fail the RPCs early rather than doing resize of the pool.   You can invoke call.close(Status.CANCELED), and then return a noop listener, instead of invoked methods on next.
Reply all
Reply to author
Forward
0 new messages