// Test server listening on `port` and exposing `testService`.
// NOTE(review): the handler pool is fixed at 5 threads. A handler that blocks for the
// lifetime of its stream occupies one of them, so 5 concurrent streams leave no spare
// thread for any other executor work — confirm this is intended for the test.
Server server = NettyServerBuilder.forPort(port)
        // Application executor on which service methods are invoked.
        .executor(Executors.newFixedThreadPool(5))
        .addService(testService)
        // Keepalive settings: 10 s time and 10 s timeout.
        .keepAliveTime(10, TimeUnit.SECONDS)
        .keepAliveTimeout(10, TimeUnit.SECONDS)
        // Connection lifetime limits: 30 s max age with a 5 s grace period, 5 s max idle.
        .maxConnectionAge(30, TimeUnit.SECONDS)
        .maxConnectionAgeGrace(5, TimeUnit.SECONDS)
        .maxConnectionIdle(5, TimeUnit.SECONDS)
        .build();
// for the purpose of this test, this queue is pre-filled with a handful of StreamItems, and never has any other objects added to it
BlockingQueue<StreamItem> queue = new LinkedBlockingQueue<>();
@Override public void testStream(StreamRequest request, StreamObserver<StreamItem> responseObserver) { try { while (!Thread.interrupted()) { StreamItem next = queue.poll(); if (next == null) { LOGGER.debug("Queue is empty, waiting before next poll()");
Thread.sleep(1000); // send default instance which is empty, to signal that the queue is empty right now responseObserver.onNext(StreamItem.getDefaultInstance()); } else { responseObserver.onNext(next); } } } catch (InterruptedException e) { LOGGER.info("Interrupted. Exiting loop."); } catch (Exception e) { LOGGER.error("Unexpected error", e); responseObserver.onError(e); return; } responseObserver.onCompleted(); }
// Client side: open a plaintext (non-TLS) channel to `target` ...
ManagedChannelBuilder<?> channelBuilder = ManagedChannelBuilder.forTarget(target).usePlaintext(true);
ManagedChannel channel = channelBuilder.build();
// ... and wrap it in the generated blocking stub for TestService.
TestServiceGrpc.TestServiceBlockingStub blockingStub =
        TestServiceGrpc.newBlockingStub(channel);
// Start the server-streaming call and log every item until the stream ends.
// A failed call surfaces as a StatusRuntimeException, whose status is logged.
try {
    for (Iterator<StreamItem> items = blockingStub.testStream(subscriptionRequest); items.hasNext(); ) {
        StreamItem received = items.next();
        LOGGER.debug("Received an item:\n{}", received);
    }
} catch (StatusRuntimeException e) {
    LOGGER.error("GRPC Exception: {}", e.getStatus());
}
testService implements the following server-side streaming method:
// for the purpose of this test, this queue is pre-filled with a handful of StreamItems, and never has any other objects added to it
BlockingQueue<StreamItem> queue = new LinkedBlockingQueue<>();@Overridepublic void testStream(StreamRequest request, StreamObserver<StreamItem> responseObserver) {try {while (!Thread.interrupted()) {StreamItem next = queue.poll();if (next == null) {LOGGER.debug("Queue is empty, waiting before next poll()");Thread.sleep(1000);// send default instance which is empty, to signal that the queue is empty right nowresponseObserver.onNext(StreamItem.getDefaultInstance());} else {responseObserver.onNext(next);}}} catch (InterruptedException e) {LOGGER.info("Interrupted. Exiting loop.");} catch (Exception e) {LOGGER.error("Unexpected error", e);responseObserver.onError(e);return;}responseObserver.onCompleted();}
I think another part of my confusion was that I wasn't expecting the Server's executor thread pool to be in charge of both handling and cancelling the RPCs. In my test, I had a fixed thread pool with 5 threads, and would sometimes start 5 streams to clients. Then, if a client disconnected, the server wouldn't cancel that client's stream, because it had no executor threads available to do so.