Random completion queue "ok" value when an asynchronous streaming RPC client pings a dead service

peng...@gmail.com

Feb 7, 2018, 4:16:53 PM
to grpc.io

I started learning gRPC very recently and followed the docs, instructions, and online discussions to build a toy asynchronous streaming RPC client (the .proto and client.cc are attached below). My gRPC version is 1.8.4.

My main idea for this client is to create a "listener" in a separate thread that receives the streaming response from the server, so that once a request is sent, my later-extended program can proceed in the main thread without being blocked. If the server is not accessible (either when the client starts or when the connection drops in the middle), an alert email is sent. I don't want this to over-alert: it should send one email each time the RPC connection drops. The client then tries to reconnect every 5 seconds.
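The listener-thread design described here is a single consumer thread draining an event queue. The sketch below is a toy model only: `ToyQueue`, `DrainAll` and `RunListenerDemo` are hypothetical names, and this is not gRPC's `CompletionQueue` implementation. It captures the two properties the client relies on: `Next()` blocks until an event is delivered, and it returns false only after `Shutdown()` has been called and all pending events have been drained, which is the only way the listener loop can end.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical toy model of a completion queue's consumer side.
class ToyQueue {
public:
    // Producer side: record that the operation identified by `tag` finished.
    void Push(void* tag, bool ok) {
        {
            std::lock_guard<std::mutex> lock(mu_);
            events_.emplace_back(tag, ok);
        }
        cv_.notify_one();
    }

    // Mark the queue as shut down; Next() still hands out queued events
    // and returns false once none are left.
    void Shutdown() {
        {
            std::lock_guard<std::mutex> lock(mu_);
            shutdown_ = true;
        }
        cv_.notify_all();
    }

    // Consumer side: block until an event arrives or the queue is shut down and empty.
    bool Next(void** tag, bool* ok) {
        std::unique_lock<std::mutex> lock(mu_);
        cv_.wait(lock, [this] { return !events_.empty() || shutdown_; });
        if (events_.empty()) return false;  // shut down and fully drained
        *tag = events_.front().first;
        *ok = events_.front().second;
        events_.pop_front();
        return true;
    }

private:
    std::mutex mu_;
    std::condition_variable cv_;
    std::deque<std::pair<void*, bool>> events_;
    bool shutdown_ = false;
};

// A listener loop in the style of AsyncCompleteRpc(); returns the number of events seen.
int DrainAll(ToyQueue& q) {
    void* tag = nullptr;
    bool ok = false;
    int seen = 0;
    while (q.Next(&tag, &ok)) ++seen;
    return seen;
}

// Start a listener thread, deliver two events, then shut down before joining.
// Without the Shutdown() call, join() would wait forever.
int RunListenerDemo() {
    ToyQueue q;
    int call = 0;  // stands in for an AsyncClientCall; only its address is used
    int seen = 0;
    std::thread listener([&] { seen = DrainAll(q); });
    q.Push(&call, false);  // e.g. a call-start event with ok == false
    q.Push(&call, true);   // e.g. the Finish() event with ok == true
    q.Shutdown();
    listener.join();
    return seen;
}
```

Keeping all event handling on one consumer thread also means state such as `email_alert` is only touched by that thread, so it needs no locking.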

Now, I have a "state machine" in HandleResponse() to handle the events. As a basic test, even without a server, I should be able to start my client, get an RPC failure, send an email, and try to reconnect.
The following output (Expected) confirms this. If my understanding is correct, the first cq event, with ok = 0, is the RPC failure, while the following event, with ok = 1, acknowledges the Finish() request issued in the "case FINISH:" block of HandleResponse(). Correct?

However, sometimes the output (Not Expected) has an additional event at the beginning (the first "Tag received" line followed by HandleEachResponse_CREATE_GOOD). Next() returns ok = 1 for it, which makes the state machine consider the call "connected" and turn the email alert back on. When this happens, multiple emails are sent, which is not what I expect: a second email should only be sent after the client connects and the connection then drops.
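The extra emails follow directly from the code in client.cc. Below is a self-contained replay of the HandleResponse() state machine and the email logic in AsyncCompleteRpc(), driven by the sequence of `ok` values seen in each log. `Replay`, `ReplayCall`, `CountEmails` and the `finish_status_ok` flag are hypothetical names for this sketch; `status.ok()` is assumed false because no server is reachable.

```cpp
#include <string>
#include <vector>

// Result of replaying one AsyncClientCall (hypothetical helper type).
struct Replay {
    std::vector<std::string> labels;  // HandleEachResponse_* suffixes, in order
    bool call_failed = false;          // HandleResponse() returned false
};

// Mirrors HandleResponse(): `oks` are the values Next() returned for one call,
// `finish_status_ok` stands in for status.ok() after Finish().
Replay ReplayCall(const std::vector<bool>& oks, bool finish_status_ok,
                  bool& email_alert) {
    enum CallStatus { CREATE, PROCESS, FINISH };
    CallStatus state = CREATE;
    Replay r;
    for (bool ok : oks) {
        switch (state) {
        case CREATE:
            if (ok) {
                r.labels.push_back("CREATE_GOOD");
                state = PROCESS;
                email_alert = true;  // the call start alone re-arms the alert
            } else {
                r.labels.push_back("CREATE_BAD");
                state = FINISH;
            }
            break;
        case PROCESS:
            if (ok) {
                r.labels.push_back("PROCESS_GOOD");
            } else {
                r.labels.push_back("PROCESS_BAD");
                state = FINISH;
            }
            break;
        case FINISH:  // like the original, this branch ignores `ok` and checks the status
            if (finish_status_ok) {
                r.labels.push_back("FINISH_GOOD");
            } else {
                r.labels.push_back("FINISH_BAD");
                r.call_failed = true;
            }
            return r;
        }
    }
    return r;
}

// Counts emails over `attempts` reconnects that all produce the same trace,
// mimicking the `if (email_alert) { ...; email_alert = false; }` block.
int CountEmails(const std::vector<bool>& oks, int attempts) {
    bool email_alert = true;  // GreeterClient() initializes it to true
    int emails = 0;
    for (int i = 0; i < attempts; ++i) {
        Replay r = ReplayCall(oks, /*finish_status_ok=*/false, email_alert);
        if (r.call_failed && email_alert) {
            ++emails;
            email_alert = false;
        }
    }
    return emails;
}
```

With the "Expected" trace `{false, true}`, three reconnect attempts produce one email. With the "Not Expected" trace `{true, false, true}`, every attempt passes through CREATE_GOOD, re-arms `email_alert`, and sends another email, so three attempts produce three.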

This happens randomly. From my observation, it is less likely (about 1 in 10 tries) to hit the "Not Expected" case when the client is pinging localhost, and more likely when pinging a remote host.

My questions:
1) What is the first event in the cq? From the tag address it belongs to the same AsyncClientCall object, but shouldn't there be only two events: 1) ok=0 (RPC failure) and 2) ok=1 as the acknowledgment of Finish()?
2) Is the cq a middleman between server and client, through which all client-server communication events are registered? Is there a race condition here between the server's response and the client's actions (sending the request, Read, Finish, etc.)?

Thanks in advance!


Expected
$ ./hw_cli
ctor of GreeterClient
----@@ ASYNCComplete on separate thread!!
Created new AsyncClientCall at: 0x1774440
## SAYING hello!!
Press control-c to quit

dtor of GreeterClient
Tag received: 0x1774440, Completion queue/Next() returned: 0
HandleEachResponse_CREATE_BAD
Tag received: 0x1774440, Completion queue/Next() returned: 1
HandleEachResponse_FINISH_BAD
RPC failed
*********dtor AsyncClientCall
Reconnecting ...

Not Expected
./hw_cli
ctor of GreeterClient
----@@ ASYNCComplete on separate thread!!
Created new AsyncClientCall at: 0x2419440
## SAYING hello!!
Press control-c to quit

dtor of GreeterClient
Tag received: 0x2419440, Completion queue/Next() returned: 1
HandleEachResponse_CREATE_GOOD
Tag received: 0x2419440, Completion queue/Next() returned: 0
HandleEachResponse_PROCESS_BAD
Tag received: 0x2419440, Completion queue/Next() returned: 1
HandleEachResponse_FINISH_BAD
RPC failed
*********dtor AsyncClientCall
Reconnecting ...


helloworld.proto

syntax = "proto3";
package helloworld;

// The greeting service definition.
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (stream HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}

client.cc
#include <cstdlib>   // std::system
#include <iostream>
#include <memory>
#include <string>
#include <thread>

#include <grpc++/grpc++.h>
#include "helloworld.grpc.pb.h"

using grpc::Channel;
using grpc::ChannelArguments;
using grpc::ClientContext;
using grpc::Status;
using grpc::ClientAsyncReaderInterface;
using grpc::CompletionQueue;

using helloworld::HelloRequest;
using helloworld::HelloReply;
using helloworld::Greeter;

class GreeterClient {
public:
    GreeterClient() : email_alert(true)
    {
        std::cout << "ctor of GreeterClient" << std::endl;

        // Customize backoff arguments.
        int backoff_ms = 5000;
        grpc::ChannelArguments ch_args;
        ch_args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, backoff_ms);
        ch_args.SetInt(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS,     backoff_ms);
        ch_args.SetInt(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS,     backoff_ms);

        // Create a custom channel and attach it to the stub.
        stub_ = Greeter::NewStub(grpc::CreateCustomChannel("localhost:50051", grpc::InsecureChannelCredentials(), ch_args));
        // stub_ = Greeter::NewStub(grpc::CreateCustomChannel("REMOTE_HOST:50051", grpc::InsecureChannelCredentials(), ch_args));

        // Spawn a thread listening to the stream asynchronously.
        thread_listener_ = std::thread(&GreeterClient::AsyncCompleteRpc, this);
    }

    ~GreeterClient(void)
    {
        std::cout << "dtor of GreeterClient" << std::endl;
        // cq_.Shutdown() is never called, so cq_.Next() never returns false and
        // this join() blocks until the process is interrupted (e.g. Ctrl-C).
        if (thread_listener_.joinable()) thread_listener_.join();
        std::cout << "thread joined" << std::endl;
    }

    // Assembles the client's payload and sends it to the server.
    void SayHello(void)
    {
        // Data we are sending to the server.
        HelloRequest request;
        request.set_name("World");

        // Call object to store RPC data; AsyncCompleteRpc() deletes it when the call fails.
        AsyncClientCall* call = new AsyncClientCall;
        std::cout << "Created new AsyncClientCall at: " << call << std::endl;

        // Start the server-streaming call; its start event arrives on cq_ tagged with `call`.
        call->response_reader = stub_->AsyncSayHello(&call->context, request, &cq_, (void*)call);
        std::cout << "## SAYING hello!!" << std::endl;
    }

    // Loop while listening for completed responses.
    // Prints out the response from the server.
    void AsyncCompleteRpc()
    {
        void* got_tag;
        bool ok = false;
        std::cout << "----@@ ASYNCComplete on separate thread!!" << std::endl;

        // Block until the next result is available in the completion queue "cq".
        while (cq_.Next(&got_tag, &ok))
        {
            // The tag in this example is the memory location of the call object.
            AsyncClientCall* ac = static_cast<AsyncClientCall*>(got_tag);
            std::cout << "Tag received: " << ac << ", Completion queue/Next() returned: " << ok << std::endl;

            // "ok" reports the outcome of whichever operation this tag was
            // attached to: the call start, a Read(), or Finish().
            if (!ac->HandleResponse(ok, email_alert))
            {
                if (email_alert)
                {
                    std::string cmd { "echo | mail -s \"Test HelloWorld service is down.\" EM...@XXX.com" };
                    std::system(cmd.c_str());
                    email_alert = false;
                }
                delete ac;
                std::cout << "Reconnecting ..." << std::endl;
                SayHello();
            }
        }
    }

private:
    bool email_alert;
    std::thread thread_listener_;

    // Struct for keeping state and data information.
    struct AsyncClientCall
    {
        enum CallStatus {CREATE, PROCESS, FINISH};
        CallStatus callStatus_;

        Status status;
        HelloReply reply;
        ClientContext context;
        std::unique_ptr<ClientAsyncReaderInterface<HelloReply>> response_reader;

        AsyncClientCall(): callStatus_(CREATE) {}

        ~AsyncClientCall()
        {
            std::cout << "*********dtor AsyncClientCall" << std::endl;
        }

        bool HandleResponse(bool responseStatus, bool& email_alert)
        {
            switch (callStatus_)
            {
            case CREATE:
                if (responseStatus) {
                    std::cout << "HandleEachResponse_CREATE_GOOD" << std::endl;
                    response_reader->Read(&reply, (void*)this);
                    callStatus_ = PROCESS;
                    email_alert = true;
                } else {
                    std::cout << "HandleEachResponse_CREATE_BAD" << std::endl;
                    response_reader->Finish(&status, (void*)this);
                    callStatus_ = FINISH;
                }
                break;
            case PROCESS:
                if (responseStatus) {
                    std::cout << "HandleEachResponse_PROCESS_GOOD" << std::endl;
                    std::cout << "PROCESS_GOOD: Greeter received: " << this << " : " << reply.message() << std::endl;
                    response_reader->Read(&reply, (void*)this);
                } else {
                    std::cout << "HandleEachResponse_PROCESS_BAD" << std::endl;
                    response_reader->Finish(&status, (void*)this);
                    callStatus_ = FINISH;
                }
                break;
            case FINISH:
                if (status.ok()) {
                    std::cout << "HandleEachResponse_FINISH_GOOD" << std::endl;
                    std::cout << "Server Response Completed: " << this << " CallData: " << this << std::endl;
                }
                else {
                    std::cout << "HandleEachResponse_FINISH_BAD" << std::endl;
                    std::cout << "RPC failed" << std::endl;
                    return false;
                }
                break;
            default:
                break;
            }
            return true;
        }
    };

    std::unique_ptr<Greeter::Stub> stub_;

    CompletionQueue cq_;
};

int main(int argc, char** argv) {
    GreeterClient greeter;
    greeter.SayHello();

    std::cout << "Press control-c to quit" << std::endl << std::endl;

    return 0;
}

Vijay Pai

Feb 7, 2018, 4:26:21 PM
to peng...@gmail.com, grpc.io
Hi there,

Finish will always give an ok of 1 since it's always ok to be done with your RPC and check its status.

When starting an RPC, ok will be 0 only if gRPC is sure that the RPC will never actually work (see the comments for more detail): an ok of 1 indicates that gRPC is going to send this RPC initiation out on the wire. It would be 0 if the channel is permanently broken or the fail-fast option is set; the call could still end up failing even if ok was 1 at initiation time.

Hope that helps!

- Vijay
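Applying these semantics to the alerting problem in the question: an ok of 1 on the call-start event only means the RPC is being sent, so it is not evidence that the server is up. The sketch below, with hypothetical names `Event` and `AlertPolicy`, re-arms the alert only after a Read() completes with ok = 1 (a message actually arrived) and sends an email when Finish() reports a non-OK status.

```cpp
// Hypothetical event kinds for one streaming call.
enum class Event { kStart, kRead, kFinish };

// Hypothetical alert policy: at most one email per outage.
struct AlertPolicy {
    bool armed = true;  // true: the next failure should send an email
    int emails = 0;     // number of emails that would have been sent

    // Feed one completion-queue event for the current call.
    // `status_ok` is only meaningful for kFinish (it mirrors status.ok()).
    void OnEvent(Event e, bool ok, bool status_ok = false) {
        if (e == Event::kRead && ok) {
            armed = true;  // the server delivered a message, so it was reachable
        } else if (e == Event::kFinish && !status_ok && armed) {
            ++emails;      // the call ended with an error status
            armed = false;
        }
        // kStart is deliberately ignored: ok == true there does not mean connected.
    }
};
```

In HandleResponse() terms, this amounts to moving `email_alert = true;` from the CREATE branch to the PROCESS branch where a Read() succeeds. The trade-off is that a drop occurring after the stream opens but before the first message is not reported as a new outage; querying the channel's connectivity state (for example with `GetState()`) is another option not shown here.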

Yanpeng Wu

Feb 7, 2018, 6:20:34 PM
to grpc.io
Hi Vijay,

Thank you very much for the reply!

1. "Finish will always give an ok of 1": I think this is clear and makes perfect sense. It corresponds to the last "ok" printed in my log:
Tag received: 0x2419440, Completion queue/Next() returned: 1
HandleEachResponse_FINISH_BAD

2. I read the "ok" documentation for "Client-Side StartCall/RPC invocation", but I am still confused by these uncertain outcomes. Given the same setup, how does gRPC decide either that the RPC is never going to work and simply return ok=0, or to send the RPC initiation out on the wire (returning ok=1 first) followed by a failure (then ok=0)? From my observation, this decision (if there is one) looks random, weighted toward the first outcome. Is this by design, or is there a physical cause such as network fluctuation?

Thanks again!