[Java] Server-side streaming - server doesn't detect when client disappears


Christopher Schechter

Apr 12, 2018, 5:07:54 PM
to grpc.io
Hi all,

I'm working on setting up a server-side stream GRPC. When the stream is started, the server should stream messages to the client until either the server or the client shuts down.

When the server shuts down, this is easy - it calls StreamObserver.onCompleted() and the client is then notified that the stream is ending.

When a client shuts down, it can do a graceful shutdown with ManagedChannel.shutdown() to stop the stream from its end. However, when the client shuts down ungracefully, the server never notices, and can continue to call StreamObserver.onNext() basically forever.

My question is, is there a way to detect this situation from the server side when the client shuts down ungracefully? I would expect an exception to be thrown at some point, but that never happens. Can I manually check something to see whether the connection is broken?

I've seen one other mention of a similar/same issue in another topic from a long time ago, but I don't see a resolution to it.

Thanks,
Chris

Carl Mastrangelo

Apr 12, 2018, 5:35:19 PM
to grpc.io
This can be eventually detected by setting keepAliveTime(), keepAliveTimeout(), maxConnectionIdle(), maxConnectionAge(), and so forth on your ServerBuilder.    

In general, it's not possible to quickly detect if the remote side has silently stopped, so the best we can do is actively check, and set timeouts. 
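For completeness, keepalive can also be configured on the client's channel, so a client notices a dead server as well. A hedged sketch, with illustrative target and timing values (keepAliveWithoutCalls makes the client ping even when no RPC is active):

```java
import java.util.concurrent.TimeUnit;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

ManagedChannel channel = ManagedChannelBuilder.forTarget("localhost:50051")
        .keepAliveTime(30, TimeUnit.SECONDS)     // ping after 30s of inactivity
        .keepAliveTimeout(10, TimeUnit.SECONDS)  // consider the peer dead if no ack in 10s
        .keepAliveWithoutCalls(true)             // keep pinging even with no active RPCs
        .usePlaintext(true)
        .build();
```

Note that servers enforce a minimum permitted keepalive interval, so overly aggressive client values may be rejected.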

Christopher Schechter

Apr 13, 2018, 6:32:14 PM
to grpc.io
Hm, I would have thought that seeing the TCP connection broken would at some point cause StreamObserver.onNext() (on the server side) to complain.

I've set all four options you mentioned on my Netty server. When the RPC starts, the server enters a loop where it calls StreamObserver.onNext() basically forever until the server shuts down, at which point it calls onCompleted().

When the connection from client to server is made, I can see keepalive pings via the grpc debug logs. Once the time surpasses the maxConnectionAge + grace period, the client disconnects. I'm assuming this is an "ungraceful" disconnect, since it only disconnects once the grace period is over (as the stream is still producing more messages).

However, the server continues to execute its loop, calling onNext() ad infinitum. It seems to completely ignore that the client no longer accepts messages or responds to keepalive pings. So now it looks like my problem is that the server never recognizes that the client has disappeared.

Here's my server configuration. The time values aren't meant to be realistic, I just wanted to see the behavior in a manual test:

Server server = NettyServerBuilder.forPort(port)
            .addService(testService)
            .executor(Executors.newFixedThreadPool(5))
            .keepAliveTime(10, TimeUnit.SECONDS)
            .keepAliveTimeout(10, TimeUnit.SECONDS)
            .maxConnectionAge(30, TimeUnit.SECONDS)
            .maxConnectionAgeGrace(5, TimeUnit.SECONDS)
            .maxConnectionIdle(5, TimeUnit.SECONDS)
            .build();

Inside testService, it has this server-side streaming method:

// for the purpose of this test, this queue is pre-filled with a handful of StreamItems, and never has any other objects added to it   
BlockingQueue<StreamItem> queue = new LinkedBlockingQueue<>();

    @Override
    public void testStream(StreamRequest request, StreamObserver<StreamItem> responseObserver) {
        try {
            while (!Thread.interrupted()) {
                StreamItem next = queue.poll();
                if (next == null) {
                    LOGGER.debug("Queue is empty, waiting before next poll()");
                    Thread.sleep(1000);
                    // send default instance which is empty, to signal that the queue is empty right now
                    responseObserver.onNext(StreamItem.getDefaultInstance());
                } else {
                    responseObserver.onNext(next);
                }
            }
        } catch (InterruptedException e) {
            LOGGER.info("Interrupted. Exiting loop.");
        } catch (Exception e) {
            LOGGER.error("Unexpected error", e);
            responseObserver.onError(e);
            return;
        }
        responseObserver.onCompleted();
    }

And here's my test client code:

ManagedChannel channel = ManagedChannelBuilder.forTarget(target)
        .usePlaintext(true)
        .build();

TestServiceGrpc.TestServiceBlockingStub blockingStub = TestServiceGrpc.newBlockingStub(channel);

try {
    Iterator<StreamItem> stream = blockingStub.testStream(subscriptionRequest);
    while (stream.hasNext()) {
        LOGGER.debug("Received an item:\n{}", stream.next());
    }
} catch (StatusRuntimeException e) {
    LOGGER.error("GRPC Exception: {}", e.getStatus());
}

Is there something wrong here? I feel like I'm missing something.

Christopher Schechter

Apr 16, 2018, 8:15:00 PM
to grpc.io
I suppose my previous post was overly verbose, so I'll pose a more succinct question instead:

In a gRPC server stream, what is the expected behavior on the server side when the client disconnects (either gracefully or for some unknown reason)? Assuming keepalive is set on both the client and server side, the server should at some point recognize that the connection has expired. Will gRPC allow the server to keep streaming messages with onNext() until onCompleted() is invoked, or should onNext() at some point complain?

Eric Anderson

Apr 16, 2018, 8:27:59 PM
to Christopher Schechter, grpc.io
On Fri, Apr 13, 2018 at 3:32 PM, Christopher Schechter <chris...@gmail.com> wrote:
Inside testService, it has this server-side streaming method:

[quoted code snipped; see the previous post]

Because this method doesn't return, it blocks the callback thread which delivers notifications. We have checks to throw if the client cancelled, but they aren't triggering. I just filed an issue for this.

For a quick fix, you can cast responseObserver to ServerCallStreamObserver and check isCancelled() as part of the loop. Alternatively, you can rely on the gRPC Context by calling Context.current().isCancelled().
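A hedged sketch of what that quick fix could look like in the loop from the earlier post (class and queue names are carried over from that post; the timed poll() replacing the sleep is a detail I've added):

```java
import java.util.concurrent.TimeUnit;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;

@Override
public void testStream(StreamRequest request, StreamObserver<StreamItem> responseObserver) {
    // Server-side response observers are ServerCallStreamObserver instances, so this cast is safe.
    ServerCallStreamObserver<StreamItem> serverObserver =
            (ServerCallStreamObserver<StreamItem>) responseObserver;
    try {
        while (!serverObserver.isCancelled()) {
            // Timed poll() instead of sleep(): wakes promptly when an item arrives.
            StreamItem next = queue.poll(1, TimeUnit.SECONDS);
            responseObserver.onNext(next != null ? next : StreamItem.getDefaultInstance());
        }
        // The client cancelled; do not call onCompleted() here, it would throw.
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        responseObserver.onCompleted();
    }
}
```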

Eric Anderson

Apr 16, 2018, 8:31:28 PM
to Christopher Schechter, grpc.io
Oh, and if you want a notification for your code, you could use Context.current().addListener(). That should be firing, as it isn't synchronized with the rest of the callbacks.
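A hedged sketch of registering such a listener from inside the service method (the second argument is the Executor that runs the callback; Runnable::run runs it inline on the cancelling thread):

```java
import io.grpc.Context;

Context.current().addListener(
        context -> System.out.println("RPC context cancelled; stop streaming"),
        Runnable::run);
```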

Christopher Schechter

Apr 17, 2018, 2:31:39 PM
to grpc.io
Checking whether the RPC is cancelled in the loop works great. Thanks for the informative response Eric!

I think another part of the confusion for me was that I wasn't expecting the server's executor thread pool to be in charge of both handling and cancelling the RPCs. In my test, I had a fixed thread pool with 5 threads, and sometimes would start up 5 streams to clients. Then, if a client disconnected, the server wouldn't cancel its RPC because it had no executor threads available to do so.

Eric Anderson

Apr 19, 2018, 4:22:57 PM
to Christopher Schechter, grpc.io
On Tue, Apr 17, 2018 at 11:31 AM, Christopher Schechter <chris...@gmail.com> wrote:
I think another part of the confusion for me was that I wasn't expecting the Server's executor thread pool to be in charge of both handling and cancelling the RPC's. In my test, I had a fixed thread pool with 5 threads, and sometimes would start up 5 streams to clients. Then, if a client disconnected, the server wouldn't cancel it because it would have no executor threads available to do so.

So to be clear, the RPC had been cancelled and the server-side library was aware of the cancellation. It was just an API thing in getting the notification to your service.
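If you prefer the library to push that cancellation notification to your service rather than polling for it, ServerCallStreamObserver also offers setOnCancelHandler(), typically registered at the top of the service method before any messages flow. A hedged sketch:

```java
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;

ServerCallStreamObserver<StreamItem> serverObserver =
        (ServerCallStreamObserver<StreamItem>) responseObserver;
// Runs on an executor thread when the client cancels or disconnects.
serverObserver.setOnCancelHandler(
        () -> System.out.println("Client cancelled; stopping the stream"));
```

As a side effect, registering an onCancelHandler also suppresses the exception that onCompleted()/onNext() would otherwise throw on an already-cancelled call.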

Ravi Teja

Jan 16, 2023, 2:12:15 AM
to grpc.io

I am also facing this issue: the cancellation is happening on the server side.

serverCallStreamObserver.setOnCancelHandler(() ->
        System.out.println("cancel handler called for call id: " + request.getCallID()));

This handler should fire when the client closes the call, but in my case the close is happening on the server side. How is it getting closed? I have checked with different methods but could not figure it out.


Thanks,
Ravi.

