Long-lived gRPC issue in bidi stream

Shobhit Srivastava

Apr 28, 2021, 2:16:23 AM
to grpc.io
Hi All

In my application, a client sends a stream of records and the server streams back acknowledgements. Things work fine for the first request, but for subsequent requests I get the errors below. The subsequent request is made after 2 or 3 seconds (GetRequest in the proto file below).

On the server side I am getting:
Cancelling the stream with status Status{code=INTERNAL, description=Too many responses, cause=null}

On the client side I am getting:

CANCELLED: RST_STREAM closed stream. HTTP/2 error code: CANCEL

Please find the code below.

Proto file:
syntax = "proto3";
option java_multiple_files = true;
option java_package = "io.grpc.examples.tps";
option java_outer_classname = "TpsProto";
option objc_class_prefix = "tps";

package tps;

// Interface exported by the server.
service TPS {
  // Takes data from the client and streams it to a different gRPC server.
  rpc GetRequest(Request) returns (Acknowledgement) {}
  rpc SendDataToServer(stream Request) returns (stream Acknowledgement) {}
}

message Request {
  string policyId = 1;
  string txnId = 2;
  string clientId = 3;
}

message Acknowledgement {
  string txnId = 1;
  string clientId = 2;
}

Client-side code:

public final class GrpcClient {
    private final Semaphore limiter = new Semaphore(1000);
    private final List<ManagedChannel> channels;
    private final List<TPSGrpc.TPSStub> futureStubList;
    StreamObserver<Request> str;

    public GrpcClient2(String host, int port) {
        channels = new ArrayList<>();
        futureStubList = new ArrayList<>();
        ManagedChannel channel = null;
        channel = NettyChannelBuilder.forAddress(host, port)
                .usePlaintext()
                .keepAliveWithoutCalls(true)
                .keepAliveTime(20, TimeUnit.DAYS)
                .build();

        channels.add(channel);
        futureStubList.add(TPSGrpc.newStub(channel));
        str = futureStubList.get(0).sendRequest(new StreamObserver<Acknowledgement>() {

            @Override
            public void onNext(Acknowledgement value) {
                // TODO Auto-generated method stub
                System.out.println(value.getTxnId());
            }

            @Override
            public void onError(Throwable t) {
                // TODO Auto-generated method stub
                System.out.println(t.getMessage());
            }

            @Override
            public void onCompleted() {
                // TODO Auto-generated method stub
                System.out.println("comp");
            }
        });
    }

    public void shutdown() throws InterruptedException {
    }

    public void verifyAsync(Request request) throws InterruptedException {
        limiter.acquire();
        str.onNext(request);
    }
}


Server-side code:

public class Service extends TPSGrpc.TPSImplBase {

    static Map<String, GrpcClient> urlVsGrpcClient = new HashMap<>();

    @Override
    public void getRequest(Request request, StreamObserver<Acknowledgement> responseObserver) {
        Request clone = Request.newBuilder().setClientId(request.getClientId())
                .setTxnId(request.getTxnId()).build();
        responseObserver.onNext(Acknowledgement.newBuilder().setTxnId("ab").build());
        responseObserver.onCompleted();
        CompletableFuture.runAsync(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < GrpcServer.node.size(); i++) {
                    callNode(GrpcServer.node.get(i), clone);
                }
            }
        });
    }

    @Override
    public StreamObserver<Request> sendRequest(StreamObserver<Acknowledgement> responseObserver) {

        return new StreamObserver<Request>() {

            @Override
            public void onNext(Request value) {
                Acknowledgement ack = Acknowledgement.newBuilder().setClientId(value.getClientId())
                        .setTxnId(value.getTxnId()).build();
                responseObserver.onNext(ack);
                System.out.println(value.getTxnId());
            }

            @Override
            public void onError(Throwable t) {
            }

            @Override
            public void onCompleted() {
                responseObserver.onCompleted();
                System.out.println("server comp");
            }
        };
    }

    private void callNode(String node, Request request) {
        GrpcClient client = urlVsGrpcClient.get(node);
        if (client == null) {
            String[] split = node.split(":");
            client = new GrpcClient(split[0], Integer.parseInt(split[1]));
            urlVsGrpcClient.put(node, client);
        }
        try {
            client.verifyAsync(request);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}


Please check and suggest.

Thanks,
Shobhit Srivastava

Piotr Morgwai Kotarbinski

Apr 29, 2021, 4:14:27 AM
to grpc.io
It's very hard to figure out what you are trying to achieve and how you expect this code to behave: you should narrow the problem down and post a minimal example that reproduces it. Moreover, the code is in some intermediate, inconsistent state and wouldn't even compile (the proto has functions GetRequest and SendDataToServer, but in your code you call something named sendRequest; GrpcClient has something that looks like a constructor but is named GrpcClient2; etc.), which makes it even harder to read.
If you ask random people on the internet for help, you should make it easy for them to help you ;-)

Cheers!

Shobhit Srivastava

Apr 29, 2021, 4:49:42 AM
to Piotr Morgwai Kotarbinski, grpc.io
Apologies for these silly mistakes; I pasted the same code with just a few modifications.
It is working now.
I will keep your points in mind.

Thanks
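
The thread does not record what change made the code work. One reading that fits the reported symptoms (the first acknowledgement goes through, later ones fail with "Too many responses") is that the stubs in use were generated from a proto in which the response of sendRequest was not declared as a stream. grpc-java appears to close a call with INTERNAL "Too many responses" when the server sends a second message on a method whose response is a single message. The plain-Java sketch below only models that rule under this assumption; CallModel and its members are hypothetical names for illustration, not gRPC classes, and the status string simply mirrors the server log quoted in the first post.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not gRPC code) of a per-call "one response only" check. */
public class TooManyResponsesSketch {

    /** Hypothetical stand-in for one server-side call. */
    public static final class CallModel {
        // true models a method declared as: returns (Acknowledgement)
        // false models a method declared as: returns (stream Acknowledgement)
        private final boolean serverSendsOneMessage;
        private boolean messageSent = false;
        private String closedWith = null; // null while the call is still open
        private final List<String> delivered = new ArrayList<>();

        public CallModel(boolean serverSendsOneMessage) {
            this.serverSendsOneMessage = serverSendsOneMessage;
        }

        /** Models the server calling responseObserver.onNext(msg). */
        public void sendMessage(String msg) {
            if (closedWith != null) {
                return; // call already closed, message is dropped
            }
            if (serverSendsOneMessage && messageSent) {
                // A second response on a single-response method closes the call.
                closedWith = "INTERNAL: Too many responses";
                return;
            }
            messageSent = true;
            delivered.add(msg);
        }

        public String status() {
            return closedWith == null ? "OPEN" : closedWith;
        }

        public List<String> delivered() {
            return delivered;
        }
    }

    public static void main(String[] args) {
        // Response declared without "stream": the second ack is rejected.
        CallModel singleResponse = new CallModel(true);
        singleResponse.sendMessage("ack-1");
        singleResponse.sendMessage("ack-2");
        System.out.println(singleResponse.delivered() + " " + singleResponse.status());

        // Response declared with "stream": both acks are delivered.
        CallModel bidi = new CallModel(false);
        bidi.sendMessage("ack-1");
        bidi.sendMessage("ack-2");
        System.out.println(bidi.delivered() + " " + bidi.status());
    }
}
```

If that is indeed the cause, declaring the response as stream Acknowledgement (as SendDataToServer does in the posted proto) and regenerating the stubs would let the server keep sending acknowledgements on the same call.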
