I have a gRPC service called `backend` with the following proto and I'm trying to chain together gRPC calls together.
Let's say there are 3 servers: A, B and C.
B is the primary and A and C are non-primaries.
The idea is that server A calls `ForwardAddUserToPrimary` to server B, which then calls `SendAddUserRequest` to A and C. Once B has received the responses from A and C, it then returns a response back to A.
**backend.proto:**
syntax = "proto3";
package backend;
import "keyvaluestore.proto";
service Backend {
rpc ForwardAddUserToPrimary(keyvaluestore.Credentials) returns
(keyvaluestore.Response) {}
rpc SendAddUserRequest(keyvaluestore.Credentials) returns
(keyvaluestore.Response) {}
How I'm currently doing it is by instantiating a client in the server implementation of `ForwardAddUserToPrimary` and then calling `client.SendAddUserRequest`:
Status BackendServiceImpl::ForwardAddUserToPrimary(ServerContext* ctx, const Credentials* cred, Response* res) {
log("[S: ForwardAddUserToPrimary] Received by primary", VB);
int acks = 1;
for (string addr : cluster_addrs_) {
if (addr == my_addr_) continue; // Skip myself
log("[S: ForwardAddUserToPrimary] Propagating to " + addr, VB);
BackendClient client(
grpc::CreateChannel(addr,
grpc::InsecureChannelCredentials()));
bool success = client.SendAddUserRequest(cred->user(),
cred->passwd());
if (success) {
acks++;
}
}
if (acks == cluster_addrs_.size()) {
return Status::OK;
}
return Status(StatusCode::FAILED_PRECONDITION, "Not enough votes");
};
This method works a tiny of the time. The other times it returns an error code 12 (UNIMPLEMENTED):
13:17:38.689242 [S: ForwardAddUserToPrimary] Received by primary
13:17:38.689298 [S: ForwardAddUserToPrimary] Propagating to
127.0.0.1:5000 13:17:38.690605 [C: SendAddUserRequest] Error 12:
Since each node can be both a server and a client, every node runs the backend server in a separate thread:
**node.cc:**
void RunBackendServer(const string& my_addr, const vector<string>& cluster_addrs) {
BackendServiceImpl service(my_addr, cluster_addrs);
ServerBuilder builder;
builder.AddListeningPort(my_addr, grpc::InsecureServerCredentials());
builder.RegisterService(&service);
unique_ptr<Server> server(builder.BuildAndStart());
cout << "Backend server listening on " << my_addr << endl;
server->Wait();
return;
}
int main(int argc, char** argv) {
int serv_idx = atoi(argv[argc-1]);
vector<string> cluster_addrs;
read_config(cluster_addrs, string(argv[argc-2]));
string my_addr = cluster_addrs[serv_idx];
thread bethread(RunBackendServer, my_addr, cluster_addrs);
// Server 0 forwards to the primary, which is server 1
if (serv_idx = 0) {
int dest_idx = (serv_idx + 1) % cluster_addrs.size();
string dest_addr = cluster_addrs[dest_idx];
BackendClient client(
grpc::CreateChannel(dest_addr,
grpc::InsecureChannelCredentials()));
bool success = client.ForwardAddUserToPrimary("user", "pass");
}
bethread.join();
return 0;
}
In my test I only have 2 servers in my cluster. What I found weird was that it would work some of the time and wouldn't work for other times.
Does anyone know what I could be doing wrong here?
-------------------
**backend_client.cc**:
BackendClient::BackendClient(shared_ptr<Channel> channel) : stub_(Backend::NewStub(channel)) {};
bool BackendClient::ForwardAddUserToPrimary(const string& user, const string& passwd) {
Credentials cred;
cred.set_user(user);
cred.set_passwd(passwd);
ClientContext ctx;
Response res;
Status status = stub_->ForwardAddUserToPrimary(&ctx, cred, &res);
if (status.ok()) {
log("[C: ForwardAddUserToPrimary] Successfully replicated user: " + user + ", passwd: " + passwd, VB);
return true;
} else {
log("[C: ForwardAddUserToPrimary] Error " + to_string(status.error_code()) + ": " + status.error_message(), VB);
return false;
}
};
bool BackendClient::SendAddUserRequest(const string& user, const string& passwd) {
Credentials cred;
cred.set_user(user);
cred.set_passwd(passwd);
ClientContext ctx;
Response res;
Status status = stub_->SendAddUserRequest(&ctx, cred, &res);
if (status.ok()) {
log("[C: SendAddUserRequest] Request sent to add user: " + user + ", passwd: " + passwd, VB);
return true;
} else {
log("[C: SendAddUserRequest] Error " + to_string(status.error_code()) + ": " + status.error_message(), VB);
return false;
}
};
**backend_server.cc**:
BackendServiceImpl::BackendServiceImpl(const string& my_addr, const vector<string>& cluster_addrs) : my_addr_(my_addr), cluster_addrs_(cluster_addrs) {};
Status BackendServiceImpl::ForwardAddUserToPrimary(ServerContext* ctx, const Credentials* cred, Response* res) {
log("[S: ForwardAddUserToPrimary] Received by primary", VB);
int acks = 1;
for (string addr : cluster_addrs_) {
if (addr == my_addr_) continue; // Skip myself
log("[S: ForwardAddUserToPrimary] Propagating to " + addr, VB);
BackendClient client(
grpc::CreateChannel(addr,
grpc::InsecureChannelCredentials()));
bool success = client.SendAddUserRequest(cred->user(),
cred->passwd());
if (success) {
acks++;
}
}
if (acks == cluster_addrs_.size()) {
return Status::OK;
}
return Status(StatusCode::FAILED_PRECONDITION, "Not enough votes");
};
Status BackendServiceImpl::SendAddUserRequest(ServerContext* ctx, const Credentials* cred, Response* res) {
TABLET.insert({ cred->user(), unordered_map<string, char*>() });
TABLET[cred->user()]["passwd"] = (char*) malloc(cred->passwd().size());
memcpy(TABLET[cred->user()]["passwd"], cred->passwd().c_str(), cred->passwd().size()+1);
TABLET[cred->user()]["passwd"] = (char*) malloc(cred->passwd().size());
memcpy(TABLET[cred->user()]["passwd"], const_cast<char*>(cred->passwd().c_str()), cred->passwd().size()+1);
log("[S: SendAddUserRequest] Added user: " + cred->user() + ", passwd: " + cred->passwd(), VB);
return Status::OK;
};
Status BackendServiceImpl::ForwardPutToPrimary(ServerContext* ctx, const Request* req, Response* res) {
log("[S: ForwardPutToPrimary] Received by primary", VB);
int acks = 0;
for (string addr : cluster_addrs_) {
log("[S: ForwardPutToPrimary] Propagating to " + addr, VB);
BackendClient client(
grpc::CreateChannel(addr,
grpc::InsecureChannelCredentials()));
bool success = client.SendPutRequest(req->user(),
req->key(),
const_cast<char*>(req->val().c_str()));
if (success) {
acks++;
}
}
if (acks == cluster_addrs_.size()) {
return Status::OK;
}
return Status(StatusCode::FAILED_PRECONDITION, "Not enough votes");
};