Hi,
my service has two methods that share a resource, and each method invocation needs exclusive access to it. In the example, the client makes 5 successive requests to one of these methods: each request sends a message via the stream observer and then completes the stream observer. Then, without further delay, the next request is made in the same way.
The full source code of the sample application is here:
https://github.com/Niklas-Peter/grpc-async.
The gRPC server is configured with ServerBuilder.executor(Executors.newSingleThreadExecutor()).
Here are the most important extracts:
Client code:
@Slf4j
public class MyServiceClient {
    @SneakyThrows
    public void myServiceMethodA() {
        log.info("myServiceMethodA(): started.");
        var writeConnection = stub.myServiceMethodA(new LoggingStreamObserver());
        log.info("myServiceMethodA(): received write connection.");
        writeConnection.onNext(createEvent());
        writeConnection.onCompleted();
    }
}
Synchronous service implementation:
@Slf4j
public class MySyncService extends MyServiceGrpc.MyServiceImplBase {
    private final Semaphore lock = new Semaphore(1);

    @SneakyThrows
    @Override
    public StreamObserver<Event> myServiceMethodA(StreamObserver<Confirmation> responseObserver) {
        log.info(responseObserver + ": Acquiring lock ...");
        if (!lock.tryAcquire(10, TimeUnit.SECONDS)) {
            log.warn(responseObserver + ": Lock acquire timeout exceeded");
            return new NoOpEventStreamObserver(); // Only to prevent exceptions in the log.
        }
        log.info(responseObserver + ": Acquired lock.");

        return new StreamObserver<>() {
            private final List<Event> events = new ArrayList<>();

            @Override
            public void onNext(Event event) {
                log.info(responseObserver + ": Received event.");
                var preprocessedEvent = preprocess(event);
                events.add(preprocessedEvent);
            }

            @Override
            public void onCompleted() {
                log.info(responseObserver + ": Received complete.");
                var storageLibrary = new StorageLibrary();
                storageLibrary.store(events.toArray(Event[]::new)).handle((unused, throwable) -> {
                    log.info(responseObserver + ": Store completed.");
                    responseObserver.onNext(Confirmation.newBuilder().build());
                    responseObserver.onCompleted();
                    lock.release();
                    return null;
                });
            }

            private Event preprocess(Event event) {
                // The preprocessing already requires the lock.
                return event;
            }
        };
    }

    @SneakyThrows
    @Override
    public void myServiceMethodB(Event event, StreamObserver<Confirmation> responseObserver) {
        if (!lock.tryAcquire(5, TimeUnit.SECONDS))
            throw new TimeoutException("The lock acquire timeout exceeded.");

        // Requires exclusive access to a shared resource and uses async I/O.
        var storageLibrary = new StorageLibrary();
        storageLibrary.store(event).handle((unused, throwable) -> {
            responseObserver.onNext(Confirmation.newBuilder().build());
            responseObserver.onCompleted();
            lock.release();
            return null;
        });
    }

    @Override
    public void otherServiceMethod(Event request, StreamObserver<Confirmation> responseObserver) {
        // Do something independent from the other service methods.
    }
}
Output:
09:53:03.453 [pool-1-thread-1] INFO MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@580c60dd: Acquiring lock ...
09:53:03.453 [pool-1-thread-1] INFO MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@580c60dd: Acquired lock.
09:53:03.456 [pool-1-thread-1] INFO MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@4a5ebb14: Acquiring lock ...
09:53:13.465 [pool-1-thread-1] WARN MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@4a5ebb14: Lock acquire timeout exceeded
09:53:13.465 [pool-1-thread-1] INFO MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@40947ea2: Acquiring lock ...
09:53:23.484 [pool-1-thread-1] WARN MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@40947ea2: Lock acquire timeout exceeded
09:53:23.484 [pool-1-thread-1] INFO MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@23f24271: Acquiring lock ...
09:53:33.496 [pool-1-thread-1] WARN MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@23f24271: Lock acquire timeout exceeded
09:53:33.496 [pool-1-thread-1] INFO MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@44de4271: Acquiring lock ...
09:53:43.501 [pool-1-thread-1] WARN MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@44de4271: Lock acquire timeout exceeded
09:53:43.517 [pool-1-thread-1] INFO MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@580c60dd: Received event.
09:53:43.517 [pool-1-thread-1] INFO MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@580c60dd: Received complete.
09:53:43.533 [pool-1-thread-1] INFO MySyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@580c60dd: Store completed.
In my initial synchronous implementation I use a Semaphore as a lock. This leads to a deadlock that lasts until the tryAcquire() timeout is exceeded. I think the following happens:
1. The only thread in the pool executes the first request. In doing so, the first request acquires the lock and the StreamObserver is returned.
2. The initial processing of the first request (before the first messages arrive via the StreamObserver) is now done, and the only thread is released.
3. The 2nd request is already waiting in the executor's queue for a thread to execute it, so the just-released thread picks up the 2nd request.
4. That thread blocks, because the Semaphore has not been released yet.
5. Further requests, as well as messages for the StreamObservers, are waiting in the executor's queue. In particular, the messages for the first request's StreamObserver cannot be handled, so the lock is never released.
6. Only after all requests queued behind the 1st one have exceeded the tryAcquire() timeout and failed does the now-released thread handle the messages for the 1st request's StreamObserver, and the 1st request completes.
Increasing the thread pool size is probably not a good solution, as it would always have to be greater than or equal to the number of concurrently active requests for these 2 methods.
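The behaviour in the steps above can be reproduced without gRPC at all, using just a single-thread executor and a Semaphore (a minimal sketch; the "request" tasks are stand-ins for the gRPC callbacks):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SingleThreadDeadlockDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Semaphore lock = new Semaphore(1);

        // "Request 1": acquires the lock and returns; the release happens
        // in a later task, just like in onCompleted() of the service.
        executor.submit(() -> System.out.println("request 1 acquired: " + lock.tryAcquire()));

        // "Request 2": blocks the only pool thread while waiting for the lock.
        Future<Boolean> request2 = executor.submit(() -> lock.tryAcquire(500, TimeUnit.MILLISECONDS));

        // "onCompleted of request 1": queued behind request 2, so it cannot
        // release the lock until request 2 has already timed out.
        executor.submit(lock::release);

        System.out.println("request 2 acquired: " + request2.get());
        executor.shutdown();
    }
}
```

Request 2 times out even though the release task was submitted long before the timeout expires, because the release task can only run once the single thread is free again.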
Asynchronous service implementation:
@Slf4j
public class MyAsyncService extends MyServiceGrpc.MyServiceImplBase {
    private final AsyncSemaphore lock = new AsyncSemaphore(1, Optional.empty());

    @SneakyThrows
    @Override
    public StreamObserver<Event> myServiceMethodA(StreamObserver<Confirmation> responseObserver) {
        log.info(responseObserver + ": Acquiring lock ...");
        // Acquire the lock already here (instead of in StreamObserver.onCompleted())
        // to ensure the lock is acquired in the order in which the gRPC requests
        // are handled, not in the order in which the stream observers complete.
        var lockFuture = lock.acquire();

        return new StreamObserver<>() {
            private final List<Event> events = new ArrayList<>();

            @Override
            public void onNext(Event event) {
                log.info(responseObserver + ": Received event.");
                events.add(event);
            }

            @Override
            public void onCompleted() {
                log.info(responseObserver + ": Received complete; acquired lock: " + lockFuture.isDone());
                lockFuture.thenAccept(permit -> {
                    log.info(responseObserver + ": Acquired lock.");
                    var preprocessedEvents = events.stream()
                            .map(MyAsyncService.this::preprocess)
                            .toArray(Event[]::new);
                    var storageLibrary = new StorageLibrary();
                    storageLibrary.store(preprocessedEvents).handle((unused, throwable) -> {
                        log.info(responseObserver + ": Store completed.");
                        responseObserver.onNext(Confirmation.newBuilder().build());
                        responseObserver.onCompleted();
                        permit.release();
                        return null;
                    });
                });
            }
        };
    }

    @SneakyThrows
    @Override
    public void myServiceMethodB(Event event, StreamObserver<Confirmation> responseObserver) {
        lock.acquireAndRun(() -> {
            log.info(responseObserver + ": Acquired lock.");
            // Requires exclusive access to a shared resource and uses async I/O.
            var preprocessedEvent = preprocess(event);
            var storageLibrary = new StorageLibrary();
            return storageLibrary.store(preprocessedEvent).handle((unused, throwable) -> {
                responseObserver.onNext(Confirmation.newBuilder().build());
                responseObserver.onCompleted();
                return null;
            });
        });
    }
}
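(The AsyncSemaphore used above comes from the sample project; conceptually it is a semaphore whose acquire() returns a future instead of blocking. A heavily simplified sketch of such a class, hypothetical and not the actual implementation — the real class additionally returns permit objects and offers acquireAndRun():)

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Simplified non-blocking semaphore: acquire() never blocks a thread; it
// returns a future that completes once a permit is available. Waiters are
// served in FIFO order, i.e. in the order in which acquire() was called.
class SimpleAsyncSemaphore {
    private final Queue<CompletableFuture<Void>> waiters = new ArrayDeque<>();
    private int permits;

    SimpleAsyncSemaphore(int permits) {
        this.permits = permits;
    }

    synchronized CompletableFuture<Void> acquire() {
        if (permits > 0) {
            permits--;
            return CompletableFuture.completedFuture(null);
        }
        var waiter = new CompletableFuture<Void>();
        waiters.add(waiter);
        return waiter;
    }

    void release() {
        CompletableFuture<Void> next;
        synchronized (this) {
            next = waiters.poll();
            if (next == null) {
                permits++;
            }
        }
        // Complete outside the monitor so callbacks do not run while holding it.
        if (next != null) {
            next.complete(null);
        }
    }
}

public class AsyncSemaphoreDemo {
    public static void main(String[] args) {
        var lock = new SimpleAsyncSemaphore(1);
        var first = lock.acquire();   // permit available: completes immediately
        var second = lock.acquire();  // no permit: future stays pending
        System.out.println("first done: " + first.isDone() + ", second done: " + second.isDone());
        lock.release();               // hands the permit to the next waiter
        System.out.println("after release, second done: " + second.isDone());
    }
}
```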
Output:
10:14:55.096 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@3794b425: Acquiring lock ...
10:14:55.096 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1299d0f0: Acquiring lock ...
10:14:55.096 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1893addf: Acquiring lock ...
10:14:55.096 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@357dea2d: Acquiring lock ...
10:14:55.096 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@183aa6cc: Acquiring lock ...
10:14:55.112 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@3794b425: Received event.
10:14:55.112 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@3794b425: Received complete; acquired lock: true
10:14:55.112 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@3794b425: Acquired lock.
10:14:55.133 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@3794b425: Store completed.
10:14:55.141 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1299d0f0: Received event.
10:14:55.141 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1299d0f0: Received complete; acquired lock: true
10:14:55.141 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1299d0f0: Acquired lock.
10:14:55.141 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1893addf: Received event.
10:14:55.141 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1893addf: Received complete; acquired lock: false
10:14:55.141 [Thread-9] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1299d0f0: Store completed.
10:14:55.153 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@357dea2d: Received event.
10:14:55.153 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@357dea2d: Received complete; acquired lock: false
10:14:55.153 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@183aa6cc: Received event.
10:14:55.153 [pool-1-thread-1] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@183aa6cc: Received complete; acquired lock: false
10:14:55.153 [Thread-9] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1893addf: Acquired lock.
10:14:55.153 [Thread-8] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@1893addf: Store completed.
10:14:55.153 [Thread-8] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@357dea2d: Acquired lock.
10:14:55.153 [Thread-9] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@357dea2d: Store completed.
10:14:55.153 [Thread-9] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@183aa6cc: Acquired lock.
10:14:55.153 [Thread-8] INFO MyAsyncService - io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl@183aa6cc: Store completed.
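(The mix of pool-1-thread-1 and Thread-8/Thread-9 in this output matches CompletableFuture semantics: a callback registered with thenAccept() on an already-completed future runs synchronously on the registering thread, while a callback on a still-pending future runs on whichever thread later completes it. A minimal demonstration:)

```java
import java.util.concurrent.CompletableFuture;

public class CallbackThreadDemo {
    public static void main(String[] args) throws InterruptedException {
        // Already-completed future: thenAccept() runs the callback right here,
        // on the thread that registers it.
        CompletableFuture.completedFuture(null)
                .thenAccept(v -> System.out.println("completed case: " + Thread.currentThread().getName()));

        // Pending future: the callback runs later, on the completing thread.
        var pending = new CompletableFuture<Void>();
        pending.thenAccept(v -> System.out.println("pending case: " + Thread.currentThread().getName()));
        var completer = new Thread(() -> pending.complete(null), "completer-thread");
        completer.start();
        completer.join();
    }
}
```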
In my asynchronous implementation this deadlock does not occur. However, I now have to manage an application-side buffer for each active request until that request obtains the lock. I had hoped there was a way to let gRPC handle this by buffering the requests at a higher level instead of at this lower, application-side level, and to handle the overflow (discard the request, send an exception to the client, ...) when the buffer fills up. For example, by returning a Future<StreamObserver<>> instead of a StreamObserver, as I know it from ASP.NET MVC asynchronous controller methods (
https://docs.microsoft.com/en-us/aspnet/mvc/overview/performance/using-asynchronous-methods-in-aspnet-mvc-4#CreatingAsynchGizmos). There are probably similar examples in the Java world, but I don't have much experience with the Java ecosystem yet.
Thanks in advance!