C++ grpc::ClientAsyncReader proper usage, ReadInitialMetadata method

Peter Kompaník

Oct 12, 2021, 7:13:20 AM
to grpc.io
Hello,

I would like to know the proper usage of grpc::ClientAsyncReader with a server-side stream. We use an older version of the gRPC library, 1.16.0, on Debian Stretch.
After I create the reader, I call the grpc::ClientAsyncReader::StartCall() method.
I am not sure when I should call grpc::ClientAsyncReader::Finish(), and whether the returned status needs to be checked when we receive many replies over a server-side stream.
I would also like to know under which circumstances it makes sense to use, or is necessary to use, the ReadInitialMetadata() method.
I tried calling it right after StartCall() and also before the first Read(), but in both cases I get assertion failures:

After Start: 12160 channel_cc.cc:160]          assertion failed: GRPC_CALL_OK == grpc_call_start_batch(call->call(), cops, nops, ops->cq_tag(), nullptr)

 After Finish: 12327 async_stream.h:230]         assertion failed: !context_->initial_metadata_received_


Here is a simplified version of the code I use:

  grpc::CompletionQueue cq;
  grpc::ClientContext context;
  StartStreamingRequest request;
  StreamingNotification reply;
 
  std::unique_ptr< grpc::ClientAsyncReader<StreamingNotification> >
      async_reader( stub_->PrepareAsyncStartStreaming(&context, request, &cq) );  


  void *tag = (void*)1234;
  async_reader->StartCall(tag);

  // Do I need to perform Finish immediately or should I wait?
  grpc::Status status;
  async_reader->Finish(&status, tag);

  void *recv_tag = nullptr;
  bool ok = false;
  while(cq.Next(&recv_tag, &ok))
  {

    if (!ok)
      LOG_DEBUG("Completion queue returns nOK");
    if (recv_tag != tag)
      LOG_INFO("Tag does not match expected value" );    
    else
    {
      // Does it make sense to check the status.ok() here, is it necessary ?
      if (status.ok())
      {
        async_reader->Read(&reply, tag);
        ProcessNotification(reply);
      }
    }
  }
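
For reference, here is a minimal sketch of the ordering that, as far as I understand the gRPC async API documentation, a server-streaming client is expected to follow: StartCall() first, then one Read() at a time until a Read() completes with ok == false, and only then Finish(), each with its own tag. The sketch does not use gRPC at all. FakeReader, Tag, Event and Drain are hypothetical stand-ins invented for illustration, and the deque plays the role of the completion queue.

```cpp
#include <cassert>
#include <deque>
#include <vector>

// Hypothetical tags: one distinct value per kind of operation, so a
// completion can be told apart when it comes back from the queue.
enum class Tag { kStart = 1, kRead = 2, kFinish = 3 };

// One completion event, shaped like what cq.Next(&tag, &ok) reports.
struct Event {
  Tag tag;
  bool ok;
};

// Hypothetical stand-in for the reader plus its completion queue.
// Each operation only enqueues a completion; nothing is delivered
// until the driver loop pops it, as with a real completion queue.
struct FakeReader {
  std::deque<int> pending;  // messages the "server" will stream
  std::deque<Event> cq;     // completions waiting to be consumed
  bool finished = false;

  void StartCall(Tag t) { cq.push_back({t, true}); }

  void Read(int* out, Tag t) {
    if (pending.empty()) {
      cq.push_back({t, false});  // ok == false: no more messages
      return;
    }
    *out = pending.front();
    pending.pop_front();
    cq.push_back({t, true});
  }

  void Finish(bool* status_ok, Tag t) {
    assert(!finished);  // Finish is issued exactly once per call
    finished = true;
    *status_ok = true;  // the fake stream always ends successfully
    cq.push_back({t, true});
  }
};

// Drives the call: reads until a Read completes with ok == false,
// then issues Finish and returns once the Finish tag arrives.
std::vector<int> Drain(FakeReader& reader, bool* status_ok) {
  std::vector<int> received;
  int reply = 0;
  reader.StartCall(Tag::kStart);
  while (!reader.cq.empty()) {
    Event ev = reader.cq.front();
    reader.cq.pop_front();
    switch (ev.tag) {
      case Tag::kStart:
        if (ev.ok) {
          reader.Read(&reply, Tag::kRead);
        } else {
          reader.Finish(status_ok, Tag::kFinish);  // call never started
        }
        break;
      case Tag::kRead:
        if (ev.ok) {
          received.push_back(reply);  // reply is valid only now
          reader.Read(&reply, Tag::kRead);
        } else {
          reader.Finish(status_ok, Tag::kFinish);  // stream is done
        }
        break;
      case Tag::kFinish:
        return received;  // status is meaningful only at this point
    }
  }
  return received;
}
```

In this model the reply is processed only after the Read tag comes back with ok == true, and the status is looked at once, after the Finish tag arrives, rather than on every iteration. With the real API, ReadInitialMetadata() is documented as optional, and if it is used it must not be called concurrently with or after Read(); when it is skipped, the initial metadata should arrive together with the first Read().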

Thanks for any explanation or suggestions.
Peter