[howto] stream auto reconnect ?

4,730 views
Skip to first unread message

Smallufo Huang

unread,
Sep 29, 2016, 12:23:50 PM9/29/16
to grpc.io
Hi , I am new to gRPC.
I wonder how gRPC auto reconnect client and server (if server is down and restart ?)

This is my server code , I try to build a bi-directional stream 

rpc biStream(stream Int) returns (stream Int) {}

The server side listens int stream from client and double the value and return to client.

This is server code :
    @Override
    public StreamObserver<Int> biStream(StreamObserver<Int> responseObserver)  {
      return new StreamObserver<Int>() {
        @Override
        public void onNext(Int anInt) {
          try {
            TimeUnit.NANOSECONDS.sleep(RandomUtils.nextInt(100 , 1000));
          } catch (InterruptedException ignored) {
          }

          int value = anInt.getValue();
          logger.info("got {} from client." , value);
          responseObserver.onNext(Int.newBuilder().setValue(value*2).build());
        }

        @Override
        public void onError(Throwable throwable) {
        }

        @Override
        public void onCompleted() {
          responseObserver.onCompleted();
        }
      };
    }
  };


And this is client code :

  public void biStreaming(int count) throws InterruptedException {
    logger.info("bi streaming");

    final CountDownLatch finishLatch = new CountDownLatch(1);

    StreamObserver<Int> respondObserver = new StreamObserver<Int>() {

      @Override
      public void onNext(Int anInt) {
        logger.info("get {} from server" , anInt.getValue());
      }

      @Override
      public void onError(Throwable throwable) {
        logger.error(throwable.getMessage());
        finishLatch.countDown();
      }

      @Override
      public void onCompleted() {
        logger.info("server completed");
        finishLatch.countDown();
      }
    };

    StreamObserver<Int> requestObserver = asyncStub.biStream(respondObserver);
    for(int i=0 ; i <count ; i++) {
      int rand = RandomUtils.nextInt(1, 101);
      TimeUnit.SECONDS.sleep(RandomUtils.nextInt(1,10));
      logger.info("sending {} to server" , rand);
      requestObserver.onNext(Int.newBuilder().setValue(rand).build());
    }
    requestObserver.onCompleted();
    finishLatch.await(1, TimeUnit.MINUTES);
  }



When running server , and starting the client , the client starts sending random int to server.

 [main] INFO  d.i.GrpcClient - sending 36 to server
 [grpc-default-executor-1] INFO  d.i.GrpcClient - get 72 from server
 [main] INFO  d.i.GrpcClient - sending 85 to server
 [grpc-default-executor-1] INFO  d.i.GrpcClient - get 170 from server
 [main] INFO  d.i.GrpcClient - sending 33 to server
 [grpc-default-executor-1] INFO  d.i.GrpcClient - get 66 from server

And I Ctrl-C the server side , the client shows :

 [grpc-default-executor-1] ERROR d.i.GrpcClient - INTERNAL: Connection closed with unknown cause
 [main] INFO  d.i.GrpcClient - sending 26 to server

Then I restart the server , but it seems client cannot rebuild the stream.

 [main] INFO  d.i.GrpcClient - sending 34 to server
 [main] INFO  d.i.GrpcClient - sending 5 to server
 [main] INFO  d.i.GrpcClient - sending 62 to server



Anyway to recover a stream ?

Thanks.


Eric Anderson

unread,
Oct 1, 2016, 4:20:34 PM10/1/16
to Smallufo Huang, grpc.io
When you receive the error on the client-side, the stream is dead. You should simply create a new RPC using the same Stub/Channel. The Channel will automatically create a new connection to the server, but it can't re-establish any streams.

When load balancing and proxies are involved, a particular stream always goes to the same backend. So if that stream breaks, gRPC can't simply issue a new request automatically, because it can't necessarily get the same backend, and the backend may no longer exist (which is the case when you Ctrl+C the server; when you restart, it is a different server process). So applications should just re-establish the failed stream.

The "INTERNAL: Connection closed with unknown cause" is actually a bug, which should be fixed in the next grpc-java release (1.1). It should have been UNAVAILABLE instead.

--
You received this message because you are subscribed to the Google Groups "grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email to grpc-io+unsubscribe@googlegroups.com.
To post to this group, send email to grp...@googlegroups.com.
Visit this group at https://groups.google.com/group/grpc-io.
To view this discussion on the web visit https://groups.google.com/d/msgid/grpc-io/b84de7b4-f98c-4bb3-850d-2eee15f763d6%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Reply all
Reply to author
Forward
0 new messages