Following is how I have implemented Server RPC Streaming method.
ScanReply.Builder scanReplyBuilder = ScanReply.newBuilder();
final ServerCallStreamObserver<ScanReply> scso =
(ServerCallStreamObserver<ScanReply>) responseObserver;
Runnable drain = new Runnable() {
long remaining = 20_000_000L;
public void run() {
if (remaining == 0L) return;
for (; remaining > 0L && scso.isReady(); remaining--) {
scanReplyBuilder.clear();
scanReplyBuilder.setRow(ByteString.copyFrom(new byte[128]));
scso.onNext(scanReplyBuilder.build());
}
if (remaining == 0) {
scso.onCompleted();
}
}
};
scso.setOnReadyHandler(drain);
drain.run();
and Client code is implemented as Follows, this
final CountDownLatch finishLatch = new CountDownLatch(1);
int rowsReceived = 0;
final List<Long> rowList = new ArrayList<>();
ScanRequest request = ScanRequest.newBuilder()
.setTableName(tableName)
.setStartKey(ByteString.copyFrom(startRow))
.setStopKey(ByteString.copyFrom(stopRow))
.build();
StreamObserver<ScanReply> responseObserver = new StreamObserver<ScanReply>() {
@Override
public void onNext(ScanReply value) {
byte[] row = value.getRow().toByteArray();
rowList.add(1L);
}
@Override
public void onError(Throwable t) {
finishLatch.countDown();
}
@Override
public void onCompleted() {
finishLatch.countDown();
System.out.println("TableServiceClient.onCompleted.SIZE --> " + rowList.size());
}
};
this.valuesList.get(0).tableScan(request, responseObserver);
try {
finishLatch.await(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
e.printStackTrace();
}
Thanks
Avinash