Chaining RPC calls not working as intended

16 views
Skip to first unread message

Joseph Liu

unread,
Nov 29, 2020, 2:02:25 PM11/29/20
to grpc.io
I have a gRPC service called `backend` with the following proto, and I'm trying to chain gRPC calls together.

Let's say there are 3 servers: A, B and C. 

B is the primary and A and C are non-primaries.

The idea is that server A calls `ForwardAddUserToPrimary` to server B, which then calls `SendAddUserRequest` to A and C. Once B has received the responses from A and C, it then returns a response back to A.

**backend.proto:**

    syntax = "proto3";
    
    package backend;
    
    import "keyvaluestore.proto";
    
    service Backend {
      rpc ForwardAddUserToPrimary(keyvaluestore.Credentials) returns
      (keyvaluestore.Response) {}
      rpc SendAddUserRequest(keyvaluestore.Credentials) returns
      (keyvaluestore.Response) {}

How I'm currently doing it is by instantiating a client in the server implementation of `ForwardAddUserToPrimary` and then calling `client.SendAddUserRequest`:

     Status BackendServiceImpl::ForwardAddUserToPrimary(ServerContext* ctx, const Credentials* cred, Response* res) {
      log("[S: ForwardAddUserToPrimary] Received by primary", VB);
      int acks = 1;
      for (string addr : cluster_addrs_) {
        if (addr == my_addr_) continue; // Skip myself
        log("[S: ForwardAddUserToPrimary] Propagating to " + addr, VB);
        BackendClient client(
            grpc::CreateChannel(addr,
                                grpc::InsecureChannelCredentials()));
        bool success = client.SendAddUserRequest(cred->user(),
                                                 cred->passwd());
        if (success) {
          acks++;
        }
      }
      if (acks == cluster_addrs_.size()) {
        return Status::OK;
      }
      return Status(StatusCode::FAILED_PRECONDITION, "Not enough votes");
    };

This method works only a small fraction of the time. The rest of the time it returns error code 12 (UNIMPLEMENTED):

    13:17:38.689242 [S: ForwardAddUserToPrimary] Received by primary
    13:17:38.689298 [S: ForwardAddUserToPrimary] Propagating to 127.0.0.1:5000
    13:17:38.690605 [C: SendAddUserRequest] Error 12:

Since each node can be both a server and a client, every node runs the backend server in a separate thread:

**node.cc:**

    void RunBackendServer(const string& my_addr, const vector<string>& cluster_addrs) {
      BackendServiceImpl service(my_addr, cluster_addrs);
      ServerBuilder builder;
      builder.AddListeningPort(my_addr, grpc::InsecureServerCredentials());
      builder.RegisterService(&service);
      unique_ptr<Server> server(builder.BuildAndStart());
      cout << "Backend server listening on " << my_addr << endl;
      server->Wait();
    
      return;
    }
    
    int main(int argc, char** argv) {
      
      int serv_idx = atoi(argv[argc-1]);
      vector<string> cluster_addrs;
      read_config(cluster_addrs, string(argv[argc-2]));
      string my_addr = cluster_addrs[serv_idx];
    
      thread bethread(RunBackendServer, my_addr, cluster_addrs);
      
      // Server 0 forwards to the primary, which is server 1
      if (serv_idx = 0) {
        int dest_idx = (serv_idx + 1) % cluster_addrs.size();
        string dest_addr = cluster_addrs[dest_idx];
        BackendClient client(
            grpc::CreateChannel(dest_addr,
                                grpc::InsecureChannelCredentials()));
    
        bool success = client.ForwardAddUserToPrimary("user", "pass");
      } 
    
      bethread.join();
    
      return 0;
    }

In my test I only have 2 servers in my cluster. What I found weird is that it works some of the time and fails at other times.

Does anyone know what I could be doing wrong here?

-------------------

**backend_client.cc**:

    BackendClient::BackendClient(shared_ptr<Channel> channel) : stub_(Backend::NewStub(channel)) {};
    
    bool BackendClient::ForwardAddUserToPrimary(const string& user, const string& passwd) {
      Credentials cred;
      cred.set_user(user);
      cred.set_passwd(passwd);
    
      ClientContext ctx;
      Response res;
      Status status = stub_->ForwardAddUserToPrimary(&ctx, cred, &res);
      if (status.ok()) {
        log("[C: ForwardAddUserToPrimary] Successfully replicated user: " + user + ", passwd: " + passwd, VB);
        return true;
      } else {
        log("[C: ForwardAddUserToPrimary] Error " + to_string(status.error_code()) + ": " + status.error_message(), VB);
        return false;
      }
    };
    
    bool BackendClient::SendAddUserRequest(const string& user, const string& passwd) {
      Credentials cred;
      cred.set_user(user);
      cred.set_passwd(passwd);
    
      ClientContext ctx;
      Response res;
      Status status = stub_->SendAddUserRequest(&ctx, cred, &res);
      if (status.ok()) {
        log("[C: SendAddUserRequest] Request sent to add user: " + user + ", passwd: " + passwd, VB);
        return true;
      } else {
        log("[C: SendAddUserRequest] Error " + to_string(status.error_code()) + ": " + status.error_message(), VB);
        return false;
      }
    };

**backend_server.cc**:

    BackendServiceImpl::BackendServiceImpl(const string& my_addr, const vector<string>& cluster_addrs) : my_addr_(my_addr), cluster_addrs_(cluster_addrs) {};
    
    Status BackendServiceImpl::ForwardAddUserToPrimary(ServerContext* ctx, const Credentials* cred, Response* res) {
      log("[S: ForwardAddUserToPrimary] Received by primary", VB);
      int acks = 1;
      for (string addr : cluster_addrs_) {
        if (addr == my_addr_) continue; // Skip myself
        log("[S: ForwardAddUserToPrimary] Propagating to " + addr, VB);
        BackendClient client(
            grpc::CreateChannel(addr,
                                grpc::InsecureChannelCredentials()));
        bool success = client.SendAddUserRequest(cred->user(),
                                                 cred->passwd());
        if (success) {
          acks++;
        }
      }
      if (acks == cluster_addrs_.size()) {
        return Status::OK;
      }
      return Status(StatusCode::FAILED_PRECONDITION, "Not enough votes");
    };
    
    Status BackendServiceImpl::SendAddUserRequest(ServerContext* ctx, const Credentials* cred, Response* res) {
      TABLET.insert({ cred->user(), unordered_map<string, char*>() });
      TABLET[cred->user()]["passwd"] = (char*) malloc(cred->passwd().size());
      memcpy(TABLET[cred->user()]["passwd"], cred->passwd().c_str(), cred->passwd().size()+1);
      TABLET[cred->user()]["passwd"] = (char*) malloc(cred->passwd().size());
      memcpy(TABLET[cred->user()]["passwd"], const_cast<char*>(cred->passwd().c_str()), cred->passwd().size()+1);
      log("[S: SendAddUserRequest] Added user: " + cred->user() + ", passwd: " + cred->passwd(), VB);
      return Status::OK;
    };
    
    Status BackendServiceImpl::ForwardPutToPrimary(ServerContext* ctx, const Request* req, Response* res) {
      log("[S: ForwardPutToPrimary] Received by primary", VB);
    
      int acks = 0;
      for (string addr : cluster_addrs_) {
        log("[S: ForwardPutToPrimary] Propagating to " + addr, VB);
        BackendClient client(
            grpc::CreateChannel(addr,
                                grpc::InsecureChannelCredentials()));
        bool success = client.SendPutRequest(req->user(),
                                                req->key(),
                                                const_cast<char*>(req->val().c_str()));
        if (success) {
          acks++;
        }
      }
    
      if (acks == cluster_addrs_.size()) {
        return Status::OK;
      }
      return Status(StatusCode::FAILED_PRECONDITION, "Not enough votes");
    };



Reply all
Reply to author
Forward
0 new messages