Cache response in ServerInterceptor

jinxiang lang

Jul 21, 2022, 9:49:03 AM
to grpc.io
Hi

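I am trying to build an idempotency interceptor (IdempotentInterceptors). The client sends a token in the request headers. On the first gRPC call I want to save the token together with the response. On a second call with the same token I want to look up the saved response by that token and return it directly, without invoking the service again.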


My searches have come across:

https://gist.githubusercontent.com/marcellanz/b10b77a8cf4c71a30904afae8aa33960/raw/de94f9176d856508c0ffcf3a9985e14015da9704/useInputStreamMessages.java

https://github.com/grpc/grpc-java/issues/1712

https://engineering.kabu.com/entry/2021/03/31/162401

https://groups.google.com/g/grpc-io/c/_osH2D6L9Ck/m/FdvqQgkRDgAJ


I don't know how to proceed and would appreciate any help.

Am I missing something obvious?

PS. A big thank you to all the maintainers.

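import io.grpc.ForwardingServerCall;
import io.grpc.ForwardingServerCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;

public class IdempotentInterceptors implements ServerInterceptor {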

    private static final Metadata.Key<String> CLIENT_HEADER_KEY = Metadata.Key.of("token", Metadata.ASCII_STRING_MARSHALLER);

    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {

        String token = headers.get(CLIENT_HEADER_KEY);
        MethodDescriptor<ReqT, RespT> methodDescriptor = call.getMethodDescriptor();
        String serviceName = methodDescriptor.getServiceName();
        String bareMethodName = methodDescriptor.getBareMethodName();
        log.info("1、header received from client token: {}, serviceName: {}, bareMethodName: {}", token, serviceName, bareMethodName);
        ServerCall<ReqT, RespT> wrapperCall =
                new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {


                    @Override
                    public void sendMessage(RespT response) {
                        save(token, response);
                        super.sendMessage(response);
                    }

                };

        ServerCall.Listener<ReqT> listener = next.startCall(wrapperCall, headers);
        return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(listener) {



            @Override
            public void onMessage(ReqT request) {
                RespT response = null;
                if (token != null) {
                    response = (RespT) select(token);
                }
                if (response != null) {
                    // TODO: return the cached response directly instead of calling the service
                } else {
                    super.onMessage(request);
                }
            }

        };
    }

}



Eric Anderson

Jul 22, 2022, 11:22:23 AM
to jinxiang lang, grpc.io
If it is just based on the metadata token (which is suspicious), then just respond in-line:

String token = headers.get(CLIENT_HEADER_KEY);
RespT resp = (RespT) select(token);
if (resp != null) {
  call.sendHeaders(new Metadata());
  call.sendMessage(resp);
  call.close(Status.OK, new Metadata());
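  return new ServerCall.Listener<ReqT>() {};  // interceptCall must return a listener; a no-op one is enough here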
}
// proceed with next.startCall() to save responses into the cache.

--
You received this message because you are subscribed to the Google Groups "grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email to grpc-io+u...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/grpc-io/878a29fd-71c3-4356-a69a-3a87d6864be6n%40googlegroups.com.

jinxiang lang

Jul 22, 2022, 10:03:15 PM
to grpc.io
Thank you very much for your reply, but there is still a small problem.


public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
    String requestId = headers.get(CLIENT_HEADER_KEY);

    MethodDescriptor<ReqT, RespT> methodDescriptor = call.getMethodDescriptor();
    String serviceName = methodDescriptor.getServiceName();
    String bareMethodName = methodDescriptor.getBareMethodName();
    log.info("1、header received from client requestId: {}, serviceName: {}, bareMethodName: {}", requestId, serviceName, bareMethodName);
    // return response
    RespT response = (RespT) select(requestId);
    if (response != null) {
        call.sendHeaders(new Metadata());
        call.sendMessage(response);
        call.close(Status.OK,new Metadata());
        return new ServerCall.Listener<ReqT>() {};
    }
    // save request and response

    ServerCall<ReqT, RespT> wrapperCall =
            new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {
                @Override
                public void sendMessage(RespT response) {
                    if (requestId != null) {
                        save(requestId, response);
                    }
                    super.sendMessage(response);
                }
            };
    return next.startCall(wrapperCall, headers);
}

I think this should work, but on the second request I get an exception:

java.lang.IllegalStateException: java.lang.IllegalStateException: call already closed
    at io.grpc.internal.ServerImpl$ServerTransportListenerImpl$1HandleServerCall.runInternal(ServerImpl.java:619) ~[grpc-core-1.42.2.jar:1.42.2]
    at io.grpc.internal.ServerImpl$ServerTransportListenerImpl$1HandleServerCall.runInContext(ServerImpl.java:603) ~[grpc-core-1.42.2.jar:1.42.2]
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[grpc-core-1.42.2.jar:1.42.2]
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) [grpc-core-1.42.2.jar:1.42.2]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_231]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_231]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_231]
Caused by: java.lang.IllegalStateException: call already closed

I look forward to your reply.

jinxiang lang

Jul 23, 2022, 6:18:45 AM
to grpc.io
I have found something new: the problem may occur during serialization or deserialization. An exception is thrown there, which then causes the "call already closed" error to be reported.

Eric Anderson

Jul 25, 2022, 6:10:39 PM
to jinxiang lang, grpc.io
That interceptor looks fine. Note that the issue could be in a different interceptor.

On Fri, Jul 22, 2022 at 7:03 PM jinxiang lang <juejin...@gmail.com> wrote:
java.lang.IllegalStateException: java.lang.IllegalStateException: call already closed
    ...

Caused by: java.lang.IllegalStateException: call already closed

The second stack trace here that you didn't include is the stack I'd need to see. I expect it is the interceptor line "call.close(Status.OK,new Metadata());". If it is anything other than that, then it'd be helpful to see.

I have found something new: the problem may occur during serialization or deserialization. An exception is thrown there, which then causes the "call already closed" error to be reported.

Yeah, I see a flow that could cause that if the message couldn't be serialized. What error does the client see? 
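
The kind of flow mentioned above can be modeled without gRPC. What follows is a simplified sketch, not grpc-java's actual code: `ModelCall` and `AlreadyClosedDemo` are hypothetical names, and the sketch assumes that a runtime failure while serializing the response makes `sendMessage` close the call with an error status. Under that assumption, the interceptor's later `close(OK)` is the second close and trips the "call already closed" check.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical, simplified stand-in for a server call; it models only the close-state check.
final class ModelCall<RespT> {
    private final Function<RespT, byte[]> serializer;
    private boolean closed;
    private String closeStatus; // status given to the first close()

    ModelCall(Function<RespT, byte[]> serializer) {
        this.serializer = serializer;
    }

    void sendMessage(RespT message) {
        try {
            serializer.apply(message);
        } catch (RuntimeException e) {
            // Assumption: a serialization failure closes the call with an error status.
            close("UNKNOWN");
        }
    }

    void close(String status) {
        if (closed) {
            throw new IllegalStateException("call already closed");
        }
        closed = true;
        closeStatus = status;
    }

    String closeStatus() {
        return closeStatus;
    }
}

final class AlreadyClosedDemo {
    // A call whose serializer only understands String, as a marshaller only understands its message type.
    static ModelCall<String> stringCall() {
        return new ModelCall<String>(s -> s.getBytes(StandardCharsets.UTF_8));
    }

    // Replays the cache-hit path: send the cached value, then close with OK.
    @SuppressWarnings("unchecked")
    static <RespT> String replay(ModelCall<RespT> call, Object cached) {
        RespT response = (RespT) cached; // unchecked cast, never fails at this line
        call.sendMessage(response);
        try {
            call.close("OK");
            return "closed with OK";
        } catch (IllegalStateException e) {
            return e.getMessage() + " (first close: " + call.closeStatus() + ")";
        }
    }
}
```

Replaying with a cached value of the wrong runtime type (for example a raw `byte[]` where the serializer expects a message object) takes the failure path, while a correctly typed value closes normally.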

jinxiang lang

Jul 26, 2022, 3:41:16 AM
to grpc.io
Yes, of course. The client sees this response:

{
  "error": "2 UNKNOWN: "
}


Complete stack trace:

2022-07-26 15:31:51.470 ERROR - io.grpc.internal.SerializingExecutor.run[SerializingExecutor.java:136]-- Exception while executing runnable io.grpc.internal.ServerImpl$ServerTransportListenerImpl$1HandleServerCall@6d61cdda

java.lang.IllegalStateException: java.lang.IllegalStateException: call already closed
    at io.grpc.internal.ServerImpl$ServerTransportListenerImpl$1HandleServerCall.runInternal(ServerImpl.java:619) ~[grpc-core-1.48.0.jar:1.48.0]
    at io.grpc.internal.ServerImpl$ServerTransportListenerImpl$1HandleServerCall.runInContext(ServerImpl.java:603) ~[grpc-core-1.48.0.jar:1.48.0]
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[grpc-core-1.48.0.jar:1.48.0]
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) [grpc-core-1.48.0.jar:1.48.0]

    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_231]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_231]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_231]
Caused by: java.lang.IllegalStateException: call already closed
    at com.google.common.base.Preconditions.checkState(Preconditions.java:507) ~[guava-26.0-jre.jar:?]
    at io.grpc.internal.ServerCallImpl.closeInternal(ServerCallImpl.java:218) ~[grpc-core-1.48.0.jar:1.48.0]
    at io.grpc.internal.ServerCallImpl.close(ServerCallImpl.java:211) ~[grpc-core-1.48.0.jar:1.48.0]
    at io.grpc.PartialForwardingServerCall.close(PartialForwardingServerCall.java:48) ~[grpc-api-1.48.0.jar:1.48.0]
    at io.grpc.ForwardingServerCall.close(ForwardingServerCall.java:22) ~[grpc-api-1.48.0.jar:1.48.0]
    at io.grpc.ForwardingServerCall$SimpleForwardingServerCall.close(ForwardingServerCall.java:39) ~[grpc-api-1.48.0.jar:1.48.0]
    at net.devh.boot.grpc.server.metric.MetricCollectingServerCall.close(MetricCollectingServerCall.java:60) ~[grpc-server-spring-boot-autoconfigure-2.13.1.RELEASE.jar:2.13.1.RELEASE]
    at io.grpc.PartialForwardingServerCall.close(PartialForwardingServerCall.java:48) ~[grpc-api-1.48.0.jar:1.48.0]
    at io.grpc.ForwardingServerCall.close(ForwardingServerCall.java:22) ~[grpc-api-1.48.0.jar:1.48.0]
    at io.grpc.ForwardingServerCall$SimpleForwardingServerCall.close(ForwardingServerCall.java:39) ~[grpc-api-1.48.0.jar:1.48.0]
    at com.alibaba.csp.sentinel.adapter.grpc.SentinelGrpcServerInterceptor$1.close(SentinelGrpcServerInterceptor.java:77) ~[sentinel-grpc-adapter-1.7.1.jar:?]
    at com.opay.ccp.proxy.interceptor.IdempotentInterceptors.interceptCall(IdempotentInterceptors.java:71) ~[classes/:?]
    at io.grpc.ServerInterceptors$InterceptCallHandler.startCall(ServerInterceptors.java:244) ~[grpc-api-1.48.0.jar:1.48.0]
    at com.alibaba.csp.sentinel.adapter.grpc.SentinelGrpcServerInterceptor.interceptCall(SentinelGrpcServerInterceptor.java:64) ~[sentinel-grpc-adapter-1.7.1.jar:?]
    at io.grpc.ServerInterceptors$InterceptCallHandler.startCall(ServerInterceptors.java:244) ~[grpc-api-1.48.0.jar:1.48.0]
    at net.devh.boot.grpc.server.metric.MetricCollectingServerInterceptor.interceptCall(MetricCollectingServerInterceptor.java:144) ~[grpc-server-spring-boot-autoconfigure-2.13.1.RELEASE.jar:2.13.1.RELEASE]
    at io.grpc.ServerInterceptors$InterceptCallHandler.startCall(ServerInterceptors.java:244) ~[grpc-api-1.48.0.jar:1.48.0]
    at io.grpc.Contexts.interceptCall(Contexts.java:52) ~[grpc-api-1.48.0.jar:1.48.0]
    at net.devh.boot.grpc.server.scope.GrpcRequestScope.interceptCall(GrpcRequestScope.java:75) ~[grpc-server-spring-boot-autoconfigure-2.13.1.RELEASE.jar:2.13.1.RELEASE]
    at io.grpc.ServerInterceptors$InterceptCallHandler.startCall(ServerInterceptors.java:244) ~[grpc-api-1.48.0.jar:1.48.0]
    at io.grpc.internal.ServerImpl$ServerTransportListenerImpl.startWrappedCall(ServerImpl.java:703) ~[grpc-core-1.48.0.jar:1.48.0]
    at io.grpc.internal.ServerImpl$ServerTransportListenerImpl.access$2200(ServerImpl.java:406) ~[grpc-core-1.48.0.jar:1.48.0]
    at io.grpc.internal.ServerImpl$ServerTransportListenerImpl$1HandleServerCall.runInternal(ServerImpl.java:615) ~[grpc-core-1.48.0.jar:1.48.0]
    ... 6 more

These are my DAO methods:

private <RespT> void save(String requestId, RespT response) {
    log.info("save Database,requestId: {},response: {}", requestId, response);
    jdbcTemplate.update(saveStr, new Object[]{requestId, response});
}

public byte[] select(String requestId) {
    log.info("select Database,requestId: {}", requestId);
    // Query by the requestId parameter (the original passed an undefined variable, token).
    List<IdempotentRequest> query = jdbcTemplate.query(selectStr, new Object[]{requestId}, new BeanPropertyRowMapper<IdempotentRequest>(IdempotentRequest.class));
    log.info(String.valueOf(query.size()));
    if (query != null && query.size() != 0) {
        return query.get(0).getResponse();
    }
    return null;
}


Database SQL:

CREATE TABLE `idempotent_request` (
  `id` bigint(19) NOT NULL AUTO_INCREMENT COMMENT 'primary key',
  `prj_name` varchar(64) DEFAULT NULL COMMENT 'project name',
  `interface_name` varchar(64) DEFAULT NULL COMMENT 'service name',
  `request_id` varchar(256) DEFAULT NULL COMMENT 'request id',
  `response` blob COMMENT 'response',
  `biz_column_values` varchar(512) DEFAULT NULL COMMENT 'redundant business fields',
  `sign` varchar(64) DEFAULT NULL COMMENT 'digest signature',
  `status` tinyint(1) DEFAULT NULL COMMENT 'status',
  `valid_end_time` datetime DEFAULT NULL COMMENT 'validity period',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=62 DEFAULT CHARSET=utf8mb4;



While debugging, I found that the error appears to be caused by a failure to serialize the response, but I don't know how to fix it. I look forward to your reply.
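
One possible direction, sketched here under assumptions rather than as a confirmed fix: store the response as the bytes its marshaller produces, and parse those bytes back into a typed message on a cache hit, instead of casting the raw `byte[]` to `RespT`. In the sketch, `Marshaller`, `ResponseCache`, and `Utf8Marshaller` are hypothetical local types, and a map stands in for the `idempotent_request` table.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in offering the two operations a response marshaller provides.
interface Marshaller<T> {
    InputStream stream(T value);
    T parse(InputStream stream);
}

// Caches responses as bytes, the form a BLOB column can hold.
final class ResponseCache<RespT> {
    private final Marshaller<RespT> marshaller;
    private final Map<String, byte[]> store = new ConcurrentHashMap<>(); // stands in for the table

    ResponseCache(Marshaller<RespT> marshaller) {
        this.marshaller = marshaller;
    }

    // Keeps only the first response saved for a given request id.
    void save(String requestId, RespT response) {
        store.putIfAbsent(requestId, readAll(marshaller.stream(response)));
    }

    // Returns the cached response as a typed message, or null when nothing is stored.
    RespT select(String requestId) {
        byte[] bytes = store.get(requestId);
        return bytes == null ? null : marshaller.parse(new ByteArrayInputStream(bytes));
    }

    static byte[] readAll(InputStream in) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

// Toy marshaller for String messages, used only to exercise the cache.
final class Utf8Marshaller implements Marshaller<String> {
    @Override
    public InputStream stream(String value) {
        return new ByteArrayInputStream(value.getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public String parse(InputStream stream) {
        return new String(ResponseCache.readAll(stream), StandardCharsets.UTF_8);
    }
}
```

In the interceptor, the same idea would presumably mean saving the bytes read from `call.getMethodDescriptor().streamResponse(response)` and, on a cache hit, passing `call.getMethodDescriptor().parseResponse(new ByteArrayInputStream(bytes))` to `call.sendMessage`.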

Eric Anderson

Aug 1, 2022, 12:44:35 PM
to jinxiang lang, grpc.io
    at com.alibaba.csp.sentinel.adapter.grpc.SentinelGrpcServerInterceptor$1.close(SentinelGrpcServerInterceptor.java:77) ~[sentinel-grpc-adapter-1.7.1.jar:?]
    at com.opay.ccp.proxy.interceptor.IdempotentInterceptors.interceptCall(IdempotentInterceptors.java:71) ~[classes/:?]

That seems likely to be the "call.close(Status.OK,new Metadata());" line. Since the interceptors are running in interceptCall, that means gRPC hasn't seen any exception at this point yet.

Based on the stack trace, I suggest you look through SentinelGrpcServerInterceptor, MetricCollectingServerInterceptor, and GrpcRequestScope for bugs that would call close() twice.
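
When hunting for a double close(), it can help to record where the first close() came from, so that the second one reports both call sites. The following is a minimal sketch of that debugging idea; `CloseTracker` is a hypothetical helper, not a gRPC class, and in a real server it would presumably be invoked from the close() override of a ForwardingServerCall wrapper placed around the call.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical debugging aid: remembers the call site of the first close().
final class CloseTracker {
    private final AtomicReference<Throwable> firstClose = new AtomicReference<>();

    // Call at the start of a close() override. The first call records its stack;
    // any later call throws, with the first call's stack attached as the cause.
    void recordClose(String status) {
        Throwable site = new Throwable("close(" + status + ") called here");
        if (!firstClose.compareAndSet(null, site)) {
            throw new IllegalStateException("call already closed", firstClose.get());
        }
    }
}
```

The exception thrown on the second call then carries two stack traces: its own, and as its cause the trace of whoever closed the call first.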
