vector<unique_ptr<Service::Stub>> stubs;
// populate stubs with one stub per backend
CompletionQueue cq;
// Create a ClientContext, Status, Reply, and rpc for each backend.
vector<unique_ptr<ClientContext>> contexts;
vector<unique_ptr<Status>> statuses;
vector<unique_ptr<Reply>> replies;
vector<unique_ptr<ClientAsyncResponseReader<Reply>>> rpcs;
const auto start_time = chrono::system_clock::now();
const chrono::system_clock::time_point deadline = start_time + chrono::milliseconds(5000);
for (size_t i = 0; i < stubs.size(); i++) {
ClientContext* context = new ClientContext();
context->set_deadline(deadline);
contexts.emplace_back(context);
statuses.emplace_back(new Status());
Reply* reply = new Reply();
replies->emplace_back(reply);
rpcs.emplace_back(stubs[i]->AsyncFooCall(context, request, &cq));
rpcs[i]->Finish(reply, statuses[i].get(), (void*)i);
}
int num_rpcs_finished = 0;
int num_rpcs_finished_ok = 0;
while (num_rpcs_finished < stubs.size()) {
void* which_backend_ptr;
bool ok = false;
// Block until the next result is available in the completion queue "cq".
cq.Next(&which_backend_ptr, &ok);
num_rpcs_finished++;
const size_t which_backend = size_t(which_backend_ptr);
const Status& status = *(statuses[which_backend].get());
LOG(info) << "rpc #" << which_backend << " done after " << elapsed_ms(start_time) << "ms";
if (status.ok()) {
LOG(info) << "rpc ok";
num_rpcs_finished_ok++;
} else {
if (status.error_code() == StatusCode::DEADLINE_EXCEEDED) {
LOG(error) << "rpc timed out";
} else {
LOG(error) << "rpc failed because:" << status.error_code();
}
}
}
LOG(info) << stubs.size() << " rpcs attempted, " << num_rpcs_finished_ok << "/" << num_rpcs_finished << " rpcs finished ok";
There are a few things I wasn't sure of:
- could the ClientContext be shared across different rpc calls if it's the same for each call?
- are the ClientContext and Status large enough objects to warrant having an array of pointers to them?
- what's the difference between the "ok" flag set by cq.Next() and status.ok()?
- will Next() ever fail to yield all N of the rpcs that were initiated? That is, is stubs.size() == num_rpcs_finished at the end of my code?
- would I be able to re-adjust the deadline of the client context while the rpcs are running? For instance, all but 1 backend responds in 50ms, could I reset the deadline of the last backend to 100ms?