How to implement async gRPC with more than one method?

Joseph Liu

Nov 27, 2020, 2:05:39 PM
to grpc.io
I'm trying to implement async RPC for multiple methods. Going by the async greeter [server][1]/[client][2] example, it's not clear how the server handles different methods: the example implements only a single method, `SayHello`, and the server has no corresponding method named `SayHello` like the [synchronous version][2] does.

Instead, the logic for handling `SayHello` seems to be handled in the `Proceed` function. If I wanted to create another method, do I just handle it all in Proceed? And if so, how would I do that?

    void Proceed() {
          if (status_ == CREATE) {
            // Make this instance progress to the PROCESS state.
            status_ = PROCESS;
    
            // As part of the initial CREATE state, we *request* that the system
            // start processing SayHello requests. In this request, "this" acts as
            // the tag uniquely identifying the request (so that different CallData
            // instances can serve different requests concurrently), in this case
            // the memory address of this CallData instance.
            service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_,
                                      this);
          } else if (status_ == PROCESS) {
            // Spawn a new CallData instance to serve new clients while we process
            // the one for this CallData. The instance will deallocate itself as
            // part of its FINISH state.
            new CallData(service_, cq_);
    
            // The actual processing.
            std::string prefix("Hello ");
            reply_.set_message(prefix + request_.name());
    
            // And we are done! Let the gRPC runtime know we've finished, using the
            // memory address of this instance as the uniquely identifying tag for
            // the event.
            status_ = FINISH;
            responder_.Finish(reply_, Status::OK, this);
          } else {
            GPR_ASSERT(status_ == FINISH);
            // Once in the FINISH state, deallocate ourselves (CallData).
            delete this;
          }
        }


Christopher Warrington - MSFT

Dec 2, 2020, 3:56:31 PM
to grpc.io
On Friday, November 27, 2020 at 11:05:39 AM UTC-8 Joseph Liu wrote:

> Instead, the logic for handling `SayHello` seems to be handled in the
> `Proceed` function. If I wanted to create another method, do I just handle
> it all in Proceed? And if so, how would I do that?

A high level flow that you can use:

1. Create one completion queue. (You can create multiple, but that isn't
   relevant to the problem you're trying to solve right now. Multiple
   completion queues will come into play much later and only if you see
   lock contention inside the completion queues themselves.)

2. For each method you want to handle, invoke service_->RequestFoo(),
   service_->RequestBar(), ... with _unique_ context, request, responder,
   and tag values.

3. Poll the completion queue. When the Request* tag value for, say, the
   RequestFoo method comes out of the completion queue, run the Foo() logic.
   When the RequestBar tag comes out instead, run the Bar() logic.

4. After the Request* tag has come out of the completion queue, you will
   need to re-invoke service_->Request* with a new unique set of context,
   request, responder, and tag values. If you don't do this, gRPC will not
   process an incoming call again. You _do not_ need to wait for the logic
   of responding to the prior call to finish before doing this.
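
The four steps above can be sketched without gRPC at all. What follows is a minimal, single-threaded simulation, not gRPC code: `FakeCompletionQueue`, `MethodSlot`, `Arm`, `Deliver`, and `RunDispatchDemo` are hypothetical names standing in for `ServerCompletionQueue`, a per-method registration object, and the `service_->RequestFoo()` / `service_->RequestBar()` calls. It only shows how one polling loop can route tags to different handlers and re-arm each method.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for grpc::ServerCompletionQueue: a FIFO of tags.
class FakeCompletionQueue {
 public:
  void Push(void* tag) { tags_.push_back(tag); }

  // Same shape as CompletionQueue::Next(); here it returns false once drained.
  bool Next(void** tag, bool* ok) {
    if (tags_.empty()) return false;
    *tag = tags_.front();
    tags_.pop_front();
    *ok = true;
    return true;
  }

 private:
  std::deque<void*> tags_;
};

// One slot per RPC method; the slot's own address serves as the tag (step 2).
class MethodSlot {
 public:
  using Logic = std::function<std::string(const std::string&)>;

  explicit MethodSlot(Logic logic) : logic_(std::move(logic)) {}

  // Stand-in for service_->RequestFoo(...): ready to accept one call.
  void Arm() { armed_ = true; }

  // Simulates a client call arriving. Without an outstanding request the
  // call is not delivered to the completion queue.
  bool Deliver(FakeCompletionQueue& cq, std::string request) {
    if (!armed_) return false;
    armed_ = false;
    pending_ = std::move(request);
    cq.Push(this);
    return true;
  }

  // Runs when this slot's tag comes out of the queue (steps 3 and 4).
  std::string OnTag() {
    assert(!armed_);
    std::string request = std::move(pending_);
    Arm();                   // step 4: re-request before running the logic
    return logic_(request);  // step 3: method-specific logic
  }

 private:
  Logic logic_;
  bool armed_ = false;
  std::string pending_;
};

// The polling loop: every tag is a MethodSlot*, so dispatch is one cast.
std::vector<std::string> Poll(FakeCompletionQueue& cq) {
  std::vector<std::string> replies;
  void* tag = nullptr;
  bool ok = false;
  while (cq.Next(&tag, &ok)) {
    if (ok) replies.push_back(static_cast<MethodSlot*>(tag)->OnTag());
  }
  return replies;
}

std::vector<std::string> RunDispatchDemo() {
  FakeCompletionQueue cq;  // step 1: one completion queue
  MethodSlot foo([](const std::string& r) { return "Foo:" + r; });
  MethodSlot bar([](const std::string& r) { return "Bar:" + r; });
  foo.Arm();
  bar.Arm();

  bar.Deliver(cq, "b1");
  foo.Deliver(cq, "f1");
  std::vector<std::string> replies = Poll(cq);

  foo.Deliver(cq, "f2");  // accepted only because OnTag() re-armed the slot
  std::vector<std::string> more = Poll(cq);
  replies.insert(replies.end(), more.begin(), more.end());
  return replies;
}
```

In this simulation `RunDispatchDemo()` yields the replies in arrival order, one per delivered call. With real gRPC the per-call context, request, and responder would also need to be fresh for every re-request, which the sketch collapses into a single `pending_` string.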

When I implemented this pattern, I had a helper type like the example's
CallData that knew the association between the gRPC method and the "business
logic" to run. Its job was to request a call from gRPC, wait for the call
to come out of the completion queue, and hand off the invocation of the
business logic to a "work item dispatcher" component.

You will need to adjust this for streaming calls, but that should be
mechanical.

It looked something like the following. (I wrote this code in my email
client. I'm sure it doesn't compile, but you should be able to see the
pathways.)

int main(int argc, char** argv) {
  std::shared_ptr<ServerCompletionQueue> cq = SetupCompletionQueue();
  std::shared_ptr<Dispatcher> dispatcher = SetupDispatcher(cq);
  std::unique_ptr<Server> server = SetupGrpcServerViaServerBuilder();

  Greeter::AsyncService service;

  // One registration per RPC method. Each one passes the address of the
  // service and a pointer to the generated Request* member function.
  MethodRegistration<Greeter::AsyncService, HelloRequest, HelloResponse> sayHello1(
    cq,
    &service,
    &Greeter::AsyncService::RequestSayHello1,
    &SomeServiceImpl::HandleSayHello1,
    dispatcher);

  MethodRegistration<Greeter::AsyncService, HelloRequest, HelloResponse> sayHello2(
    cq,
    &service,
    &Greeter::AsyncService::RequestSayHello2,
    &SomeServiceImpl::HandleSayHello2,
    dispatcher);
}

// In our system, everything enqueued to a gRPC completion queue implements
// this interface.
//
// The threads that poll the gRPC completion queue then just do the
// following:
//
// void loop() {
//   void* tag;
//   bool ok;
//
//   while (m_cq->Next(&tag, &ok))
//   {
//     static_cast<CompletionQueueItem*>(tag)->Invoke(ok);
//   }
// }
class CompletionQueueItem
{
public:
  virtual ~CompletionQueueItem() = default;
  virtual void Invoke(bool ok) = 0;
};

template <typename TServiceStub, typename TRequest, typename TResponse>
class MethodRegistration : public CompletionQueueItem
{
public:
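  // A pointer to the member Request* function from the generated stub.
  // Pointer to member function only used as an example. The generated
  // Request* functions return void and take the async responder, not the
  // response message itself.
  using RegistrationFunc = void (TServiceStub::*)(
    ServerContext*,
    TRequest*,
    ServerAsyncResponseWriter<TResponse>*,
    CompletionQueue*,
    ServerCompletionQueue*,
    void*);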

  // Some business logic/user code associated with processing a request.
  // std::function only used as an example; the void return type and the
  // TResponse* out-parameter are assumptions.
  using UserFunc = std::function<void(ServerContext*, TRequest*, TResponse*)>;

  MethodRegistration(
    std::shared_ptr<ServerCompletionQueue> cq,
    TServiceStub* service,
    RegistrationFunc registrationFunc,
    UserFunc userFunc,
    std::shared_ptr<Dispatcher> dispatcher)
  : m_cq(cq),
    m_service(service),
    m_registrationFunc(registrationFunc),
    m_businessLogic(userFunc),
    m_dispatcher(dispatcher) {
    Register();
  }

  void Invoke(bool ok) override {
    if (ok) {
      // EnqueueUnaryCall knows how to invoke the std::function with the
      // values from m_unaryRequestData and send the response back.
      m_dispatcher->EnqueueUnaryCall(
        m_businessLogic,
        std::move(m_unaryRequestData));

      // Re-register right away so the next incoming call can be accepted.
      Register();
    }
  }

private:
  void Register() {
    // Create a new struct for per-request data. Per-request data has a
    // different lifetime than the data needed to register for method calls.
    m_unaryRequestData = std::make_unique<UnaryRequestData<TRequest, TResponse>>();

    // m_service is a pointer, so invoke the member function with ->*.
    (m_service->*m_registrationFunc)(
      &m_unaryRequestData->ctx,
      &m_unaryRequestData->request,
      &m_unaryRequestData->responder,
      m_cq.get(),
      m_cq.get(),
      static_cast<void*>(static_cast<CompletionQueueItem*>(this)));
  }

  std::shared_ptr<ServerCompletionQueue> m_cq;

  TServiceStub* m_service;

  RegistrationFunc m_registrationFunc;

  UserFunc m_businessLogic;

  // Something that can run arbitrary bits of code. E.g., a thread pool. In
  // this example, the dispatcher also knows how to poll completion queues.
  std::shared_ptr<Dispatcher> m_dispatcher;

  std::unique_ptr<UnaryRequestData<TRequest, TResponse>> m_unaryRequestData;
};

// All of the data associated with a single call. This data starts out being
// owned by the MethodRegistration, but it is transferred to the Dispatcher
// when the business logic needs to be run. This way, we can re-request
// another incoming call while still processing earlier calls.
template <typename TRequest, typename TResponse>
struct UnaryRequestData
{
  ServerContext ctx;
  TRequest request;
  ServerAsyncResponseWriter<TResponse> responder;

  UnaryRequestData()
  : ctx(), request(), responder(&ctx)
  {}
};
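
The ownership handoff described in the comment above can be illustrated with the standard library alone. This is a rough sketch under stated assumptions, not the Bond-over-gRPC implementation: `RequestData`, `WorkQueue`, `Registration`, `Fill`, and `RunHandoffDemo` are hypothetical simplifications of `UnaryRequestData`, the `Dispatcher`, and `MethodRegistration`, and no gRPC types are involved.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical per-call data (stands in for UnaryRequestData).
struct RequestData {
  std::string request;
  std::string response;
};

using Logic = std::function<std::string(const std::string&)>;

// Hypothetical dispatcher: takes ownership of per-call data and runs the
// business logic later, independently of the registration object.
class WorkQueue {
 public:
  void EnqueueUnaryCall(Logic logic, std::unique_ptr<RequestData> data) {
    items_.push_back(Item{std::move(logic), std::move(data)});
  }

  std::vector<std::string> RunAll() {
    std::vector<std::string> responses;
    for (Item& item : items_) {
      item.data->response = item.logic(item.data->request);
      responses.push_back(item.data->response);
    }
    items_.clear();  // each call's data is destroyed once it has been served
    return responses;
  }

 private:
  struct Item {
    Logic logic;
    std::unique_ptr<RequestData> data;
  };
  std::vector<Item> items_;
};

// Hypothetical registration: owns the data for the *next* call only.
class Registration {
 public:
  Registration(Logic logic, WorkQueue* queue)
      : logic_(std::move(logic)), queue_(queue) {
    Register();
  }

  // Simulates the transport filling in the request for the pending call.
  void Fill(std::string request) {
    assert(data_ != nullptr);
    data_->request = std::move(request);
  }

  // Simulates the tag coming out of the completion queue.
  void Invoke(bool ok) {
    if (!ok) return;
    queue_->EnqueueUnaryCall(logic_, std::move(data_));  // hand off ownership
    Register();  // fresh data, ready for the next call immediately
  }

 private:
  void Register() { data_ = std::make_unique<RequestData>(); }

  Logic logic_;
  WorkQueue* queue_;
  std::unique_ptr<RequestData> data_;
};

std::vector<std::string> RunHandoffDemo() {
  WorkQueue queue;
  Registration hello(
      [](const std::string& name) { return "Hello " + name; }, &queue);

  hello.Fill("a");
  hello.Invoke(true);
  hello.Fill("b");  // second call accepted before the first one is processed
  hello.Invoke(true);

  return queue.RunAll();
}
```

Because each call's data moves into the queue, the registration never touches a request that is still being processed, which is the point of splitting per-call state from per-method state.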

The aforementioned system where I implemented this is the Bond-over-gRPC
project [1]. It doesn't use Protocol Buffers for its IDL and it presents a
higher-level abstraction, so the interface to the generated code is
different. The fundamental method registration and lifetime management
concepts will be analogous, however. The code for the C++ gRPC stuff lives
in the cpp/inc/bond/ext/grpc directory [2].

Hope this helps.

[1]: https://microsoft.github.io/bond/manual/bond_over_grpc.html#bond-over-grpc-for-c-1
[2]: https://github.com/microsoft/bond/tree/master/cpp/inc/bond/ext/grpc

--
Christopher Warrington
Microsoft Corp.

Joseph Liu

Dec 3, 2020, 10:58:06 AM
to grpc.io
Thank you!