example c++ code for client doing multiple async rpcs concurrently?


dus...@ziprecruiter.com

Sep 21, 2015, 1:43:10 PM
to grpc.io
I have a C++ client that is calling multiple backend servers with grpc at the same time, waiting for all (or most) of the backends to respond.  Is there some example code on how to do this correctly?

dus...@ziprecruiter.com

Sep 23, 2015, 2:25:38 PM
to grpc.io, dus...@ziprecruiter.com
I ended up writing something like this:

        vector<unique_ptr<Service::Stub>> stubs;
        // populate stubs with one stub per backend

        CompletionQueue cq;

        // Create a ClientContext, Status, Reply, and rpc for each backend.
        vector<unique_ptr<ClientContext>> contexts;
        vector<unique_ptr<Status>> statuses;
        vector<unique_ptr<Reply>> replies;
        vector<unique_ptr<ClientAsyncResponseReader<Reply>>> rpcs;

        const auto start_time = chrono::system_clock::now();
        const chrono::system_clock::time_point deadline =
            start_time + chrono::milliseconds(5000);

        for (size_t i = 0; i < stubs.size(); i++) {
            ClientContext* context = new ClientContext();
            context->set_deadline(deadline);
            contexts.emplace_back(context);

            statuses.emplace_back(new Status());
            Reply* reply = new Reply();
            replies.emplace_back(reply);

            // request is the shared request message (not shown).
            rpcs.emplace_back(stubs[i]->AsyncFooCall(context, request, &cq));
            rpcs[i]->Finish(reply, statuses[i].get(), (void*)i);
        }

        size_t num_rpcs_finished = 0;
        size_t num_rpcs_finished_ok = 0;
        while (num_rpcs_finished < stubs.size()) {
            void* which_backend_ptr;
            bool ok = false;
            // Block until the next result is available in the completion queue "cq".
            cq.Next(&which_backend_ptr, &ok);
            num_rpcs_finished++;
            const size_t which_backend = size_t(which_backend_ptr);
            const Status& status = *(statuses[which_backend].get());
            // elapsed_ms is a helper returning milliseconds since start_time (not shown).
            LOG(info) << "rpc #" << which_backend << " done after "
                      << elapsed_ms(start_time) << "ms";

            if (status.ok()) {
                LOG(info) << "rpc ok";
                num_rpcs_finished_ok++;
            } else if (status.error_code() == StatusCode::DEADLINE_EXCEEDED) {
                LOG(error) << "rpc timed out";
            } else {
                LOG(error) << "rpc failed because: " << status.error_code();
            }
        }

        LOG(info) << stubs.size() << " rpcs attempted, " << num_rpcs_finished_ok
                  << "/" << num_rpcs_finished << " rpcs finished ok";



There are a few things I wasn't sure of:
- could the ClientContext be shared across different rpc calls if it's the same for each call?
- are the ClientContext and Status large enough objects to warrant having an array of pointers to them?
- what's the difference between the "ok" set by cq.Next() vs. the status.ok() ?
- will Next() ever fail to yield all N of the rpcs that were initiated?  That is, is stubs.size() == num_rpcs_finished at the end of my code?
- would I be able to re-adjust the deadline of the client context while the rpcs are running? For instance, all but 1 backend responds in 50ms, could I reset the deadline of the last backend to 100ms?

Yang Gao

Sep 23, 2015, 2:44:34 PM
to dus...@ziprecruiter.com, grpc.io
The code looks good to me. Thanks for sharing.

There are a few things I wasn't sure of:
- could the ClientContext be shared across different rpc calls if it's the same for each call?
No. A ClientContext cannot be shared or reused.
- are the ClientContext and Status large enough objects to warrant having an array of pointers to them?
ClientContext is small if you do not have metadata, and Status should be pretty small.
- what's the difference between the "ok" set by cq.Next() vs. the status.ok() ?
cq.Next's ok indicates whether the operation (such as a read or write) succeeded; Status's ok is the final status of the rpc itself.
- will Next() ever fail to yield all N of the rpcs that were initiated?  That is, is stubs.size() == num_rpcs_finished at the end of my code?
No. Your while loop will always exit in the end, since you have a deadline set on every call.
- would I be able to re-adjust the deadline of the client context while the rpcs are running? For instance, all but 1 backend responds in 50ms, could I reset the deadline of the last backend to 100ms?
No. The deadline goes out on the wire, and you cannot change it after the rpc has been sent.




mugdhabo...@gmail.com

Dec 8, 2017, 10:27:25 AM
to grpc.io
You should create a new deadline just before initializing the ClientContext inside the for loop, and assign it to the new context object immediately after.
Something like:

const auto start_time = chrono::system_clock::now();
const chrono::system_clock::time_point deadline = start_time + chrono::milliseconds(5000);
ClientContext* context = new ClientContext();
context->set_deadline(deadline);

Since the deadline is an absolute timestamp, it is better to declare it just before initializing the context. A deadline computed once before the loop, as in your implementation, does not account for the delays inside the for loop, leading to imprecise deadline enforcement.

Regards,
Mugdha, Meghana

asalih...@googlemail.com

Jan 14, 2019, 4:34:24 PM
to grpc.io
Hi there,

Do you have a full helloWorld example using C++ code to share, please?

Kindest regards,
Ahmed