gRPC-Java client did not receive all the messages from the server stream


Avinash Dongre

Sep 15, 2016, 3:23:04 AM
to grpc.io
I have a client that makes a request to a server, and the server replies with a stream. This works fine for a small number of messages, but when the number of messages is large, my client shuts down before I can receive all of them.

The following is how I have implemented the server-streaming RPC method.

    ScanReply.Builder scanReplyBuilder = ScanReply.newBuilder();
    final ServerCallStreamObserver<ScanReply> scso =
        (ServerCallStreamObserver<ScanReply>) responseObserver;

    Runnable drain = new Runnable() {
      long remaining = 20_000_000L;

      public void run() {
        if (remaining == 0L) return;
        for (; remaining > 0L && scso.isReady(); remaining--) {
          scanReplyBuilder.clear();
          scanReplyBuilder.setRow(ByteString.copyFrom(new byte[128]));
          scso.onNext(scanReplyBuilder.build());
        }
        if (remaining == 0) {
          scso.onCompleted();
        }
      }
    };
    scso.setOnReadyHandler(drain);
    drain.run();

The client code is implemented as follows:

    final CountDownLatch finishLatch = new CountDownLatch(1);
    int rowsReceived = 0;
    final List<Long> rowList = new ArrayList<>();
    ScanRequest request = ScanRequest.newBuilder()
        .setTableName(tableName)
        .setStartKey(ByteString.copyFrom(startRow))
        .setStopKey(ByteString.copyFrom(stopRow))
        .build();

    StreamObserver<ScanReply> responseObserver = new StreamObserver<ScanReply>() {
      @Override
      public void onNext(ScanReply value) {
        byte[] row = value.getRow().toByteArray();
        rowList.add(1L);
      }

      @Override
      public void onError(Throwable t) {
        finishLatch.countDown();
      }

      @Override
      public void onCompleted() {
        finishLatch.countDown();
        System.out.println("TableServiceClient.onCompleted.SIZE --> " + rowList.size());
      }
    };
    this.valuesList.get(0).tableScan(request, responseObserver);
    try {
      finishLatch.await(1, TimeUnit.MINUTES);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }

Thanks
Avinash

Avinash Dongre

Sep 16, 2016, 5:10:20 AM
to grpc.io
Any help, guys?

Thanks
Avinash

Andreas Pillath

Sep 16, 2016, 10:15:04 AM
to grpc.io
Maybe sending 20_000_000L messages takes longer than 1 minute. In that case
    finishLatch.await(1, TimeUnit.MINUTES);
would time out and return (with false) before the stream completes, and your client would then shut down.
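
To illustrate the timeout behaviour described above, here is a minimal sketch that uses only java.util.concurrent and no gRPC. The class LatchTimeoutDemo, the helper streamFinished, and the 500 ms "slow stream" thread are hypothetical stand-ins for the real client; the only point is that await(timeout, unit) returns false when the timeout elapses before countDown() is called, so the return value is worth checking.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchTimeoutDemo {

    // Hypothetical helper: waits on the latch and reports whether the stream
    // finished (true) or the wait timed out first (false).
    static boolean streamFinished(CountDownLatch finishLatch, long timeout, TimeUnit unit)
            throws InterruptedException {
        return finishLatch.await(timeout, unit);
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch finishLatch = new CountDownLatch(1);

        // Stand-in for a slow stream: "onCompleted" fires after about 500 ms.
        Thread slowStream = new Thread(() -> {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            finishLatch.countDown();
        });
        slowStream.start();

        // A 100 ms wait is too short here, so await returns false.
        if (!streamFinished(finishLatch, 100, TimeUnit.MILLISECONDS)) {
            System.out.println("Timed out before the stream completed");
        }

        // Waiting without a timeout blocks until countDown() is called.
        finishLatch.await();
        System.out.println("Stream completed");
        slowStream.join();
    }
}
```

In the client from the original post, one option under the same assumption would be to check the boolean returned by finishLatch.await(1, TimeUnit.MINUTES) and either keep waiting or log that the stream was cut short, rather than shutting down silently.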

Avinash Dongre

Sep 18, 2016, 7:34:44 PM
to grpc.io
Thanks Andreas,
That was the reason for the client shutdown.

Thanks
Avinash