Delaying next.startCall in ServerInterceptor

759 views
Skip to first unread message

Alexander Chiu

unread,
May 27, 2021, 11:25:37 AM5/27/21
to grpc.io
Hi

I'm trying to build a caching ServerInterceptor which will be the first in a chain of interceptors. I managed to get the basic functionality working by returning a SimpleForwardingServerCallListener which checks a cache in the onMessage method, and manually sending the headers and response and closing the call in case of a cache hit, otherwise delegating to next.startCall(...). However, it seems like next.startCall(...) causes the call to cascade to the subsequent interceptors, and onMessage on the listener is only called after that.

My searches have come across:

and a few others where it seems like it possible to delay next.startCall(...) until after the first message is received. I tried to do something akin to https://github.com/grpc/grpc-java/issues/5414#issuecomment-468754271 using a noop listener, but then the onMessage on the listener never seems to be called at all.

Am I missing something obvious?

PS. A big thank you to all the maintainers

Chengyuan Zhang

unread,
Jun 17, 2021, 9:38:22 PM6/17/21
to grpc.io
The idea in https://github.com/grpc/grpc-java/issues/5414#issuecomment-468754271 should work for your use case and the code should be almost same as that. Can you post a snippet for your code so that we can see what went wrong? The idea is to have your ServerInterceptor produce a ServerCall.Listener that delays calling into the real ServerCall.Listener (or more precisely, avoid calling into the real ServerCall.Listener if the cache-lookup ServerCall.Listener can end the call early with cached data). 

Alexander Chiu

unread,
Jun 18, 2021, 2:44:44 AM6/18/21
to grpc.io
Thanks for getting back to me.

I'm currently doing:

    @Override
    public <ReqTRespTListener<ReqTinterceptCall(ServerCall<ReqTRespTcall,
            Metadata headersServerCallHandler<ReqTRespTnext) {
        if (!cacheableMethods.contains(call.getMethodDescriptor())) {
            return next.startCall(call, headers);
        }
        ServerCall.Listener<ReqTno_op = new ServerCall.Listener<ReqT>() {
        };
        return new ForwardingServerCallListener<ReqT>() {
            ServerCall.Listener<ReqTdelegate = no_op;

            @Override
            protected ServerCall.Listener<ReqTdelegate() {
                return delegate;
            }

            @Override
            public void onMessage(ReqT message) {
                if (delegate == no_op) {
                    Message request = (Message) message;
                    Message resp = cache.getIfPresent(request);
                    if (resp != null) {
                        call.sendHeaders(headers);
                        call.sendMessage((RespT) resp);
                        call.close(Status.OKnew Metadata());
                        return;
                    }
                    delegate = next.startCall(call, headers);
                }
                onMessage(message);
            }
        };
    }

and running a unit test using the InProcessServerBuilder. The test times out, and when I step through with a debugger it doesn't look like onMessage is ever called. If the above offers no clues, I can spend some time setting up a minimal example project I can share.

Chengyuan Zhang

unread,
Jun 18, 2021, 4:58:32 PM6/18/21
to grpc.io
The reason that onMessage is never called is because next.startCall() is delayed so ServerCall's request() is never called. That is, inbound messages will be waiting on flow control for the first message. There is a livelock between ServerCall and ServerCall.Listener: ServerCall.Listener is waiting on the first message to start ServerCall while starting ServerCall requests the first message. So you would need to initiate the request() for the first message:

   @Override
    public <ReqT, RespT> Listener<ReqT> interceptCall(final ServerCall<ReqT, RespT> call,
        final Metadata headers, final ServerCallHandler<ReqT, RespT> next) {

      final Listener<ReqT> cacheLookUpCallListener = new ForwardingServerCallListener<ReqT>() {
        private final Listener<ReqT> NOOP = new Listener<ReqT>() {};
        private Listener<ReqT> delegate = NOOP;
 
        @Override
        protected Listener<ReqT> delegate() {
          return delegate;
        }
 
        @Override
        public void onMessage(ReqT message) {
          if (delegate == NOOP) {
            // TODO: look up cache, close call and return if cache hit
 
            delegate = next.startCall(call, headers);
          }
          super.onMessage(message);
        }
      };
 
      ServerCallHandler<ReqT, RespT> handler = new ServerCallHandler<ReqT, RespT>() {
        @Override
        public Listener<ReqT> startCall(ServerCall<ReqT, RespT> call, Metadata headers) {
          call.request(1);
          return cacheLookUpCallListener;
        }
      };
 
      return handler.startCall(call, headers);
    }


Note, with this simple solution you are throwing away ServerCall.Listener callbacks before receiving the first message (e.g., onReady()). This is mostly fine for unary calls. But for streaming, it may break flow control (if there is one).

Alexander Chiu

unread,
Jun 21, 2021, 3:19:50 AM6/21/21
to grpc.io
> That is, inbound messages will be waiting on flow control for the first message. There is a livelock between ServerCall and ServerCall.Listener: ServerCall.Listener is waiting on the first message to start ServerCall while starting ServerCall requests the first message. So you would need to initiate the request() for the first message:

Thank you for this explanation. This was exactly what I was missing.

I'm happy to report that I've updated my code based on the advice given, and everything is working as expected.

Thank you so much for all your help!
Reply all
Reply to author
Forward
0 new messages