java: deadlock in a server-streaming RPC


Piotr Morgwai Kotarbinski

Jun 5, 2021, 10:49:51 AM
to grpc.io, Eric Anderson
Hi all,
I have the following server-streaming method:

> public void multiEcho(EchoRequest verbalVomit, StreamObserver<EchoResposne> responseObserver) {
>     log.fine("someone has just emitted an inconsiderated verbal vomit");
>     var callMonitor = new Object();
>     var echoObserver = (ServerCallStreamObserver<EchoResposne>) responseObserver;
>     echoObserver.setOnReadyHandler(() -> {
>         log.finer("sink ready");
>         synchronized (callMonitor) { callMonitor.notifyAll(); }
>     });
>     echoObserver.setOnCancelHandler(() -> {
>         log.fine("client cancelled the call 1");
>         synchronized (callMonitor) { callMonitor.notifyAll(); }
>     });
>
>     try {
>         for (int i = 1; i <= verbalVomit.getReps(); i++) {
>             if (echoObserver.isCancelled()) {
>                 log.fine("client cancelled the call 2");
>                 return;
>             }
>             synchronized (callMonitor) {
>                 while ( ! echoObserver.isReady()) {
>                     log.finer("sink clogged at rep " + i);
>                     callMonitor.wait();
>                 }
>             }
>
>             // multiply the content to fill the buffer faster
>             var echoBuilder = new StringBuilder();
>             for (int j = 0; j < MULTIPLY_FACTOR; j++) {
>                 echoBuilder.append(verbalVomit.getInconsideratedVerbalVomit());
>             }
>             var echoedVomit =
>                     EchoResposne.newBuilder().setEchoedVomit(echoBuilder.toString()).build();
>
>             if (log.isLoggable(Level.FINEST)) log.finest("echo");
>             echoObserver.onNext(echoedVomit);
>         }
>         echoObserver.onCompleted();
>     } catch (StatusRuntimeException e) {
>         if (e.getStatus().getCode() == Code.CANCELLED) {
>             log.fine("client cancelled the call 3");
>         } else {
>             log.severe("server error: " + e);
>             e.printStackTrace();
>         }
>     } catch (Exception e) {
>         log.severe("server error: " + e);
>         e.printStackTrace();
>         echoObserver.onError(Status.INTERNAL.withCause(e).asException());
>     }
> }

I create the server this way:

> echoServer = NettyServerBuilder
>     .forPort(port)
>     .maxConnectionAge(10, TimeUnit.MINUTES)
>     .maxConnectionAgeGrace(12, TimeUnit.HOURS)
>     .addService(new EchoService())
>     .build();

and the client looks like this:

> var connector = EchoServiceGrpc.newBlockingStub(channel);
> var request = EchoRequest
>     .newBuilder()
>     .setInconsideratedVerbalVomit(
>         "bleeeeeeeeeeeeeeeeeeeehhhhhhhhhh" +
>         "hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh") // 64B
>     .setReps(100)
>     .build();
> var vomitIterator = connector.multiEcho(request);
> while (vomitIterator.hasNext()) {
>     vomitIterator.next();
>     System.out.println("got echo");
> }

and it dead-locks after just a few messages ;-]

server output looks like this:

> started gRPC EchoServer on port 6666
> Jun 05, 2021 8:22:52 P.M. pl.morgwai.samples.grpc.deadlock.EchoService multiEcho
> FINE: someone has just emitted an inconsiderated verbal vomit
> Jun 05, 2021 8:22:53 P.M. pl.morgwai.samples.grpc.deadlock.EchoService multiEcho
> FINER: sink clogged at rep 7

...and clients:

> got echo
> got echo
> got echo
> got echo
> got echo
> got echo

...and they both hang indefinitely :(

am I doing something wrong or is it a bug?

The interesting part is that if I use a direct executor in the server and dispatch the work to a separate executor, then it works without any problems (that's what I usually do, so I've never encountered this problem before),
i.e.:

> public void multiEcho(EchoRequest verbalVomit, StreamObserver<EchoResposne> responseObserver) {
>     log.fine("someone has just emitted an inconsiderated verbal vomit");
>     var callMonitor = new Object();
>     var echoObserver = (ServerCallStreamObserver<EchoResposne>) responseObserver;
>     echoObserver.setOnReadyHandler(() -> {
>         log.finer("sink ready");
>         synchronized (callMonitor) { callMonitor.notifyAll(); }
>     });
>     echoObserver.setOnCancelHandler(() -> {
>         log.fine("client cancelled the call 1");
>         synchronized (callMonitor) { callMonitor.notifyAll(); }
>     });
>
>     cpuIntensiveOpExecutor.execute(() -> {
>         try {
>             for (int i = 1; i <= verbalVomit.getReps(); i++) {
(...)

and

> echoServer = NettyServerBuilder
>     .forPort(port)
>     .maxConnectionAge(10, TimeUnit.MINUTES)
>     .maxConnectionAgeGrace(12, TimeUnit.HOURS)
>     .addService(new EchoService())
>     .directExecutor()
>     .build();
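
For anyone wondering why handing the loop to another executor helps, here is a rough, gRPC-free model of that variant. It is only a sketch under assumptions: `callbackExecutor` is a hypothetical single-thread stand-in for whatever delivers the call's callbacks one at a time, `workerExecutor` plays the role of `cpuIntensiveOpExecutor`, and a `CountDownLatch` replaces the `callMonitor` wait/notify pair. The "user method" callback returns right after the hand-off, so the "ready" callback queued behind it can run and wake the worker.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class HandOffSketch {

    /** Returns true if the worker saw the "ready" signal within timeoutMillis. */
    static boolean readySignalDelivered(long timeoutMillis) throws Exception {
        // Assumption: models one-callback-at-a-time delivery, not real gRPC internals.
        ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();
        // Plays the role of cpuIntensiveOpExecutor from the snippet above.
        ExecutorService workerExecutor = Executors.newSingleThreadExecutor();
        CountDownLatch ready = new CountDownLatch(1);
        CompletableFuture<Boolean> outcome = new CompletableFuture<>();
        try {
            // "User method": only hands the blocking wait to the worker, then returns.
            callbackExecutor.execute(() -> workerExecutor.execute(() -> {
                try {
                    outcome.complete(ready.await(timeoutMillis, TimeUnit.MILLISECONDS));
                } catch (InterruptedException e) {
                    outcome.complete(false);
                }
            }));
            // "onReady" callback: can run because the callback thread is free again.
            callbackExecutor.execute(ready::countDown);
            return outcome.get();
        } finally {
            callbackExecutor.shutdownNow();
            workerExecutor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("ready signal delivered: " + readySignalDelivered(2000));
    }
}
```

The worker thread may still sit blocked while the sink is clogged, but the thread that delivers callbacks is never the one waiting.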

A full working example (dead-locking that is) can be found on github: https://github.com/morgwai/grpc-deadlock

Any hints will be much appreciated :)

Thanks!

Piotr Morgwai Kotarbinski

Jun 5, 2021, 10:53:42 AM
to grpc.io
sorry for the formatting, let me try again:

Piotr Morgwai Kotarbinski

Jun 5, 2021, 11:01:32 AM
to grpc.io, Eric Anderson
I was trying this with grpc-1.38.0 on openjdk-11 on ubuntu-18.04 in case it matters.

Piotr Morgwai Kotarbinski

Jun 5, 2021, 8:06:08 PM
to grpc.io, Eric Anderson
OK, solved it:
the deadlock was caused by the fact that a Listener is guaranteed to be called by at most 1 thread at a time. Therefore, after I blocked a thread in line 50 (https://github.com/morgwai/grpc-deadlock/blob/master/src/main/java/pl/morgwai/samples/grpc/deadlock/EchoService.java#L50), that thread was still inside a Listener callback and so effectively holding the Listener's lock (the user method is called by Listener.onHalfClose() in the case of unary requests). As a result, no other thread could call Listener.onReady(), which calls onReadyHandler, which in turn would have notified the blocked thread.
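
To make the mechanism concrete, here is a small gRPC-free sketch of it. Everything in it is an assumption for illustration: a single-thread executor (`callbackExecutor`, a made-up name) stands in for the "at most 1 thread at a time" guarantee, the first task plays the user method invoked from onHalfClose(), and the second plays onReady(). A timeout replaces the unbounded `wait()` so the example terminates instead of hanging.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class SerializedCallbackDeadlockSketch {

    /** Returns true if the "ready" signal reached the blocked callback within timeoutMillis. */
    static boolean readySignalDelivered(long timeoutMillis) throws Exception {
        // Assumption: models one-callback-at-a-time delivery, not real gRPC internals.
        ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();
        CountDownLatch ready = new CountDownLatch(1);
        try {
            // "User method": blocks inside a callback, waiting for the sink to become ready.
            Callable<Boolean> blockedUserMethod =
                    () -> ready.await(timeoutMillis, TimeUnit.MILLISECONDS);
            Future<Boolean> userMethod = callbackExecutor.submit(blockedUserMethod);
            // "onReady" callback: queued behind the blocked one, so it cannot run in time.
            callbackExecutor.execute(ready::countDown);
            return userMethod.get();
        } finally {
            callbackExecutor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("ready signal delivered: " + readySignalDelivered(200));
    }
}
```

With an unbounded wait instead of the timeout, the first task would never finish and the second would never start, which matches the hang described above.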

I've refactored the code to never block the thread by doing the actual work in onReadyHandler, so now it works fine:

public void multiEcho(EchoRequest verbalVomit, StreamObserver<EchoResposne> responseObserver) {
    log.fine("someone has just emitted an inconsiderated verbal vomit");
    int[] repsRemainingHolder = { verbalVomit.getReps() };
    var echoObserver = (ServerCallStreamObserver<EchoResposne>) responseObserver;
    echoObserver.setOnReadyHandler(() -> {
        log.finer("sink ready");
        try {
            while (
                repsRemainingHolder[0] > 0
                && echoObserver.isReady()
                && ! echoObserver.isCancelled()
            ) {
                // multiply the content to fill the buffer faster
                var echoBuilder = new StringBuilder();
                for (int j = 0; j < MULTIPLY_FACTOR; j++) {
                    // append the message text (as in the first version), not the whole request
                    echoBuilder.append(verbalVomit.getInconsideratedVerbalVomit());
                }
                var echoedVomit =
                        EchoResposne.newBuilder().setEchoedVomit(echoBuilder.toString()).build();

                if (log.isLoggable(Level.FINEST)) log.finest("echo");
                repsRemainingHolder[0]--;
                echoObserver.onNext(echoedVomit);
            }
            if (echoObserver.isCancelled()) {
                log.fine("client cancelled the call 2");
                return;
            }
            if (repsRemainingHolder[0] == 0) {
                echoObserver.onCompleted();
                log.fine("done");
                return;
            }
            log.finer("sink clogged at rep "
                    + (verbalVomit.getReps() - repsRemainingHolder[0] + 1));
        } catch (StatusRuntimeException e) {
            if (e.getStatus().getCode() == Code.CANCELLED) {
                log.fine("client cancelled the call 1");
            } else {
                log.severe("server error: " + e);
                e.printStackTrace();
            }
        } catch (Exception e) {
            log.severe("server error: " + e);
            e.printStackTrace();
            echoObserver.onError(Status.INTERNAL.withCause(e).asException());
        }
    });
}

(hope the formatting will be ok this time ;-] )

The above code is in the branch named 'solution' of the previously mentioned github repo: https://github.com/morgwai/grpc-deadlock/blob/solution/src/main/java/pl/morgwai/samples/grpc/deadlock/EchoService.java
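
In case the pattern is useful outside this example, the "send while ready, then return" idea can be sketched without gRPC at all. Everything here is hypothetical and for illustration only: `Sink` is a made-up stand-in for the three ServerCallStreamObserver methods used above, `FakeSink` accepts a fixed number of messages per readiness window, and the `completed` guard is just a defensive choice of this sketch.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class DrainOnReadySketch {

    /** Hypothetical stand-in for the parts of ServerCallStreamObserver used here. */
    interface Sink<T> {
        boolean isReady();
        void onNext(T message);
        void onCompleted();
    }

    /** Builds a handler that sends as much as the sink accepts and never blocks. */
    static <T> Runnable drainingHandler(Iterator<T> source, Sink<T> sink) {
        boolean[] completed = { false };
        return () -> {
            if (completed[0]) return; // defensive: never complete twice
            while (sink.isReady() && source.hasNext()) {
                sink.onNext(source.next());
            }
            if (!source.hasNext()) {
                completed[0] = true;
                sink.onCompleted();
            }
            // otherwise the sink is clogged: just return and wait for the next run
        };
    }

    /** Fake sink that accepts `capacity` messages each time it becomes ready. */
    static class FakeSink implements Sink<String> {
        final List<String> received = new ArrayList<>();
        final int capacity;
        int budget = 0;
        boolean completed = false;

        FakeSink(int capacity) { this.capacity = capacity; }
        void becomeReady() { budget = capacity; }
        @Override public boolean isReady() { return budget > 0; }
        @Override public void onNext(String message) { received.add(message); budget--; }
        @Override public void onCompleted() { completed = true; }
    }

    /** Counts handler runs needed to send all messages; capacity must be positive. */
    static int handlerRunsNeeded(int messages, int capacity) {
        List<String> source = new ArrayList<>();
        for (int i = 0; i < messages; i++) source.add("echo " + i);
        FakeSink sink = new FakeSink(capacity);
        Runnable onReadyHandler = drainingHandler(source.iterator(), sink);
        int runs = 0;
        while (!sink.completed) {
            sink.becomeReady();   // simulates the transport draining its buffer
            onReadyHandler.run(); // simulates the "sink ready" callback
            runs++;
        }
        if (sink.received.size() != messages) throw new IllegalStateException("lost messages");
        return runs;
    }

    public static void main(String[] args) {
        System.out.println("handler runs: " + handlerRunsNeeded(7, 3));
    }
}
```

With 7 messages and a capacity of 3, the handler runs 3 times (3 + 3 + 1 messages), and no thread ever waits on a monitor.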

Cheers!