I have grpc implemnetaion in java where i am using a blocking stubs
i am using those stubs from pool and
interestingly when load increases i see interrupted exception
My requirement is to reuse channel and stubs
What is correct way to reuse channel and stubs ?
public ComputeVariableServiceGrpc.ComputeVariableServiceBlockingStub getOrCreateStub() {
ComputeVariableServiceGrpc.ComputeVariableServiceBlockingStub stub = null;
try {
stub = stubPool.poll(grpcProperties.getClient().getClientPoolTimeoutInMillis(), TimeUnit.MILLISECONDS);
if (isNull(stub) || isChannelClosed((ManagedChannel) stub.getChannel())) {
log.debug("Stub not available in queue or connection was closed, creating one...");
stub = ComputeVariableServiceGrpc.newBlockingStub(newManagedChannel());
}
}
catch (InterruptedException e) {
log.error("Error while waiting for stub from pool: [{}], [{}]", e.getCause(), e.getMessage());
}
return stub;
}
public ComputeVariablesResult computeVariable(String payload, String typeId) {
ComputeVariableServiceGrpc.ComputeVariableServiceBlockingStub computeVariableServiceBlockingStub = getOrCreateStub();//poll from stub pool
CdsComputeVariablesResult response = CdsComputeVariablesResult.newBuilder().build();
try {
Request request = CdsComputeVariablesRequest.newBuilder().setTypeId(typeId).setPayload(payload).build();
return computeVariableServiceBlockingStub.computeVariables(request);
}
}
catch (StatusRuntimeException statusRuntimeException) {
if (statusRuntimeException.getCause() != null) {
log.error("Error while computing variables, error cause and message is: [{}] [{}]", statusRuntimeException.getCause(), statusRuntimeException.getStatus().getDescription());
}
else {
log.error("Error while computing variables, error message is: [{}]", statusRuntimeException.getStatus().getDescription());
}
}
finally {
log.debug("Stub returned to queue");
releaseStub(computeVariableServiceBlockingStub);//release to pool
}
log.debug("ComputeVariableResult with typeId: [{}] is [{}] ", typeId, response);
return response;
}
public void releaseStub(ComputeVariableServiceGrpc.ComputeVariableServiceBlockingStub computeVariableServiceBlockingStub) {
boolean added = stubPool.offer(computeVariableServiceBlockingStub);
if (!added) {
log.info("Failed to add stub to the pool, remaining capacity is: [{}]", stubPool.remainingCapacity());
}
}
public void destroy() {
stubPool.forEach(stub -> {
if (nonNull(stub.getChannel())) {
((ManagedChannel) stub.getChannel()).shutdown();
}
});
stubPool.clear();
}
}