gRPC Java: handling backpressure in server-side streaming


Vivek Shindhe

Nov 16, 2023, 5:44:03 PM
to grpc.io
In a server-side streaming scenario, we publish a large number of items through the "onNext()" call of the "ServerCallStreamObserver". As recommended, we need to handle backpressure and check the readiness of the stream before sending each message, so we use the ServerCallStreamObserver's "isReady()" method. If the stream is not ready, we put the thread to sleep until the ServerCallStreamObserver is ready again.

To do so, we rely on setOnReadyHandler(Runnable onReadyHandler), which is supposed to be invoked when the ServerCallStreamObserver's isReady() transitions from false to true. But this leads to a blocked thread: while the main thread is waiting to resume, the "onReadyHandler" is never invoked, which blocks publishing to the stream. Later, with the help of logs, I saw that the "onReadyHandler" is waiting to be invoked by the main thread itself. Sample program:

    @Override
    public void streamNumbers(NumberRequest request, StreamObserver<NumberResponse> responseObserver) {
        int count = request.getCount();
        this.serverResponseObserver = (ServerCallStreamObserver<NumberResponse>) responseObserver;
        this.onReadyHandler = new OnReadyHandler();
        serverResponseObserver.setOnReadyHandler(onReadyHandler);
        //serverResponseObserver.disableAutoRequest();
        System.out.println("Stream numbers start --> " + Thread.currentThread().getName());
        for (int i = 0; i <= count; i++) {
            NumberResponse response = NumberResponse.newBuilder().setNumber(i).build();
            if(!serverResponseObserver.isReady()) {
                System.out.println("serverResponseObserver not ready----->" + serverNotReadyCount++ + "current item -->" + i);
                this.onReadyHandler.wasReady = false;
                while(!onReadyHandler.wasReady) {
                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            }
            serverResponseObserver.onNext(response);
        }
        serverResponseObserver.onCompleted();
    }

    class OnReadyHandler implements Runnable {
        private boolean wasReady = false;

        @Override
        public void run() {
            System.out.println( " OnReadyHandler inside run->" + serverResponseObserver.isReady() + wasReady + onReadyHandlerRunCount++ + Thread.currentThread().getName());
            if (serverResponseObserver.isReady() && !wasReady) {
                System.out.println( " Response observer is ready and the Backpressure is released --> ");
                wasReady = true;
            }
        }
    }

I would appreciate it if the community could give some input on the implementation or suggest how to handle the backpressure correctly.

Eric Anderson

Nov 28, 2023, 10:47:30 AM
to Vivek Shindhe, grpc.io
On Thu, Nov 16, 2023 at 2:44 PM Vivek Shindhe <shindh...@gmail.com> wrote:
But this leads to a thread block situation, while the main thread is waiting to restart again the "onReadyHandler" is never invoked, which blocks the publishing to the stream.

It seems you already understand the situation. Yes, the callbacks are not considered thread-safe, so the onReady handler can't be called until the previous callback returns. The encouraged solution is to remove the polling sleep and return from the method. You could also keep the sleep and poll isReady() instead, at the cost of latency.
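The second option above (keep the sleep, but poll isReady() rather than wait for a flag that only the handler can set) might look roughly like the sketch below. To keep it runnable without gRPC on the classpath, it uses a hypothetical ReadyStream interface and a FakeStream that stand in for the two ServerCallStreamObserver methods involved; neither is part of gRPC, and the stall behaviour of FakeStream is invented purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class PollingBackpressureSketch {
    /** Hypothetical stand-in for the isReady()/onNext() pair of ServerCallStreamObserver. */
    interface ReadyStream {
        boolean isReady();
        void onNext(int value);
    }

    /** Sends 0..count-1, sleeping 1 ms whenever the stream is not ready. Returns the number sent. */
    static int sendWithPolling(ReadyStream stream, int count) {
        int sent = 0;
        while (sent < count) {
            if (!stream.isReady()) {
                try {
                    Thread.sleep(1); // poll isReady() itself; no callback has to run to wake us
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt and stop sending
                    return sent;
                }
                continue;
            }
            stream.onNext(sent++);
        }
        return sent;
    }

    /** Fake stream (assumption): reports not-ready for three polls after every `window` messages. */
    static final class FakeStream implements ReadyStream {
        final List<Integer> received = new ArrayList<>();
        private final int window;
        private int stallPolls = 0;

        FakeStream(int window) {
            this.window = window;
        }

        @Override
        public boolean isReady() {
            if (stallPolls > 0) {
                stallPolls--;
                return false;
            }
            return true;
        }

        @Override
        public void onNext(int value) {
            received.add(value);
            if (received.size() % window == 0) {
                stallPolls = 3; // simulate a full buffer that drains a little later
            }
        }
    }

    public static void main(String[] args) {
        FakeStream stream = new FakeStream(4);
        int sent = sendWithPolling(stream, 10);
        System.out.println("sent=" + sent + ", received=" + stream.received.size());
    }
}
```

This variant still occupies the calling thread for the whole stream and adds up to one sleep interval of latency per stall, which is the cost mentioned above. Against the real API, the loop would presumably also want to stop when the call is cancelled.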

Ivy Zhuang

Nov 29, 2023, 1:09:38 PM
to grpc.io
Yes, the server method callback and the onReady handler are both run by the same executor, so you cannot block in the method while waiting for the onReady handler to be executed. Try something like this in your callback:

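    // Cast once; on the server side the response observer is a ServerCallStreamObserver.
    final ServerCallStreamObserver<StreamingOutputCallResponse> serverObserver =
        (ServerCallStreamObserver<StreamingOutputCallResponse>) responseObserver;
    final AtomicInteger sentMessage = new AtomicInteger(0);
    final AtomicBoolean isClosed = new AtomicBoolean(false);
    Runnable onReady = () -> {
      // Send only while the stream can accept more, then return instead of blocking.
      while (serverObserver.isReady() && sentMessage.get() < 100) {
        serverObserver.onNext(StreamingOutputCallResponse.newBuilder().build());
        sentMessage.incrementAndGet();
      }
      // Complete exactly once, after all 100 messages have been sent.
      if (sentMessage.get() >= 100 && isClosed.compareAndSet(false, true)) {
        serverObserver.onCompleted();
      }
    };
    serverObserver.setOnReadyHandler(onReady);
    // Send the first batch now; the handler runs again when isReady() goes from false to true.
    onReady.run();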
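To see why returning from the method unblocks the handler, here is a small gRPC-free simulation of the same pattern. FakeCall is a hypothetical stand-in, not a gRPC class: it runs every callback one at a time from a single queue (a rough model of the shared executor described above) and queues the onReady handler as a separate callback once its invented "window" of messages has drained.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class OnReadyDrivenSketch {
    /** Hypothetical call whose callbacks all run one at a time from a single queue. */
    static final class FakeCall {
        final Queue<Runnable> callbacks = new ArrayDeque<>();
        final List<Integer> received = new ArrayList<>();
        boolean completed = false;
        private final int window; // messages accepted before isReady() turns false (assumption)
        private int buffered = 0;
        private Runnable onReadyHandler = () -> { };

        FakeCall(int window) {
            this.window = window;
        }

        boolean isReady() {
            return buffered < window;
        }

        void setOnReadyHandler(Runnable handler) {
            onReadyHandler = handler;
        }

        void onNext(int value) {
            received.add(value);
            buffered++;
            if (buffered == window) {
                // Simulate the buffer draining later: readiness returns, then the handler
                // is invoked as its own callback, after the current one has returned.
                callbacks.add(() -> {
                    buffered = 0;
                    onReadyHandler.run();
                });
            }
        }

        void onCompleted() {
            completed = true;
        }

        /** Runs queued callbacks strictly one after another. */
        void runCallbacks() {
            Runnable next;
            while ((next = callbacks.poll()) != null) {
                next.run();
            }
        }
    }

    /** Service-method stand-in: registers the handler, sends what it can, and returns. */
    static void streamNumbers(FakeCall call, int count) {
        int[] next = {0};
        boolean[] closed = {false};
        Runnable onReady = () -> {
            while (call.isReady() && next[0] < count) {
                call.onNext(next[0]++);
            }
            if (next[0] >= count && !closed[0]) {
                closed[0] = true;
                call.onCompleted();
            }
        };
        call.setOnReadyHandler(onReady);
        onReady.run(); // first batch; later batches are sent from queued handler callbacks
    }

    public static void main(String[] args) {
        FakeCall call = new FakeCall(4);
        call.callbacks.add(() -> streamNumbers(call, 10));
        call.runCallbacks();
        System.out.println(call.received.size() + " sent, completed=" + call.completed);
    }
}
```

If streamNumbers looped with a sleep instead of returning, runCallbacks could never reach the queued handler callback, which is the same stall the original sample runs into.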