Detecting client disconnections from server streaming (grpc-java)

Greg Ludington

Aug 19, 2020, 1:38:14 PM
to grpc.io
In server-side streaming, how does an ungraceful disconnection by a client get communicated back to the server?

If, for example, I have an application-level ping rpc, and then I unplug a network cable, after a few unsuccessful pings, the ServerCallStreamObserver will report the error. Without those pings, I am not receiving a notification. Using a NettyServerBuilder with 10s keepAlive and 500ms keepAliveTimeout, and code similar to this:

   Context oldCurrent = Context.current();
   Context previous = oldCurrent.attach();
   Context.current().addListener(context -> log.error("Cancel received!"), executorService);
   try {
       while (!shutdown && !Thread.interrupted()) {
          next = queue.poll();
          if (next == null) {
             log.debug("Queue is empty, waiting before next poll()");
             Thread.sleep(1000);
             // if sending an empty message here, the cancellation will be picked up quickly
             // serverCallStreamObserver.onNext(PING);
          } else {
             serverCallStreamObserver.onNext(next);
          }
       }
   } (catch and finally clauses snipped)

Listeners attached to any of the above contexts never fire. What might I be missing in order to notify the application of a broken connection?

Eric Anderson

Aug 21, 2020, 7:23:22 PM
to Greg Ludington, grpc.io
On Wed, Aug 19, 2020 at 10:38 AM Greg Ludington <gludi...@gmail.com> wrote:
In server-side streaming, how does an ungraceful disconnection by a client get communicated back to the server?

It'll be a cancellation. "All bad things" become cancellation on server-side.

If, for example, I have an application-level ping rpc, and then I unplug a network cable, after a few unsuccessful pings, the ServerCallStreamObserver will report the error. Without those pings, I am not receiving a notification.

Yes, with TCP a write() is required to detect a broken connection.

Using a NettyServerBuilder with 10s keepAlive and 500ms keepAliveTimeout, and code similar to this:

Huh. That should work. I do suggest having a higher timeout though, because at times TCP really does need that long to recover. (The 20s default is actually quite fair; try at least to keep it above a second.)
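
For reference, here is a minimal sketch of server keepalive settings along the lines suggested above. It assumes the grpc-netty artifact is on the classpath; the class name and the `port` and `service` parameters are placeholders, and the 20s timeout is just the default mentioned above, not a tuned value.

```java
import io.grpc.BindableService;
import io.grpc.Server;
import io.grpc.netty.NettyServerBuilder;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class; only the builder calls are gRPC API.
final class KeepAliveServerSketch {
    static Server build(int port, BindableService service) {
        return NettyServerBuilder.forPort(port)
            // Send a keepalive ping once the connection has been quiet for 10s.
            .keepAliveTime(10, TimeUnit.SECONDS)
            // Wait up to 20s for the ping ack before treating the connection as dead.
            .keepAliveTimeout(20, TimeUnit.SECONDS)
            .addService(service)
            .build();
    }
}
```

When a keepalive ping times out, the calls on that connection should be cancelled on the server side, which is what the cancellation listeners discussed below observe.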

   Context oldCurrent = Context.current();
   Context previous = oldCurrent.attach();

These lines don't really do anything. But you also don't need to do anything here.

   Context.current().addListener(context -> log.error("Cancel received!"), executorService);
   try {
       while (!shutdown && !Thread.interrupted()) {
          next = queue.poll();

Since you're using a queue here and checking for interruption, it seems like this is a worker thread of your own in some way. Each RPC has its own Context, which is initially set only on callbacks within the StreamObserver. If you then process work on another thread, you need to propagate that Context in some way, as Context is stored in a ThreadLocal. That could be as easy as wrapping your Executors with Context.currentContextExecutor() when creating them, but at other times it could be more difficult. Context provides a few convenience utilities.
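
To illustrate why a listener added on a worker thread may never fire, here is a rough stand-in that uses a plain ThreadLocal instead of gRPC's Context. Everything in it (`CURRENT_CALL`, `propagating`, the class name) is hypothetical and only mimics the idea behind Context.currentContextExecutor(): capture the caller's value when a task is submitted and install it on the worker thread while the task runs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ContextPropagationSketch {
    // Stand-in for the per-RPC Context: a thread-local call id (not gRPC API).
    static final ThreadLocal<String> CURRENT_CALL = ThreadLocal.withInitial(() -> "none");

    // Hypothetical analogue of Context.currentContextExecutor(): captures the
    // submitting thread's value at execute() time and restores the worker's
    // previous value once the task finishes.
    static Executor propagating(Executor delegate) {
        return task -> {
            String captured = CURRENT_CALL.get();
            delegate.execute(() -> {
                String previous = CURRENT_CALL.get();
                CURRENT_CALL.set(captured);
                try {
                    task.run();
                } finally {
                    CURRENT_CALL.set(previous);
                }
            });
        };
    }

    // Runs a task on the executor and reports which call id that task saw.
    static String valueSeenOn(Executor executor) {
        CompletableFuture<String> seen = new CompletableFuture<>();
        executor.execute(() -> seen.complete(CURRENT_CALL.get()));
        return seen.join();
    }

    // Returns {value seen via the plain executor, value seen via the wrapped one}.
    static String[] demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CURRENT_CALL.set("rpc-1"); // pretend we are inside a StreamObserver callback
        try {
            String plain = valueSeenOn(pool);
            String wrapped = valueSeenOn(propagating(pool));
            return new String[] {plain, wrapped};
        } finally {
            CURRENT_CALL.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] result = demo();
        System.out.println("plain executor saw:   " + result[0]);
        System.out.println("wrapped executor saw: " + result[1]);
    }
}
```

In this sketch the unwrapped executor sees only the default value, which mirrors the situation in the question: Context.current() on a worker thread that was never handed the RPC's Context is not the RPC's Context, so its listeners have nothing to react to.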

Propagating the context has advantages independent of anything else you do, but you could also use ServerCallStreamObserver.setOnCancelHandler(), which provides a callback similar to the other callbacks on StreamObserver. To get a ServerCallStreamObserver, just cast the StreamObserver passed to your service; it is guaranteed to be a ServerCallStreamObserver. If the cancel handler is called (or if Context is cancelled on the original thread and you have trouble propagating it), it seems like you could interrupt that polling thread to notify it that it should stop.
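
The "interrupt the polling thread" idea could look roughly like the sketch below. It uses only the JDK so that it runs on its own: the `sink` consumer stands in for serverCallStreamObserver::onNext, and the `onCancel` Runnable is what would be passed to setOnCancelHandler() in a real service. The class and method names are made up for illustration.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class CancelInterruptSketch {
    // Builds a thread that forwards queued messages to the sink until interrupted.
    static Thread newPoller(BlockingQueue<String> queue, Consumer<String> sink,
                            CountDownLatch stopped) {
        return new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    // A timed poll replaces the poll() + Thread.sleep(1000) pair.
                    String next = queue.poll(1, TimeUnit.SECONDS);
                    if (next != null) {
                        sink.accept(next);
                    }
                }
            } catch (InterruptedException e) {
                // Interrupted while waiting: treat it as the cancellation signal.
            } finally {
                stopped.countDown();
            }
        }, "stream-poller");
    }

    // Returns true if the poller stopped promptly after the simulated cancellation.
    static boolean demo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> sent = new CopyOnWriteArrayList<>();
        CountDownLatch stopped = new CountDownLatch(1);

        Thread poller = newPoller(queue, sent::add, stopped);
        // In a real service: serverCallStreamObserver.setOnCancelHandler(onCancel);
        Runnable onCancel = poller::interrupt;

        poller.start();
        queue.add("hello");
        onCancel.run(); // simulate gRPC invoking the cancel handler

        try {
            return stopped.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("poller stopped after cancel: " + demo());
    }
}
```

Because the handler only holds a reference to the thread, it does not depend on any thread-local state, which is why this route sidesteps the Context propagation problem.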

The main difference between Context cancellation and setOnCancelHandler is that Context requires the listener to be thread-safe, whereas setOnCancelHandler runs serially with the other callbacks. setOnCancelHandler also disables onNext throwing in case of cancellation (see setOnCancelHandler's javadoc).