Help implementing a grpc "Conversation" (C++)

44 views
Skip to first unread message

michaell...@gmail.com

unread,
Sep 19, 2018, 1:44:22 PM9/19/18
to grpc.io
I've been trying (and struggling) to create a type of discussion between the Server and Client, where the Client sends the initial request, and in some cases, the Server will require additional information from the Client, before it can complete the overall request. 

Something like...
Client: Hello
Server: Name?
Client: John
Server: Age?
Client: 36
Server: Here's the collated data.

Currently, I'm able to get the Server to respond initially (Name?), but on the subsequent response, it aborts abruptly (Age?). The idea here is that occasionally the server may need more information from the client, which we cannot send across in the original message. Furthermore, while the on the client side is hardcoded, that's not the actual case in the implementation. I'm just trying to simplify it here. 

Any help is definitely appreciated, and let me know if anything is not clear. 

Code below...

PROTO
service HelloConversationSupplierService {
rpc SayHello(stream HelloRequest) returns (stream HelloResponse) {}
}

message HelloRequest {
  MethodName Name = 1;
  string UserName = 2;
  int Age = 3;
}

message HelloResponse {
 MethodName Name = 1;
 PersonData Person = 2;
}

message Person {
 string UserName = 1;
 int Age = 2;
}


enum MethodName {
 Start = 0; 
 GetName = 1;
 GetAge = 2;
 Complete = 3;


CLIENT

PersonRef SayHello()
{
      HelloRequest request;
      request.set_name(MethodName::Start); // only field we set because we don't have any other data 
      std::shared_ptr<grpc::ClientReaderWriter<HelloRequest, HelloResponse>> stream(_sayHelloStub->SayHello(&context));  //_sayHelloStub is created and initalized already at this point
      stream->Write(request);

      while (stream->Read(&response))
      {
        switch (response.name())
        {

          default:
          case MethodName::Complete:
          {
            stream->WritesDone();
            grpc::Status status = stream->Finish();

            if(!status.ok())
              OutputDebugString("ERROR: SayHello rpc Failed");

            PersonRef person;
            person.Name = response.person().username();
            person.Age = response.person().age();

            return person;
          }

          case MethodName::GetName:
          {
            HelloRequest getName;
            getName.set_name(MethodName::GetName);
            getName.set_username("Bill");

            stream->Write(getName);

            break;
          }

          case MethodName::GetAge:
          {
            HelloRequest getAge;
            getAge.set_name(MethodName::GetAge);
            getAge.set_age("36");

            stream->Write(getAge);

            break;
          }
     }
}


SERVER
class ServerImpl final {
 public:
  ~ServerImpl() {
    server_->Shutdown();
    // Always shutdown the completion queue after the server.
    cq_->Shutdown();
  }

  // There is no shutdown handling in this code.
  void Run() {
    std::string server_address("0.0.0.0:50051");

    ServerBuilder builder;
    // Listen on the given address without any authentication mechanism.
    builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
    // Register "service_" as the instance through which we'll communicate with
    // clients. In this case it corresponds to an *asynchronous* service.
    builder.RegisterService(&service_);
    // Get hold of the completion queue used for the asynchronous communication
    // with the gRPC runtime.
    cq_ = builder.AddCompletionQueue();
    // Finally assemble the server.
    server_ = builder.BuildAndStart();
    std::cout << "Server listening on " << server_address << std::endl;

    // Proceed to the server's main loop.
    HandleRpcs();
  }

 private:
  // Class encompasing the state and logic needed to serve a request.
  class CallData : ICallback
 {
   public:
    // Take in the "service" instance (in this case representing an asynchronous
    // server) and the completion queue "cq" used for asynchronous communication
    // with the gRPC runtime.
    CallData(HelloConversationSupplierService::AsyncService* service, ServerCompletionQueue* cq)
        : service_(service), cq_(cq), stream_(&ctx_), status_(CREATE) {
      // Invoke the serving logic right away.
      Proceed();
    }

    void Proceed(ModelManagerPartBrepCreationEngineImpl* brepCreationEngine)
    {
      switch (status_)
      {
        case START:
        {
          service_->RequestSayHello(&ctx_, &stream_, cq_, cq_, this);
          status_ = PROCESS;
          break;
        }
        case PROCESS:
        {
          new CallData(service_, cq_);

          stream_.Read(&request_, this);
          status_ = READ;
          break;
        }
        case READ:
        {
          if(request_.name() == MethodName::Start)
            Start(&response, this);

          stream_.Write(_response, this);

          status_ = WRITE;
          break;
        }
        case WRITE:
        {

          stream_.Finish(Status::OK, this);
          status_ = FINISH;
          break;
        }
        case FINISH:
        {
          delete this;
        }
      }
    }

   void Start(SayHelloResponse response, CallData* calldata)
{
    PersonRef person = engine_->Start(calldata); // I know "engine_ isn't defined elsewhere, but for simplicity, it is a separate process which handles all the heavy work. GRPC is just used to talk back and forth. CallData implements the callbacks the engine may use. (Occasionally, engine won't need anymore data and will return a fully defined PersonRef on its own) 
    response.set_username(person.userName);
    response.set_age(person.Age);
    response.set_name(MethodName::Complete);
}

   private:
    HelloConversationSupplierService::AsyncService* service_;
    ServerCompletionQueue* cq_;
    ServerContext ctx_;

    // What we get from the client.
    HelloRequest request_;
    // What we send back to the client.
    HelloReply reply_;

    // The means to get back to the client.
    ServerAsyncReaderWriter<HelloRequest,HelloReply> stream_;

    // Let's implement a tiny state machine with the following states.
    enum CallStatus { START, PROCESS, READ, WRITE, FINISH };
    CallStatus status_;  // The current serving state.


  // ICallback Methods. These are callback methods which will be called from the Server-side engine. 
   CallData::Name(string* name) 
   {
      _response.Clear();
      _response.set_name(MethodName::GetName); // here, we're telling the client that we need to get the user's name
      _stream.Write(_response, this);

      void* tag;
      bool ok;
      while (true)
      {
        SayHelloRequest request;
        _cq->Next(&tag, &ok);
        _stream.Read(&request, this);
 
        if(request.name() == MethodName::GetName)
 name = request.user_name(); // I can successfully get the hardcoded "BILL" from the client here
break;
}
       } 
  }

  CallData::Age(int* age)
   {
      _response.Clear();
      _response.set_name(MethodName::GetAge);
      _stream.Write(_response, this);

      void* tag;
      bool ok;
      while (true)
      {
        SayHelloRequest request;
        _cq->Next(&tag, &ok);
        _stream.Read(&request, this);  // THIS IS WHERE IT ABORTS 
 
        if(request.name() == MethodName::GetAge)
 name = request.age();
break;
}
       } 
  }
  };

  // This can be run in multiple threads if needed.
  void HandleRpcs() {
    // Spawn a new CallData instance to serve new clients.
    new CallData(&service_, cq_.get());
    void* tag;  // uniquely identifies a request.
    bool ok;
    while (true) {
      // Block waiting to read the next event from the completion queue. The
      // event is uniquely identified by its tag, which in this case is the
      // memory address of a CallData instance.
      // The return value of Next should always be checked. This return value
      // tells us whether there is any kind of event or cq_ is shutting down.
      GPR_ASSERT(cq_->Next(&tag, &ok));
      GPR_ASSERT(ok);
      static_cast<CallData*>(tag)->Proceed();
    }
  }

  std::unique_ptr<ServerCompletionQueue> cq_;
  HelloConversationSupplierService::AsyncService service_;
  std::unique_ptr<Server> server_;
};

Reply all
Reply to author
Forward
0 new messages