Synchronous and asynchronous waiting in GRPC service methods


weißnet auchnicht

Sep 30, 2020, 4:18:54 AM9/30/20
to grpc.io
Hi,

My service has two methods that share a resource, and each method invocation needs exclusive access to this resource. In the example, the client makes five successive requests to one of these methods; each request sends a message via the stream observer and then completes it. The next request is then made in the same way, without further delay.

The full source code of the sample application is here: https://github.com/Niklas-Peter/grpc-async.

The gRPC server is configured with ServerBuilder.executor(Executors.newSingleThreadExecutor()).
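For context, a minimal server setup along these lines might look as follows. This is a sketch, not taken from the linked repository; the port number is chosen arbitrarily for illustration:

```java
// Sketch of the server setup; the port is illustrative, not from the thread.
Server server = ServerBuilder.forPort(50051)
        .executor(Executors.newSingleThreadExecutor()) // one shared callback thread
        .addService(new MySyncService())
        .build()
        .start();
server.awaitTermination();
```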

Here are the most important excerpts:

Client code:
@Slf4j
public class MyServiceClient {
    @SneakyThrows
    public void myServiceMethodA() {
        log.info("myServiceMethodA(): started.");
        var writeConnection = stub.myServiceMethodA(new LoggingStreamObserver());
        log.info("myServiceMethodA(): received write connection.");

        writeConnection.onNext(createEvent());
        writeConnection.onCompleted();
    }
}

Synchronous service implementation:
@Slf4j
public class MySyncService extends MyServiceGrpc.MyServiceImplBase {
    private final Semaphore lock = new Semaphore(1);

    @SneakyThrows
    @Override
    public StreamObserver<Event> myServiceMethodA(StreamObserver<Confirmation> responseObserver) {
        log.info(responseObserver + ": Acquiring lock ...");
        if (!lock.tryAcquire(10, TimeUnit.SECONDS)) {
            log.warn(responseObserver + ": Lock acquire timeout exceeded");
            return new NoOpEventStreamObserver(); // Only to prevent exceptions in the log.
        }

        log.info(responseObserver + ": Acquired lock.");

        return new StreamObserver<>() {
            private final List<Event> events = new ArrayList<>();

            @Override
            public void onNext(Event event) {
                log.info(responseObserver + ": Received event.");

                var preprocessedEvent = preprocess(event);
                events.add(preprocessedEvent);
            }

            @Override
            public void onCompleted() {
                log.info(responseObserver + ": Received complete.");

                var storageLibrary = new StorageLibrary();
                storageLibrary.store(events.toArray(Event[]::new)).handle((unused, throwable) -> {
                    log.info(responseObserver + ": Store completed.");

                    responseObserver.onNext(Confirmation.newBuilder().build());
                    responseObserver.onCompleted();

                    lock.release();

                    return null;
                });
            }

            private Event preprocess(Event event) {
                // The preprocessing already requires the lock.
                return event;
            }
        };
    }

    @SneakyThrows
    @Override
    public void myServiceMethodB(Event event, StreamObserver<Confirmation> responseObserver) {
        if (!lock.tryAcquire(5, TimeUnit.SECONDS))
            throw new TimeoutException("The lock acquire timeout exceeded.");

        // Requires exclusive access to a shared resource and uses async I/O.
        var storageLibrary = new StorageLibrary();
        storageLibrary.store(event).handle((unused, throwable) -> {
            responseObserver.onNext(Confirmation.newBuilder().build());
            responseObserver.onCompleted();

            lock.release();

            return null;
        });
    }


    @Override
    public void otherServiceMethod(Event request, StreamObserver<Confirmation> responseObserver) {
        // Do something independent from the other service methods.
    }
}

Output:
09:53:03.453 [pool-1-thread-1] INFO MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@580c60dd: Acquiring lock ...
09:53:03.453 [pool-1-thread-1] INFO MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@580c60dd: Acquired lock.
09:53:03.456 [pool-1-thread-1] INFO MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@4a5ebb14: Acquiring lock ...
09:53:13.465 [pool-1-thread-1] WARN MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@4a5ebb14: Lock acquire timeout exceeded
09:53:13.465 [pool-1-thread-1] INFO MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@40947ea2: Acquiring lock ...
09:53:23.484 [pool-1-thread-1] WARN MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@40947ea2: Lock acquire timeout exceeded
09:53:23.484 [pool-1-thread-1] INFO MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@23f24271: Acquiring lock ...
09:53:33.496 [pool-1-thread-1] WARN MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@23f24271: Lock acquire timeout exceeded
09:53:33.496 [pool-1-thread-1] INFO MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@44de4271: Acquiring lock ...
09:53:43.501 [pool-1-thread-1] WARN MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@44de4271: Lock acquire timeout exceeded
09:53:43.517 [pool-1-thread-1] INFO MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@580c60dd: Received event.
09:53:43.517 [pool-1-thread-1] INFO MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@580c60dd: Received complete.
09:53:43.533 [pool-1-thread-1] INFO MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@580c60dd: Store completed.


In my initial synchronous implementation I use a Semaphore as a lock. This leads to a deadlock that lasts until the tryAcquire() timeout is exceeded. I think this is what happens:
1. The only thread in the pool executes the first request, which acquires the lock, and the StreamObserver is returned.
2. The initial processing of the first request (before the first messages arrive via the StreamObserver) is now done, and the only thread is released.
3. The 2nd request is already waiting in the executor's queue for a thread to execute it, which is why the just-released thread picks up the 2nd request.
4. The thread blocks, because the semaphore has not been released yet.
5. Further requests, as well as messages for the StreamObservers, are waiting in the executor's queue. In particular, the messages for the first request's StreamObserver cannot be handled, so the lock is not released.
6. Only after all requests queued behind the 1st have exceeded the tryAcquire() timeout and failed does the now-released thread handle the messages for the 1st request's StreamObserver, and the 1st request completes.
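The queue behavior in these steps can be reproduced with the JDK alone, without gRPC: a single-thread executor, a first task that acquires a semaphore whose release is only queued as a later task (mimicking the stream-observer callback), and a second task in between that blocks on the semaphore. A minimal sketch (class name and timeout are mine, for illustration):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class DeadlockDemo {
    /** Returns whether the second "request" manages to acquire the lock. */
    static boolean secondRequestAcquiresLock() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Semaphore lock = new Semaphore(1);
        try {
            // Request 1: acquires the lock. Its "stream messages", which would
            // release the lock, arrive only as a later task in the same queue.
            pool.submit(() -> lock.tryAcquire());
            // Request 2: occupies the only thread, waiting for the lock.
            Future<Boolean> second = pool.submit(
                    () -> lock.tryAcquire(500, TimeUnit.MILLISECONDS));
            // The release task is stuck behind request 2 in the queue.
            pool.submit(lock::release);
            return second.get(); // false: request 2 times out first
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("second request acquired lock: " + secondRequestAcquiresLock());
    }
}
```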

Increasing the thread pool size is probably not a good solution, as the pool would always have to be at least as large as the number of concurrently active requests to these two methods.

Asynchronous service implementation:
public class MyAsyncService extends MyServiceGrpc.MyServiceImplBase {
    private final AsyncSemaphore lock = new AsyncSemaphore(1, Optional.empty());

    @SneakyThrows
    @Override
    public StreamObserver<Event> myServiceMethodA(StreamObserver<Confirmation> responseObserver) {
        log.info(responseObserver + ": Acquiring lock ...");
        // Already acquire the lock here (instead of in StreamObserver.onComplete())
        // to ensure the lock is acquired in the order,
        // in which the GRPC requests are handled, and not in the order, in which the GRPC stream observers
        // complete.
        var lockFuture = lock.acquire();

        return new StreamObserver<>() {
            private final List<Event> events = new ArrayList<>();

            @Override
            public void onNext(Event event) {
                log.info(responseObserver + ": Received event.");

                events.add(event);
            }

            @Override
            public void onCompleted() {
                log.info(responseObserver + ": Received complete; acquired lock: " + lockFuture.isDone());

                lockFuture.thenAccept(permit -> {
                    log.info(responseObserver + ": Acquired lock.");

                    var preprocessedEvents = events.stream()
                                                   .map(MyAsyncService.this::preprocess)
                                                   .toArray(Event[]::new);
                    var storageLibrary = new StorageLibrary();
                    storageLibrary.store(preprocessedEvents).handle((unused, throwable) -> {
                        log.info(responseObserver + ": Store completed.");

                        responseObserver.onNext(Confirmation.newBuilder().build());
                        responseObserver.onCompleted();

                        permit.release();

                        return null;
                    });
                });
            }
        };
    }

    @SneakyThrows
    @Override
    public void myServiceMethodB(Event event, StreamObserver<Confirmation> responseObserver) {
        lock.acquireAndRun(() -> {
            log.info(responseObserver + ": Acquired lock.");

            // Requires exclusive access to a shared resource and uses async I/O.
            var preprocessedEvent = preprocess(event);
            var storageLibrary = new StorageLibrary();
            return storageLibrary.store(preprocessedEvent).handle((unused, throwable) -> {
                responseObserver.onNext(Confirmation.newBuilder().build());
                responseObserver.onCompleted();

                return null;
            });
        });
    }
}


Output:
10:14:55.096 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@3794b425: Acquiring lock ...
10:14:55.096 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1299d0f0: Acquiring lock ...
10:14:55.096 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1893addf: Acquiring lock ...
10:14:55.096 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@357dea2d: Acquiring lock ...
10:14:55.096 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@183aa6cc: Acquiring lock ...
10:14:55.112 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@3794b425: Received event.
10:14:55.112 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@3794b425: Received complete; acquired lock: true
10:14:55.112 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@3794b425: Acquired lock.
10:14:55.133 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@3794b425: Store completed.
10:14:55.141 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1299d0f0: Received event.
10:14:55.141 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1299d0f0: Received complete; acquired lock: true
10:14:55.141 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1299d0f0: Acquired lock.
10:14:55.141 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1893addf: Received event.
10:14:55.141 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1893addf: Received complete; acquired lock: false
10:14:55.141 [Thread-9] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1299d0f0: Store completed.
10:14:55.153 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@357dea2d: Received event.
10:14:55.153 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@357dea2d: Received complete; acquired lock: false
10:14:55.153 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@183aa6cc: Received event.
10:14:55.153 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@183aa6cc: Received complete; acquired lock: false
10:14:55.153 [Thread-9] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1893addf: Acquired lock.
10:14:55.153 [Thread-8] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1893addf: Store completed.
10:14:55.153 [Thread-8] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@357dea2d: Acquired lock.
10:14:55.153 [Thread-9] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@357dea2d: Store completed.
10:14:55.153 [Thread-9] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@183aa6cc: Acquired lock.
10:14:55.153 [Thread-8] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@183aa6cc: Store completed.

In my asynchronous implementation this deadlock does not occur. However, I now have to manage an application-side buffer for each active request until that request gets the lock. I had hoped there was a way to let gRPC handle the problem by buffering the requests at a higher level instead of at this lower, application-side level, and to handle the overflow (discard the request, send an exception to the client, ...) when the buffer fills. For example, by returning a Future<StreamObserver<>> instead of a StreamObserver, as I know it from ASP.NET MVC asynchronous controller methods (https://docs.microsoft.com/en-us/aspnet/mvc/overview/performance/using-asynchronous-methods-in-aspnet-mvc-4#CreatingAsynchGizmos). There are probably similar examples in the Java world, but I have little experience with the Java ecosystem yet.

I have already read this discussion: https://groups.google.com/g/grpc-io/c/XCMIva8NDO8

Thanks in advance!

sanjay...@google.com

Sep 30, 2020, 2:37:06 PM9/30/20
to grpc.io
Sorry, just realized you are using client streaming. If you only need to share the resource between 2 methods of the same service, can you get rid of your lock and just use directExecutor (https://github.com/grpc/grpc-java/blob/master/api/src/main/java/io/grpc/ServerBuilder.java#L58)?

Alternatively, you can use 2 threads instead of the single-thread executor, so that one thread completes the current request while the other receives the next request(s).

Eric Anderson

Sep 30, 2020, 3:52:01 PM9/30/20
to sanjay...@google.com, grpc.io
You should not use directExecutor. It offers no benefits over the single-thread thread pool for your use-case. I'm writing a lengthier reply.


sonst...@googlemail.com

Sep 30, 2020, 3:54:33 PM9/30/20
to grp...@googlegroups.com
Thanks for your replies so far!

I configured the thread pool with only a single thread just for demonstration purposes. In production I intend to use multiple threads for gRPC. However, as I suggested in my initial question, I do not think my general problem is solvable by changing the number of threads.

Eric Anderson

Sep 30, 2020, 4:06:24 PM9/30/20
to weißnet auchnicht, grpc.io
On Wed, Sep 30, 2020 at 1:18 AM 'weißnet auchnicht' via grpc.io <grp...@googlegroups.com> wrote:
Increasing the thread pool size is probably not a good solution, as it would always have to be greater than or equal to the number of concurrently active requests for these 2 methods.

Increasing the thread pool size is an easy solution and keeps the code simple. But yes, it really requires an unbounded thread pool to avoid issues, although you could limit the service to 10 outstanding RPCs and fail any above that threshold.

However, now I have to manage an application-side buffer for each active request until this request gets the lock. I hoped that there was a way to let GRPC handle the problem by buffering the requests at a higher level instead of this lower application-side level and handle the exception (discard the request, send an exception to the client, ...) when the buffer fills.

That is possible; we call it flow control. You can take a look at the manualflowcontrol example. Unfortunately it is an invisible API unless someone (like an example, or me) refers you to it.

Basically, you want to cast StreamObserver<Confirmation> responseObserver to ServerCallStreamObserver<Confirmation>. At that point you can call disableAutoRequest(). By default, every time you return from onNext(), the stub will call request(1) on your behalf, asking for another request. With this API you disable that behavior to manage incoming messages yourself.

So you'd call disableAutoRequest(), and once you acquire the lock you'd call request(1) each time you're ready for a message.
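Applied to myServiceMethodA from the question, this could look roughly like the following. An untested sketch: it assumes the AsyncSemaphore from the asynchronous implementation above, and error handling (releasing the permit in onError) is elided:

```java
@Override
public StreamObserver<Event> myServiceMethodA(StreamObserver<Confirmation> responseObserver) {
    var serverObserver = (ServerCallStreamObserver<Confirmation>) responseObserver;
    // Stop the stub from automatically calling request(1) after each onNext().
    serverObserver.disableAutoRequest();

    // Pull the first message only once the lock is held; until then, gRPC
    // buffers inbound messages within its flow-control window.
    lock.acquire().thenAccept(permit -> serverObserver.request(1));

    return new StreamObserver<>() {
        @Override
        public void onNext(Event event) {
            // Process the event under the lock, then ask for the next one.
            var preprocessedEvent = preprocess(event);
            // ... store/collect preprocessedEvent ...
            serverObserver.request(1);
        }

        @Override
        public void onError(Throwable t) { /* release the permit here */ }

        @Override
        public void onCompleted() {
            responseObserver.onNext(Confirmation.newBuilder().build());
            responseObserver.onCompleted();
            // release the permit here
        }
    };
}
```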

For example, by returning a Future<StreamObserver<>> instead of a StreamObserver as I know it from ASP.NET MVC asynchronous controller methods (https://docs.microsoft.com/en-us/aspnet/mvc/overview/performance/using-asynchronous-methods-in-aspnet-mvc-4#CreatingAsynchGizmos). Probably there are similar examples in the Java world, but I have less experience with the Java ecosystem, yet.

Your difficulty is associated with using client streaming, so that API doesn't really seem related. Although I'm not too familiar with it. Also, C# has async/await which makes things like this a lot easier.

weißnet auchnicht

Oct 1, 2020, 4:04:02 AM10/1/20
to grpc.io
Increasing the thread pool size is an easy solution and keeps the code simple. But yes, it really requires an unbounded thread pool to avoid issues, although you could limit the service to 10 outstanding RPCs and fail any above that threshold.
I assume using an application-side mechanism, e.g., counting the number of pending requests and throwing an exception if the count exceeds the limit, right?

Basically, you want to cast StreamObserver<Confirmation> responseObserver to ServerCallStreamObserver<Confirmation>. At that point you can call disableAutoRequest(). By default, every time you return from onNext(), the stub will call request(1) on your behalf, asking for another request. With this API you disable that behavior to manage incoming messages yourself.

So you'd call disableAutoRequest(), and once you acquire the lock you'd call request(1) each time you're ready for a message.
That is definitely an improvement over my considerations so far.
However, if I understand the example correctly, this only limits the messages within a single stream, not the total number of concurrent streams, which is why I would still need the solution mentioned above to limit the number of pending requests. Right?

Eric Anderson

Oct 1, 2020, 10:35:42 AM10/1/20
to weißnet auchnicht, grpc.io
On Thu, Oct 1, 2020 at 1:04 AM 'weißnet auchnicht' via grpc.io <grp...@googlegroups.com> wrote:
Increasing the thread pool size is an easy solution and keeps the code simple. But yes, it really requires an unbounded thread pool to avoid issues, although you could limit the service to 10 outstanding RPCs and fail any above that threshold.
I assume using an application-side mechanism, e.g., counting the number of pending requests and throwing an exception if it exceeds the limit, or?

Application-side or using a server interceptor.
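The interceptor route could be sketched roughly as follows. This is my own untested illustration (class name and limit are hypothetical, not from the thread): it rejects new RPCs with RESOURCE_EXHAUSTED once a fixed number of calls are in flight, instead of letting them queue up.

```java
// Illustrative interceptor: caps the number of concurrently active RPCs.
class ConcurrencyLimitInterceptor implements ServerInterceptor {
    private final AtomicInteger inFlight = new AtomicInteger();
    private final int limit;

    ConcurrencyLimitInterceptor(int limit) {
        this.limit = limit;
    }

    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
            ServerCall<ReqT, RespT> call, Metadata headers,
            ServerCallHandler<ReqT, RespT> next) {
        if (inFlight.incrementAndGet() > limit) {
            inFlight.decrementAndGet();
            call.close(Status.RESOURCE_EXHAUSTED.withDescription("too many pending RPCs"),
                       new Metadata());
            return new ServerCall.Listener<>() {}; // no-op listener for the rejected call
        }
        return new ForwardingServerCallListener.SimpleForwardingServerCallListener<>(
                next.startCall(call, headers)) {
            @Override
            public void onComplete() {
                inFlight.decrementAndGet();
                super.onComplete();
            }

            @Override
            public void onCancel() {
                inFlight.decrementAndGet();
                super.onCancel();
            }
        };
    }
}
```

It would be attached to the service with something like ServerInterceptors.intercept(new MySyncService(), new ConcurrencyLimitInterceptor(10)).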

Basically, you want to cast StreamObserver<Confirmation> responseObserver to ServerCallStreamObserver<Confirmation>. At that point you can call disableAutoRequest(). By default, every time you return from onNext(), the stub will call request(1) on your behalf, asking for another request. With this API you disable that behavior to manage incoming messages yourself.

So you'd call disableAutoRequest(), and once you acquire the lock you'd call request(1) each time you're ready for a message.
That is definitely an improvement over my considerations so far.
However, if I understand the example correctly, this only limits the messages within a single stream, not the total number of concurrent streams, which is why I would still need the solution mentioned above to limit the number of pending requests. Right?

Yes, it doesn't limit the total number of pending RPCs. The pending RPCs have limits on the amount of memory they can use, but they would still exist.

sonst...@googlemail.com

Oct 1, 2020, 11:42:16 AM10/1/20
to Eric Anderson, grpc.io
OK.
Thanks!