This happens randomly. From my observation, the "not expected" case is less likely (about 1 in 10 tries) when the client is pinging localhost, and more likely when it is pinging a remote host.
1) What is the first event in the cq? Judging from the tag address, it belongs to the same AsyncClientCall object, but shouldn't there be only two events: 1) ok=0 (RPC failure) and 2) ok=1 as the ack of Finish()?
2) Is the cq the middleman between server and client, i.e. is every client-server communication message registered in the cq? Is there a race condition here between the server's response and the client's actions (sending the request, Read, Finish, etc.)?
#include <cstdlib>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <grpc++/grpc++.h>
#include "helloworld.grpc.pb.h"
using grpc::Channel;
using grpc::ChannelArguments;
using grpc::ClientContext;
using grpc::Status;
using grpc::ClientAsyncReaderInterface;
using grpc::CompletionQueue;
using helloworld::HelloRequest;
using helloworld::HelloReply;
using helloworld::Greeter;
// Asynchronous client for the server-streaming Greeter::SayHello RPC.
//
// The constructor creates the channel/stub and starts a listener thread that
// drains the completion queue. Every queue event carries the address of an
// AsyncClientCall as its tag; the listener feeds the event to that call's
// state machine. When a call ends with a non-OK status, the listener sends at
// most one e-mail alert per outage, frees the call and issues a new one.
class GreeterClient {
public:
    GreeterClient() : email_alert(true)
    {
        std::cout << "ctor of GreeterClient" << std::endl;
        // Customize backoff arguments: initial, min and max are all set to
        // the same value.
        const int backoff_ms = 5000;
        grpc::ChannelArguments ch_args;
        ch_args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, backoff_ms);
        ch_args.SetInt(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, backoff_ms);
        ch_args.SetInt(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS, backoff_ms);
        // Create a custom channel and attach it to the stub.
        stub_ = Greeter::NewStub(grpc::CreateCustomChannel("localhost:50051", grpc::InsecureChannelCredentials(), ch_args));
        // stub_ = Greeter::NewStub(grpc::CreateCustomChannel("REMOTE_HOST:50051", grpc::InsecureChannelCredentials(), ch_args));
        // Spawn the thread that listens to the completion queue. It is
        // started last so that stub_ is already set when it runs.
        thread_listener_ = std::thread(&GreeterClient::AsyncCompleteRpc, this);
    }

    // The listener thread holds a pointer to this object, so copying it
    // would be meaningless; make that explicit.
    GreeterClient(const GreeterClient&) = delete;
    GreeterClient& operator=(const GreeterClient&) = delete;

    // Waits for the listener thread. Nothing in this class shuts the
    // completion queue down, so the join only returns if cq_.Next() returns
    // false for some other reason.
    ~GreeterClient(void)
    {
        std::cout << "dtor of GreeterClient" << std::endl;
        if (thread_listener_.joinable()) thread_listener_.join();
        std::cout << "thread joined" << std::endl;
    }

    // Assembles the client's payload and starts a new streaming RPC.
    // The call object is heap-allocated and handed to the completion queue
    // as the tag; the listener loop deletes it once the call is over.
    void SayHello(void)
    {
        HelloRequest request;
        // Data we are sending to the server.
        request.set_name("World");
        // Call object to store rpc data
        AsyncClientCall* call = new AsyncClientCall;
        std::cout << "Created new AsyncClientCall at: " << call << std::endl;
        // Prepare first, start second. With AsyncSayHello() the RPC is
        // already running when the returned reader is assigned, so the
        // listener thread could receive the tag and use response_reader
        // while it is still null. StartCall() is only invoked once the
        // reader has been stored.
        // NOTE(review): requires a gRPC release whose generated stubs
        // provide PrepareAsync* methods - confirm against the build.
        call->response_reader = stub_->PrepareAsyncSayHello(&call->context, request, &cq_);
        call->response_reader->StartCall(static_cast<void*>(call));
        std::cout << "## SAYING hello!!" << std::endl;
    }

    // Loop while listening for completion-queue events.
    // Runs on the listener thread and prints progress for every event.
    void AsyncCompleteRpc()
    {
        void* got_tag = nullptr;
        bool ok = false;
        std::cout << "----@@ ASYNCComplete on separate thread!!" << std::endl;
        // Block until the next result is available in the completion queue "cq".
        while (cq_.Next(&got_tag, &ok))
        {
            // The tag in this example is the memory location of the call object
            AsyncClientCall* ac = static_cast<AsyncClientCall*>(got_tag);
            std::cout << "Tag received: " << ac << ", Completion queue/Next() returned: " << ok << std::endl;
            // "ok" refers to whichever operation was tagged last on this
            // call (start, Read or Finish); the call's state machine knows
            // which one that was.
            if (!ac->HandleResponse(ok, email_alert))
            {
                // The RPC finished with an error: alert once per outage.
                // email_alert is re-armed by HandleResponse when a later
                // call starts successfully.
                if (email_alert)
                {
                    const std::string cmd { "echo | mail -s \"Test HelloWorld service is down.\" EM...@XXX.com" };
                    if (std::system(cmd.c_str()) != 0)
                    {
                        std::cerr << "Failed to send alert e-mail" << std::endl;
                    }
                    email_alert = false;
                }
                delete ac;
                std::cout << "Reconnecting ..." << std::endl;
                SayHello();
            }
            else if (ac->IsDone())
            {
                // The stream finished with an OK status: no further events
                // will carry this tag, so release the call instead of
                // leaking it.
                delete ac;
            }
        }
    }

private:
    // True while an alert e-mail may still be sent for the current outage.
    // Only touched by the listener thread after construction.
    bool email_alert;
    std::thread thread_listener_;

    // struct for keeping state and data information of one RPC
    struct AsyncClientCall
    {
        // CREATE:  call started, waiting for the start event.
        // PROCESS: a Read() is outstanding.
        // FINISH:  Finish() is outstanding.
        // DONE:    Finish() completed with an OK status; nothing outstanding.
        enum CallStatus {CREATE, PROCESS, FINISH, DONE};
        CallStatus callStatus_;
        Status status;
        HelloReply reply;
        ClientContext context;
        std::unique_ptr<ClientAsyncReaderInterface<HelloReply>> response_reader;

        AsyncClientCall(): callStatus_(CREATE) {}

        ~AsyncClientCall()
        {
            std::cout << "*********dtor AsyncClientCall" << std::endl;
        }

        // True once the call has completed successfully and may be deleted.
        bool IsDone() const { return callStatus_ == DONE; }

        // Advances the state machine for one completion-queue event.
        //   responseStatus: the "ok" flag delivered with the event.
        //   email_alert:    set to true when the call starts successfully.
        // Returns false only when Finish() reported a non-OK status, i.e.
        // the RPC failed and the caller should discard this call.
        bool HandleResponse(bool responseStatus, bool& email_alert)
        {
            switch (callStatus_)
            {
            case CREATE:
                if (responseStatus) {
                    std::cout << "HandleEachResponse_CREATE_GOOD" << std::endl;
                    response_reader->Read(&reply, static_cast<void*>(this));
                    callStatus_ = PROCESS;
                    email_alert = true;
                } else {
                    // Start failed: ask for the final status.
                    std::cout << "HandleEachResponse_CREATE_BAD" << std::endl;
                    response_reader->Finish(&status, static_cast<void*>(this));
                    callStatus_ = FINISH;
                }
                break;
            case PROCESS:
                if (responseStatus) {
                    std::cout << "HandleEachResponse_PROCESS_GOOD" << std::endl;
                    std::cout << "PROCESS_GOOD: Greeter received: " << this << " : " << reply.message() << std::endl;
                    // Queue the next read of the stream.
                    response_reader->Read(&reply, static_cast<void*>(this));
                } else {
                    // Read failed: ask for the final status.
                    std::cout << "HandleEachResponse_PROCESS_BAD" << std::endl;
                    response_reader->Finish(&status, static_cast<void*>(this));
                    callStatus_ = FINISH;
                }
                break;
            case FINISH:
                if (status.ok()) {
                    std::cout << "HandleEachResponse_FINISH_GOOD" << std::endl;
                    std::cout << "Server Response Completed: " << this << " CallData: " << this << std::endl;
                    callStatus_ = DONE;
                }
                else {
                    std::cout << "HandleEachResponse_FINISH_BAD" << std::endl;
                    std::cout << "RPC failed" << std::endl;
                    return false;
                }
                break;
            default:
                break;
            }
            return true;
        }
    };

    std::unique_ptr<Greeter::Stub> stub_;
    CompletionQueue cq_;
};
int main(int argc, char** argv) {
GreeterClient greeter;
greeter.SayHello();
std::cout << "Press control-c to quit" << std::endl << std::endl;
return 0;
}