Advice on Async C++ streaming server shutdown

pret...@googlemail.com

May 31, 2018, 5:18:41 PM
to grpc.io

Hi

Apologies, as this is a bit long. I would like to know the best practice for shutting down a bidirectional streaming RPC server in C++, or what other people have done.
I have a server which starts a thread to process completion queue events. In my destructor I want to stop that thread.
Following the Async C++ example, I call Shutdown() on the server and on the completion queue in my destructor, and then join the thread.
The server processes bidirectional streaming RPC calls from clients. The code below is reduced to only the logic relating to my question. Because gRPC asserts in grpc_cq_begin_op if more work is enqueued while the queue is shutting down, I introduced an atomic called "moribund" to signal shutdown and stop further work from being queued. However, it is not sufficient:

// this mostly works but sometimes asserts; see the comment before ce->process() at the end of MyServer::serve()
MyServer::~MyServer()
{
    moribund.store(true, std::memory_order_relaxed);
    server->Shutdown(); // must be done first
    server_completion_queue->Shutdown();
    if (runner.joinable())
        runner.join();
}

void MyServer::serve()  // thread runs this method
{
    create_streaming_rpc(); // constructs first streaming RPC
    void* tag;
    bool ok;
    while (server_completion_queue->Next(&tag, &ok))
    {
        CompletionEvent* ce = static_cast<CompletionEvent*>(tag);
        if (!ok)
        {
            // disconnection or termination, erase instance
            delete ce;
        }
        else if (!moribund.load(std::memory_order_relaxed))
        {
            // this is not correct, destructor can set moribund and call Shutdown()
            // functions at this exact point before we call ce->process()
            // which might add to completion queue again resulting in grpc assert
            ce->process();
        }
    }
}
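
One way to close the race described in the comment above is to make the "not shutting down" check and the enqueue a single critical section that Shutdown() also has to enter. The sketch below models this with a toy queue rather than gRPC: ToyQueue and GuardedQueue are hypothetical names, ToyQueue's assert only imitates the grpc_cq_begin_op assertion, and in a real server GuardedQueue::Shutdown() would call server->Shutdown() before server_completion_queue->Shutdown(). It is a model of the locking idea, not gRPC code.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>

// Hypothetical stand-in for grpc::ServerCompletionQueue. Pushing work after
// Shutdown() is treated as a bug, imitating the grpc_cq_begin_op assertion.
class ToyQueue {
 public:
  void Push(int tag) {
    std::lock_guard<std::mutex> lock(mu_);
    assert(!shutdown_ && "work enqueued after Shutdown()");
    items_.push_back(tag);
    cv_.notify_one();
  }

  void Shutdown() {
    std::lock_guard<std::mutex> lock(mu_);
    shutdown_ = true;
    cv_.notify_all();
  }

  // Like CompletionQueue::Next(): blocks, and returns false only once the
  // queue has been shut down and fully drained.
  bool Next(int* tag) {
    std::unique_lock<std::mutex> lock(mu_);
    cv_.wait(lock, [this] { return shutdown_ || !items_.empty(); });
    if (items_.empty()) return false;
    *tag = items_.front();
    items_.pop_front();
    return true;
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<int> items_;
  bool shutdown_ = false;
};

// Hypothetical guard: the "are we shutting down?" check and the enqueue run
// under one mutex, and Shutdown() takes the same mutex, so Shutdown() can no
// longer slip in between the check and the work queued by ce->process().
class GuardedQueue {
 public:
  // Returns false (and enqueues nothing) once shutdown has begun.
  bool TryStartOp(int tag) {
    std::lock_guard<std::mutex> lock(mu_);
    if (shutting_down_) return false;
    queue_.Push(tag);
    return true;
  }

  void Shutdown() {
    std::lock_guard<std::mutex> lock(mu_);
    shutting_down_ = true;
    queue_.Shutdown();
  }

  // Consuming does not take mu_, so a blocked Next() never stalls Shutdown().
  bool Next(int* tag) { return queue_.Next(tag); }

 private:
  std::mutex mu_;
  bool shutting_down_ = false;
  ToyQueue queue_;
};
```

In a real server every call inside process() that starts a new operation (reads, writes, finishes, and the request for the next client) would have to go through such a guard, which is the main cost of this approach compared with letting the serving thread call Shutdown() itself.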


I then reworked it as follows: I replaced the atomic with an automatic (local) variable, and moved the completion queue Shutdown() call into the serving thread to avoid the race above. This works; however, I can no longer tell a client disconnecting apart from the server shutting down.

MyServer::~MyServer()
{
    server->Shutdown(); // must be done first
    // there is always one streaming RPC instance created to accept the next client's connection;
    // after the server's Shutdown(), the completion queue's Next() returns that instance with ok = false
    if (runner.joinable())
        runner.join();
}

void MyServer::serve()  // thread runs this method
{
    create_streaming_rpc(); // constructs first streaming RPC
    bool moribund = false;
    void* tag;
    bool ok;
    while (server_completion_queue->Next(&tag, &ok))
    {
        CompletionEvent* ce = static_cast<CompletionEvent*>(tag);
        if (!ok)
        {
            // disconnection or termination, erase instance
            delete ce;
            moribund = true; // really? could be client disconnection and not server shutdown
            server_completion_queue->Shutdown();
        }
        else if (!moribund)
        {
            ce->process();
        }
    }
}

So, finally, question 1: how can I tell the difference between server shutdown and client disconnection when server_completion_queue->Next() returns ok == false?
Reading https://github.com/grpc/grpc/issues/12300, my only guess is to compare the "ce" address with that of the last streaming RPC added/created, and to assume a shutdown when they match and ok is false.
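
A variation on the address-comparison idea in question 1 is to let each instance remember whether it is still waiting for a client, in the style of the CallData state machine in gRPC's async C++ examples. The sketch rests on an assumption worth checking against the gRPC documentation: that the instance waiting for a new client only completes with ok == false when the server is shutting down. CallState, Failure, Summary, ClassifyFailure and Drain are hypothetical names, and no gRPC calls are made.

```cpp
#include <cassert>
#include <utility>
#include <vector>

// Hypothetical per-RPC states; process() would move an instance from
// AwaitingClient to Streaming once a client has connected.
enum class CallState { AwaitingClient, Streaming };

// Hypothetical stand-in for the post's CompletionEvent.
struct CompletionEvent {
  CallState state = CallState::AwaitingClient;
};

enum class Failure { ServerShutdown, CallEnded };

// Interprets ok == false from Next(). Assumption: the one instance still
// waiting for a new client only fails because the server is shutting down,
// while a failure on an accepted stream means that particular call is over.
Failure ClassifyFailure(const CompletionEvent& ce) {
  return ce.state == CallState::AwaitingClient ? Failure::ServerShutdown
                                               : Failure::CallEnded;
}

struct Summary {
  int calls_ended = 0;
  bool shutdown_seen = false;
};

// Walks (tag, ok) pairs the way the serve() loop would receive them.
Summary Drain(const std::vector<std::pair<CompletionEvent*, bool>>& events) {
  Summary s;
  for (const auto& event : events) {
    if (event.second) continue;  // ok == true: would call ce->process()
    if (ClassifyFailure(*event.first) == Failure::ServerShutdown)
      s.shutdown_seen = true;
    else
      ++s.calls_ended;
  }
  return s;
}
```

CallEnded is deliberately named for "this call is finished" rather than "the client disconnected", since an accepted stream can also end for reasons other than the client leaving.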


My final attempt uses (or rather abuses) an Alarm. It seems to work, but unfortunately it fails to link (missing symbol 'grpc_shutdown' from libgrpc.so, which is not in the pkg-config for grpc++) unless libgrpc.so is also linked. With that, however, it seems to work robustly (so far :-) ):

MyServer::~MyServer()
{
    grpc::Alarm alarm;
    // std::chrono::steady_clock::now() does not compile (Set has no template instantiation for it); I am also not sure how to use gpr_timespec
    // NOTE nullptr tag
    alarm.Set(server_completion_queue.get(), gpr_timespec{0, 0, GPR_TIMESPAN}, nullptr);
    if (runner.joinable())
        runner.join();
}

void MyServer::serve()  // thread runs this method
{
    create_streaming_rpc(); // constructs first streaming RPC
    bool moribund = false;
    void* tag;
    bool ok;
    while (server_completion_queue->Next(&tag, &ok))
    {
        if (!tag) // alarm fired
        {
            server->Shutdown();
            server_completion_queue->Shutdown();
            moribund = true;
            continue;
        }

        CompletionEvent* ce = static_cast<CompletionEvent*>(tag);
        if (!ok)
        {
            // disconnection or termination, erase instance
            delete ce;
        }
        else if (!moribund)
        {
            ce->process();
        }
    }
}
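
The third version avoids the race because only the serving thread ever calls Shutdown(), and it does so between two process() calls. The sketch below models that ordering with standard C++ only: ToyQueue and ToyServer are hypothetical names, the nullptr pushed by the destructor plays the role of the nullptr-tagged Alarm, and ToyQueue's assert imitates the grpc_cq_begin_op check. In a real server the sentinel branch would call server->Shutdown() followed by server_completion_queue->Shutdown(), as in the code above.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>

// Hypothetical stand-in for a completion queue; pushing after Shutdown()
// trips an assert, imitating grpc_cq_begin_op.
class ToyQueue {
 public:
  void Push(void* tag) {
    std::lock_guard<std::mutex> lock(mu_);
    assert(!shutdown_ && "work enqueued after Shutdown()");
    items_.push_back(tag);
    cv_.notify_one();
  }
  void Shutdown() {
    std::lock_guard<std::mutex> lock(mu_);
    shutdown_ = true;
    cv_.notify_all();
  }
  // Returns false only once shut down and drained, like CompletionQueue::Next().
  bool Next(void** tag) {
    std::unique_lock<std::mutex> lock(mu_);
    cv_.wait(lock, [this] { return shutdown_ || !items_.empty(); });
    if (items_.empty()) return false;
    *tag = items_.front();
    items_.pop_front();
    return true;
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<void*> items_;
  bool shutdown_ = false;
};

// Hypothetical server: the destructor only posts a wake-up tag; the serving
// thread is the only one that shuts the queue down.
class ToyServer {
 public:
  explicit ToyServer(long long* processed_out)
      : processed_out_(processed_out), runner_([this] { Serve(); }) {}

  ~ToyServer() {
    queue_.Push(nullptr);  // wake-up sentinel, the role of the nullptr Alarm tag
    if (runner_.joinable()) runner_.join();
  }

 private:
  void Serve() {
    queue_.Push(&work_);  // like create_streaming_rpc()
    bool moribund = false;
    long long processed = 0;
    void* tag = nullptr;
    while (queue_.Next(&tag)) {
      if (tag == nullptr) {  // sentinel: shut down between two process() calls
        queue_.Shutdown();
        moribund = true;
        continue;
      }
      if (!moribund) {
        ++processed;
        queue_.Push(tag);  // like ce->process() queuing the next read
      }
    }
    *processed_out_ = processed;
  }

  ToyQueue queue_;
  int work_ = 0;              // dummy object whose address serves as a tag
  long long* processed_out_;
  std::thread runner_;        // declared last so it starts after the members above
};
```

One detail the toy makes visible: once moribund is set, events arriving with ok == true are skipped. In the posted loop such a ce is neither processed nor deleted, so the real loop may want to delete ce in that branch as well.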

I got the idea of using Alarm from https://github.com/grpc/grpc/issues/8442.
Question 2: Does the gpr_timespec used when setting the Alarm refer to the epoch, so that the alarm will always fire?
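
For question 2, the sketch below is a toy model of how a gpr_timespec-style deadline could be resolved. It does not call gRPC, and ToyTimespec, ToyClock, Resolve and FiresImmediately are hypothetical names. It assumes that a GPR_TIMESPAN value is treated as "now + span" (which is how gpr_convert_clock_type behaves, to the best of my reading of the gpr time sources) while a realtime value of {0, 0} is the epoch. Under either reading a zero deadline is not in the future, so the alarm would be due as soon as it is set; this is worth confirming against the gRPC sources.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

// Hypothetical model of gpr_timespec: clock_type decides whether
// {tv_sec, tv_nsec} is an absolute instant or a span relative to now.
enum class ToyClock { Realtime, Timespan };

struct ToyTimespec {
  std::int64_t tv_sec;
  std::int32_t tv_nsec;
  ToyClock clock_type;
};

using SysClock = std::chrono::system_clock;

// Resolves a deadline to an absolute instant. Assumption: Timespan means
// "now + span"; Realtime counts from the system_clock epoch.
SysClock::time_point Resolve(const ToyTimespec& t, SysClock::time_point now) {
  const auto span = std::chrono::duration_cast<SysClock::duration>(
      std::chrono::seconds(t.tv_sec) + std::chrono::nanoseconds(t.tv_nsec));
  if (t.clock_type == ToyClock::Timespan) return now + span;
  return SysClock::time_point(span);
}

// An alarm is due as soon as its resolved deadline is not in the future.
bool FiresImmediately(const ToyTimespec& t, SysClock::time_point now) {
  return Resolve(t, now) <= now;
}
```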

Many thanks for any advice or sharing of ideas.
