In a server-side streaming scenario, we publish to the "ServerCallStreamObserver" a high number of items to the "onNext()" call. As recommended we need to handle the back pressure and check the readiness of the stream before sending the message. Hence using the method ServerCallStreamObserver's "isReady()". If not ready we will put the thread to sleep until the ServerCallStreamObserver is ready again.
To do so we rely on setOnReadyHandler(Runnable onReadyHandler) which is supposed to be invoked when the ServerCallStreamObserver's isReady() transitions from false to true. But this leads to a thread block situation, while the main thread is waiting to restart again the "onReadyHandler" is never invoke which blocks the publishing to the stream. Later through the help of logs I see that the "onReadyHandler" is waiting to be invoked by the main thread itself. Sample program -
@Override
public void streamNumbers(NumberRequest request, StreamObserver<NumberResponse> responseObserver) {
int count = request.getCount();
this.serverResponseObserver = (ServerCallStreamObserver<NumberResponse>) responseObserver;
this.onReadyHandler = new OnReadyHandler();
serverResponseObserver.setOnReadyHandler(onReadyHandler);
//serverResponseObserver.disableAutoRequest();
System.out.println("Stream numbers start --> " + Thread.currentThread().getName());
for (int i = 0; i <= count; i++) {
NumberResponse response = NumberResponse.newBuilder().setNumber(i).build();
if(!serverResponseObserver.isReady()) {
System.out.println("serverResponseObserver not ready----->" + serverNotReadyCount++ + "current item -->" + i);
this.onReadyHandler.wasReady = false;
while(!onReadyHandler.wasReady) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
serverResponseObserver.onNext(response);
}
serverResponseObserver.onCompleted();
}
class OnReadyHandler implements Runnable {
private boolean wasReady = false;
@Override
public void run() {
System.out.println( " OnReadyHandler inside run->" + serverResponseObserver.isReady() + wasReady + onReadyHandlerRunCount++ + Thread.currentThread().getName());
if (serverResponseObserver.isReady() && !wasReady) {
System.out.println( " Response observer is ready and the Backpressure is released --> ");
wasReady = true;
}
}
}
Appreciate if the community could give some input on the implementation or provide me with idea to handle the backpressure in the correct way.