Hi
Apologies, this is a bit long. I would like to know the best practice for shutting down a bidirectional streaming RPC server in C++, or what other people have done.
I have a server which starts a thread to process completion queue events. In my destructor I want to stop the thread.
Following the async C++ example, my destructor calls Shutdown() on the server, then on the completion queue, and then joins the thread.
The server handles bidirectional streaming RPC calls from clients. The code below is reduced to just the logic relevant to my question. Because gRPC asserts in grpc_cq_begin_op if more work is enqueued while the completion queue is shutting down, I introduced an atomic called "moribund" to signal shutdown and stop further work from being queued. However, it is not sufficient:
// this mostly works but sometimes asserts; see the comment before ce->process() at the end of MyServer::serve()
MyServer::~MyServer()
{
    moribund.store(true, std::memory_order_relaxed);
    server->Shutdown(); // must be done first
    server_completion_queue->Shutdown();
    if (runner.joinable())
        runner.join();
}

void MyServer::serve() // thread runs this method
{
    create_streaming_rpc(); // constructs first streaming RPC
    void* tag;
    bool ok;
    while (server_completion_queue->Next(&tag, &ok))
    {
        CompletionEvent* ce = static_cast<CompletionEvent*>(tag);
        if (!ok)
        {
            // disconnection or termination, erase instance
            delete ce;
        }
        else if (!moribund.load(std::memory_order_relaxed))
        {
            // race: the destructor can set moribund and call both Shutdown()
            // functions right here, before ce->process() runs; process() may then
            // enqueue more work on the completion queue, triggering the gRPC assert
            ce->process();
        }
    }
}
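One way to close this race, sketched under assumptions, is to make "check the flag, then start new work" atomic with respect to "set the flag, then shut the queue down" by doing both under one mutex. The block below is a std-only model of that idea, not gRPC code: ToyQueue, ToyServer and RunOnce are hypothetical names, ToyQueue stands in for grpc::ServerCompletionQueue, and its BeginOp() counts operations started after Shutdown() instead of asserting like grpc_cq_begin_op. In a real destructor, server->Shutdown() would still come first and stay outside the lock, since it can block while the serving thread finishes in-flight work.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>

// Hypothetical stand-in for grpc::ServerCompletionQueue. Instead of
// asserting like grpc_cq_begin_op, BeginOp() counts work started after
// Shutdown() so the race can be observed.
class ToyQueue {
 public:
  void BeginOp(int tag) {
    std::lock_guard<std::mutex> lk(m_);
    if (shutdown_) { ++late_ops_; return; }
    items_.push_back(tag);
    cv_.notify_one();
  }
  void Shutdown() {
    std::lock_guard<std::mutex> lk(m_);
    shutdown_ = true;
    cv_.notify_all();
  }
  // Like CompletionQueue::Next(): returns false only after Shutdown()
  // has been called and every queued item has been drained.
  bool Next(int* tag) {
    std::unique_lock<std::mutex> lk(m_);
    cv_.wait(lk, [this] { return shutdown_ || !items_.empty(); });
    if (items_.empty()) return false;
    *tag = items_.front();
    items_.pop_front();
    return true;
  }
  int late_ops() {
    std::lock_guard<std::mutex> lk(m_);
    return late_ops_;
  }

 private:
  std::mutex m_;
  std::condition_variable cv_;
  std::deque<int> items_;
  bool shutdown_ = false;
  int late_ops_ = 0;
};

// Hypothetical server: the serving thread starts new work (the analogue of
// ce->process()) only while holding shutdown_mu_ and after re-checking the flag.
class ToyServer {
 public:
  explicit ToyServer(ToyQueue& queue) : queue_(queue) {
    queue_.BeginOp(0);  // analogue of create_streaming_rpc()
    runner_ = std::thread([this] { Serve(); });
  }
  ~ToyServer() {
    // With gRPC, server->Shutdown() would be called here, outside the lock.
    {
      std::lock_guard<std::mutex> lk(shutdown_mu_);
      moribund_ = true;   // no new work can start after this...
      queue_.Shutdown();  // ...so shutting down cannot race with BeginOp()
    }
    runner_.join();
  }

 private:
  void Serve() {
    int tag;
    while (queue_.Next(&tag)) {
      std::lock_guard<std::mutex> lk(shutdown_mu_);
      if (!moribund_) queue_.BeginOp(tag);  // re-arm, like ce->process()
    }
  }

  ToyQueue& queue_;
  std::mutex shutdown_mu_;
  bool moribund_ = false;
  std::thread runner_;
};

// One start/serve/shutdown cycle; returns how many ops began too late.
int RunOnce() {
  ToyQueue queue;
  {
    ToyServer server(queue);
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  }
  return queue.late_ops();
}
```

With the lock in place, RunOnce() should return 0 however the two threads interleave; dropping the lock from Serve() would reopen the window described in the comment above (and turn the flag read into a data race).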
I then reworked it as follows: I replaced the atomic with a local variable and moved the completion queue shutdown into the serving thread to avoid the race above. It works, but I cannot tell a client disconnecting apart from the server shutting down.
MyServer::~MyServer()
{
    server->Shutdown(); // must be done first
    // there is always one streaming RPC waiting to accept the next client's connection;
    // calling the server's Shutdown() makes Next() return that instance with ok = false
    if (runner.joinable())
        runner.join();
}

void MyServer::serve() // thread runs this method
{
    create_streaming_rpc(); // constructs first streaming RPC
    bool moribund = false;
    void* tag;
    bool ok;
    while (server_completion_queue->Next(&tag, &ok))
    {
        CompletionEvent* ce = static_cast<CompletionEvent*>(tag);
        if (!ok)
        {
            // disconnection or termination, erase instance
            delete ce;
            moribund = true; // really? could be a client disconnection, not server shutdown
            server_completion_queue->Shutdown();
        }
        else if (!moribund)
        {
            ce->process();
        }
    }
}
So, question 1: when server_completion_queue->Next() returns with ok == false, how can I tell a server shutdown apart from a client disconnection?
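As far as I understand the per-operation notes in grpc's completion_queue.h, ok == false means different things depending on which operation the tag was waiting on: for a pending server-side RequestXxx() call it means the server was shut down before a client was matched; for a server-side Read() it means no more messages will arrive (the client called WritesDone() or the call died); for Write() or Finish() it means the call is already dead. So one option is for each CompletionEvent to remember its stage. A sketch with hypothetical Stage, Outcome and Classify names (none of them are gRPC API):

```cpp
#include <cassert>

// Hypothetical: the operation a CompletionEvent is currently waiting on.
enum class Stage { kAwaitingCall, kReading, kWriting, kFinishing };

// Hypothetical: what an event's (stage, ok) pair tells the serving loop.
enum class Outcome { kProceed, kServerShuttingDown, kStreamEnded, kCallDead };

Outcome Classify(Stage stage, bool ok) {
  if (ok) return Outcome::kProceed;
  switch (stage) {
    case Stage::kAwaitingCall:
      // The requested call was never matched to a client: server shutdown.
      return Outcome::kServerShuttingDown;
    case Stage::kReading:
      // No more messages: client half-closed or the call is gone.
      return Outcome::kStreamEnded;
    case Stage::kWriting:
    case Stage::kFinishing:
      // Nothing will reach the wire: the call is already dead.
      return Outcome::kCallDead;
  }
  return Outcome::kCallDead;  // unreachable; keeps compilers quiet
}
```

In serve(), only the kServerShuttingDown outcome would then set moribund and call server_completion_queue->Shutdown(); the other outcomes would just delete that one instance.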
My final attempt uses (or rather abuses) an Alarm. Unfortunately it fails to link (missing symbol 'grpc_shutdown' from libgrpc.so, which is not in the pkg-config output for grpc++) unless I also link against libgrpc.so explicitly. With that, though, it seems to work robustly (so far :-) ):
MyServer::~MyServer()
{
    grpc::Alarm alarm;
    // std::chrono::steady_clock::now() does not compile (no template instantiation for it in Set());
    // I am also not sure how to use gpr_timespec
    // NOTE nullptr tag
    alarm.Set(server_completion_queue.get(), gpr_timespec{0, 0, GPR_TIMESPAN}, nullptr);
    if (runner.joinable())
        runner.join();
}

void MyServer::serve() // thread runs this method
{
    create_streaming_rpc(); // constructs first streaming RPC
    bool moribund = false;
    void* tag;
    bool ok;
    while (server_completion_queue->Next(&tag, &ok))
    {
        if (!tag) // alarm fired
        {
            server->Shutdown();
            server_completion_queue->Shutdown();
            moribund = true;
            continue;
        }
        CompletionEvent* ce = static_cast<CompletionEvent*>(tag);
        if (!ok)
        {
            // disconnection or termination, erase instance
            delete ce;
        }
        else if (!moribund)
        {
            ce->process();
        }
    }
}
Question 2: does the gpr_timespec I pass when setting the Alarm refer to the epoch, so that the alarm always fires immediately?
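If I read grpc's time handling correctly (an assumption worth checking against gpr_convert_clock_type), a deadline whose clock type is GPR_TIMESPAN is treated as relative to the current time, so {0, 0, GPR_TIMESPAN} means "now" rather than the epoch; read as an absolute GPR_CLOCK_REALTIME value, {0, 0} would be the epoch. Either way the deadline is not in the future, so the alarm should fire straight away. Relatedly, grpc::TimePoint appears to be specialized for std::chrono::system_clock::time_point but not for steady_clock, which would explain the compile error noted in the destructor comment. A std::chrono model of the two readings, with hypothetical helper names:

```cpp
#include <cassert>
#include <chrono>

using Clock = std::chrono::system_clock;

// A deadline is due (an alarm set for it may fire) once it is not in the future.
bool IsDue(Clock::time_point deadline, Clock::time_point now) {
  return deadline <= now;
}

// {0, 0} read as a relative span, as GPR_TIMESPAN suggests: now + 0.
Clock::time_point ZeroAsRelative(Clock::time_point now) {
  return now + std::chrono::nanoseconds(0);
}

// {0, 0} read as an absolute wall-clock time: the clock's epoch.
Clock::time_point ZeroAsAbsolute() { return Clock::time_point{}; }
```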
Many thanks for any advice or sharing of ideas.