Help with gRpc Error (call is closed)

1,008 views
Skip to first unread message

grpc-noob

unread,
Jan 28, 2019, 2:14:22 AM1/28/19
to grpc.io
Hi folks,

I am pretty noob with gRpc and trying to make bidirectional streaming work with ListenableFutures.

My code is as follows:

        RequestService requestService = MoreExecutors.listeningDecorator(MyExecutor) ;        
       
ResponseService responseService = MoreExecutors.listeningDecorator(MyExecutor) ;

       
@Override
       
public StreamObserver<Request> ingestEventsStreaming(StreamObserver<Response> responseObserver) {
           
return new StreamObserver<Request>() {
               
@Override
               
public void onNext(Request request) {
                   
ListenableFuture<IngestionResponse> listenableFuture = requestService.submit( new RequestHandlerTask(config));

                    listenableFuture
.addListener(new Runnable() {
                   
@Override
                   
public void run() {
                   
try {
                        responseObserver
.onNext(listenableFuture.get());
                   
} catch (InterruptedException e) {
                      LOG
.error("Interrupted", e);
                   
} catch (ExecutionException e) {
                      LOG
.error("Exception in task", e.getCause());
                   
}
                 
}  }, responseService);
           
}

               
@Override
               
public void onError(Throwable t) {
                    LOG
.warn(“Request Failed");
                }

                @Override
                public void onCompleted() {
                    responseObserver.onCompleted();
                }
            };
        }



Above code throws following exception:


Exception in thread "pool-3-thread-1" Exception in thread "pool-3-thread-3" java.lang.IllegalStateException: call is closed

at com.google.common.base.Preconditions.checkState(Preconditions.java:510)

at io.grpc.internal.ServerCallImpl.sendHeaders(ServerCallImpl.java:89)

at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:338)

at com.paloaltonetworks.apollo2.ingestion.frontend.server.FrontendServer$FrontendService$1$1.run(FrontendServer.java:160)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)



It seems that adding listener fails the code. 


May I know if I am doing anything wrong ? Any help will be much appreciated.


Thanks,



George Gensure

unread,
Jan 28, 2019, 10:13:46 AM1/28/19
to grpc-noob, grpc.io
Your client has closed the connection, which was received by the grpc server layer and closed the ServerCall it maintains. For (presumably) long lived connections, you should establish a cancellation listener on your current Context from within the initial method which cancels any incomplete futures, wrap any asynchronously executed scopes with that context::wrap with Context.current().isCancelled() tests within them to avoid communicating with the responseObserver if cancelled. On another async style note, your future.get() with Interrupted/Execution handling there would be better suited for a Futures.transform on your responseService that invokes responseObserver.onNext with the result.

Cheers!
-George

--
You received this message because you are subscribed to the Google Groups "grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email to grpc-io+u...@googlegroups.com.
To post to this group, send email to grp...@googlegroups.com.
Visit this group at https://groups.google.com/group/grpc-io.
To view this discussion on the web visit https://groups.google.com/d/msgid/grpc-io/1ae15b1d-c7fc-46d8-affa-c13d10fa0af3%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

grpc-noob

unread,
Jan 30, 2019, 3:44:24 AM1/30/19
to grpc.io
thanks George
Reply all
Reply to author
Forward
0 new messages