Hi all,
I have the following server-streaming method:
> public void multiEcho(EchoRequest verbalVomit, StreamObserver<EchoResposne> responseObserver) {
> log.fine("someone has just emitted an inconsiderated verbal vomit");
> var callMonitor = new Object();
> var echoObserver = (ServerCallStreamObserver<EchoResposne>) responseObserver;
> echoObserver.setOnReadyHandler(() -> {
> log.finer("sink ready");
> synchronized (callMonitor) { callMonitor.notifyAll(); }
> });
> echoObserver.setOnCancelHandler(() -> {
> log.fine("client cancelled the call 1");
> synchronized (callMonitor) { callMonitor.notifyAll(); }
> });
>
> try {
> for (int i = 1; i <= verbalVomit.getReps(); i++) {
> if (echoObserver.isCancelled()) {
> log.fine("client cancelled the call 2");
> return;
> }
> synchronized (callMonitor) {
> while( ! echoObserver.isReady()) {
> log.finer("sink clogged at rep " + i);
> callMonitor.wait();
> }
> }
>
> // multiply the content to fill the buffer faster
> var echoBuilder = new StringBuilder();
> for (int j = 0; j < MULTIPLY_FACTOR; j++) {
> echoBuilder.append(verbalVomit.getInconsideratedVerbalVomit());
> }
> var echoedVomit =
> EchoResposne.newBuilder().setEchoedVomit(echoBuilder.toString()).build();
>
> if (log.isLoggable(Level.FINEST)) log.finest("echo");
> echoObserver.onNext(echoedVomit);
> }
> echoObserver.onCompleted();
> } catch (StatusRuntimeException e) {
> if (e.getStatus().getCode() == Code.CANCELLED) {
> log.fine("client cancelled the call 3");
> } else {
> log.severe("server error: " + e);
> e.printStackTrace();
> }
> } catch (Exception e) {
> log.severe("server error: " + e);
> e.printStackTrace();
> echoObserver.onError(Status.INTERNAL.withCause(e).asException());
> }
> }
I create the server this way:
> echoServer = NettyServerBuilder
> .forPort(port)
> .maxConnectionAge(10, TimeUnit.MINUTES)
> .maxConnectionAgeGrace(12, TimeUnit.HOURS)
> .addService(new EchoService())
> .build();
and the client looks like this:
> var connector = EchoServiceGrpc.newBlockingStub(channel);
> var request = EchoRequest
> .newBuilder()
> .setInconsideratedVerbalVomit(
> "bleeeeeeeeeeeeeeeeeeeehhhhhhhhhh" +
> "hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh") // 64B
> .setReps(100)
> .build();
> var vomitIterator = connector.multiEcho(request);
> while (vomitIterator.hasNext()) {
> vomitIterator.next();
> System.out.println("got echo");
> }
and it dead-locks after just a few messages ;-]
server output looks like this:
> started gRPC EchoServer on port 6666
> Jun 05, 2021 8:22:52 P.M. pl.morgwai.samples.grpc.deadlock.EchoService multiEcho
> FINE: someone has just emitted an inconsiderated verbal vomit
> Jun 05, 2021 8:22:53 P.M. pl.morgwai.samples.grpc.deadlock.EchoService multiEcho
> FINER: sink clogged at rep 7
...and clients:
> got echo
> got echo
> got echo
> got echo
> got echo
> got echo
...and they both hand indefinitely :(
am I doing something wrong or is it a bug?
The interesting part is that if I use direct executor in the server and dispatch the work to a separate executor, then it works without any problems (that's what I usually do, so I've never encountered this problem before).
ie:
> public void multiEcho(EchoRequest verbalVomit, StreamObserver<EchoResposne> responseObserver) {
> log.fine("someone has just emitted an inconsiderated verbal vomit");
> var callMonitor = new Object();
> var echoObserver = (ServerCallStreamObserver<EchoResposne>) responseObserver;
> echoObserver.setOnReadyHandler(() -> {
> log.finer("sink ready");
> synchronized (callMonitor) { callMonitor.notifyAll(); }
> });
> echoObserver.setOnCancelHandler(() -> {
> log.fine("client cancelled the call 1");
> synchronized (callMonitor) { callMonitor.notifyAll(); }
> });
>
> cpuIntensiveOpExecutor.execute(() -> {
> try {
> for (int i = 1; i <= verbalVomit.getReps(); i++) {
(...)
and
> echoServer = NettyServerBuilder
> .forPort(port)
> .maxConnectionAge(10, TimeUnit.MINUTES)
> .maxConnectionAgeGrace(12, TimeUnit.HOURS)
> .addService(new EchoService())
> .directExecutor()
> .build();
A full working example (dead-locking that is) can be found on github:
https://github.com/morgwai/grpc-deadlock
Any hints will be much appreciated :)
Thanks!