Error reported by gRPC at runtime:
Cancelling the stream with status Status{code=INTERNAL, description=Too many responses, cause=null}

Proto definition:
syntax = "proto3";
option java_multiple_files = true;
option java_package = "io.grpc.examples.tps";
option java_outer_classname = "TpsProto";
option objc_class_prefix = "tps";
package tps;
// Interface exported by the server.
service TPS {
  // Unary call: takes data from the client; the server acknowledges it and
  // then streams the data to different gRPC servers.
  rpc GetRequest(Request) returns (Acknowledgement) {}

  // Bidirectional stream of Requests and Acknowledgements.
  //
  // The rpc is named SendRequest so that the generated Java methods are
  // called sendRequest(...), which is the name the Java client stub call and
  // the Java server override in this file both use. The response side must
  // stay declared as `stream`: the server emits one Acknowledgement per
  // incoming Request, which a single-response rpc would reject.
  rpc SendRequest(stream Request) returns (stream Acknowledgement) {}
}
// Payload a client sends on the TPS rpcs.
message Request {
  string policyId = 1;
  string txnId    = 2;
  string clientId = 3;
}
// Reply the server returns for a Request.
message Acknowledgement {
  string txnId    = 1;
  string clientId = 2;
}
Client-side code:
public final class GrpcClient {
private final Semaphore limiter = new Semaphore(1000);
private final List<ManagedChannel> channels;
private final List<TPSGrpc.TPSStub> futureStubList;
StreamObserver<Request> str;
public GrpcClient2(String host, int port) {
channels = new ArrayList<>();
futureStubList = new ArrayList<>();
ManagedChannel channel =null;
channel = NettyChannelBuilder.forAddress(host, port)
.usePlaintext().keepAliveWithoutCalls(true).keepAliveTime(20, TimeUnit.DAYS)
.build();
channels.add(channel);
futureStubList.add(TPSGrpc.newStub(channel));
str = futureStubList.get(0).sendRequest(new StreamObserver<Acknowledgement>() {
@Override
public void onNext(Acknowledgement value) {
// TODO Auto-generated method stub
System.out.println(value.getTxnId());
}
@Override
public void onError(Throwable t) {
// TODO Auto-generated method stub
System.out.println(t.getMessage());
}
@Override
public void onCompleted() {
// TODO Auto-generated method stub
System.out.println("comp");
}
});
}
public void shutdown() throws InterruptedException {
}
public void verifyAsync(Request request) throws InterruptedException {
limiter.acquire();
str.onNext(request);
}
}
Server-side code:
/**
 * Server-side implementation of the TPS service.
 *
 * <p>{@code getRequest} acknowledges the caller and then forwards a copy of the request to
 * every entry of {@code GrpcServer.node} on a background task. {@code sendRequest} answers
 * each streamed request with one acknowledgement.
 */
public class Service extends TPSGrpc.TPSImplBase {
    /**
     * Clients keyed by the node string they were created from. The map is touched from
     * background tasks started in {@link #getRequest}, so it has to be a concurrent map;
     * otherwise two tasks could each create (and leak) a client for the same node.
     */
    static Map<String, GrpcClient> urlVsGrpcClient = new java.util.concurrent.ConcurrentHashMap<>();

    /**
     * Acknowledges the request right away, then forwards it asynchronously to all nodes.
     *
     * @param request          incoming request
     * @param responseObserver receives exactly one acknowledgement followed by completion
     */
    @Override
    public void getRequest(Request request, StreamObserver<Acknowledgement> responseObserver) {
        // NOTE(review): only clientId and txnId are copied; policyId is not forwarded —
        // confirm that this is intentional.
        Request clone = Request.newBuilder()
                .setClientId(request.getClientId())
                .setTxnId(request.getTxnId())
                .build();

        // NOTE(review): the acknowledgement carries the fixed txnId "ab" rather than the
        // request's txnId — looks like a placeholder, confirm.
        responseObserver.onNext(Acknowledgement.newBuilder().setTxnId("ab").build());
        responseObserver.onCompleted();

        CompletableFuture.runAsync(() -> {
            for (int i = 0; i < GrpcServer.node.size(); i++) {
                String node = GrpcServer.node.get(i);
                try {
                    callNode(node, clone);
                } catch (RuntimeException e) {
                    // An exception escaping this task would be dropped silently by the
                    // unobserved future and would also skip the remaining nodes.
                    System.err.println("Forwarding to node " + node + " failed: " + e);
                }
            }
        });
    }

    /**
     * Handles the request stream: each incoming request is answered with an acknowledgement
     * carrying the same clientId and txnId.
     *
     * @param responseObserver outbound stream of acknowledgements
     * @return observer that consumes the inbound requests
     */
    @Override
    public StreamObserver<Request> sendRequest(StreamObserver<Acknowledgement> responseObserver) {
        return new StreamObserver<Request>() {
            @Override
            public void onNext(Request value) {
                Acknowledgement ack = Acknowledgement.newBuilder()
                        .setClientId(value.getClientId())
                        .setTxnId(value.getTxnId())
                        .build();
                responseObserver.onNext(ack);
                System.out.println(value.getTxnId());
            }

            @Override
            public void onError(Throwable t) {
                // The inbound stream has failed; report it instead of dropping it silently.
                System.err.println("Request stream failed: " + t);
            }

            @Override
            public void onCompleted() {
                responseObserver.onCompleted();
                System.out.println("server comp");
            }
        };
    }

    /**
     * Sends {@code request} to {@code node}, creating and caching the client on first use.
     *
     * @param node    target in {@code host:port} form
     * @param request message to forward
     */
    private void callNode(String node, Request request) {
        GrpcClient client = urlVsGrpcClient.computeIfAbsent(node, Service::newClient);
        try {
            client.verifyAsync(request);
        } catch (InterruptedException e) {
            // Restore the flag so the surrounding task can observe the interruption.
            Thread.currentThread().interrupt();
            System.err.println("Interrupted while forwarding to node " + node);
        }
    }

    /** Parses a {@code host:port} string and builds the client for it. */
    private static GrpcClient newClient(String node) {
        String[] parts = node.split(":");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Expected host:port but got: " + node);
        }
        return new GrpcClient(parts[0], Integer.parseInt(parts[1]));
    }
}