Getting Assert failure for Completion Queue Shutdown in grpc C++


sathya M

Jun 18, 2020, 2:11:22 AM
to grpc.io

What did you do?

I modified the example (examples/cpp/helloworld/greeter_async_server.cc) from the gRPC source tree in order to understand how server and CQ shutdown happens, and to check whether the CQ is properly drained once shutdown is triggered.
Additional Changes Done
++++++++++++++++++

  1. Create the server handle using new: ServerImpl *server = new ServerImpl;
  2. Create a separate std::thread from the main thread, which waits in Next for gRPC events:
    std::thread thread_ = std::thread(&ServerImpl::Run, server);
  3. After a few seconds (30 seconds here, for testing purposes), delete the server handle:
    delete server;

Steps to Reproduce
+++++++++++++
Just run ./greeter_async_server with the modifications I have added below.

What did you expect to see?

The completion queue should be fully drained without any failure, and then the process should exit cleanly.
The workaround I found to get the expected result is to add a 2-second delay between the server shutdown and the CQ shutdown. With that delay, the CQ is completely drained and the process exits normally.
So what am I doing wrong here? What are the correct steps for a graceful shutdown?

What did you see instead?

I get an assertion failure, i.e. the process receives SIGABRT:
./greeter_async_server
Server listening on 0.0.0.0:50051
shutdown server
E0617 19:35:23.920990861 30581 completion_queue.cc:247] assertion failed: queue.num_items() == 0
Aborted (core dumped)
root@nvme-rdc:~/sathya-207-mdev/nz-svc/third-party/grpc_src/examples/cpp/helloworld#

Provided backtrace as well
++++++++++++++++++
Thread 2 "greeter_async_s" received signal SIGABRT, Aborted.
[Switching to Thread 0x7ffff6d34700 (LWP 30387)]
0x00007ffff6d6a428 in __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:54
54 ../sysdeps/unix/sysv/linux/raise.c: No such file or directory.
(gdb) bt
#0 0x00007ffff6d6a428 in __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:54
#1 0x00007ffff6d6c02a in __GI_abort () at abort.c:89
#2 0x000000000059fc13 in cq_destroy_next(void*) ()
#3 0x00000000005a008f in grpc_cq_internal_unref(grpc_completion_queue*) ()
#4 0x0000000000579131 in grpc_core::ExecCtx::Flush() ()
#5 0x00000000005a10d0 in cq_next(grpc_completion_queue*, gpr_timespec, void*) ()
#6 0x00000000005a15eb in grpc_completion_queue_next ()
#7 0x0000000000508cf8 in grpc_impl::CompletionQueue::AsyncNextInternal(void**, bool*, gpr_timespec) ()
#8 0x000000000042490d in grpc_impl::CompletionQueue::Next(void**, bool*) ()
#9 0x0000000000425730 in ServerImpl::HandleRpcs() ()
#10 0x0000000000425261 in ServerImpl::Run() ()
#11 0x0000000000429e11 in void std::_Mem_fn_base<void (ServerImpl::)(), true>::operator()<, void>(ServerImpl) const ()
#12 0x0000000000429d85 in void std::_Bind_simple<std::_Mem_fn<void (ServerImpl::)()> (ServerImpl)>::_M_invoke<0ul>(std::_Index_tuple<0ul>) ()
#13 0x0000000000429a5e in std::_Bind_simple<std::_Mem_fn<void (ServerImpl::)()> (ServerImpl)>::operator()() ()
#14 0x0000000000429316 in std::thread::_Impl<std::_Bind_simple<std::_Mem_fn<void (ServerImpl::)()> (ServerImpl)> >::_M_run() ()
#15 0x00007ffff76d6c80 in ?? () from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
#16 0x00007ffff7bc16ba in start_thread (arg=0x7ffff6d34700) at pthread_create.c:333
#17 0x00007ffff6e3c41d in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:109
(gdb) info threads
Id Target Id Frame
1 Thread 0x7ffff7fdf7c0 (LWP 30383) "greeter_async_s" 0x00007ffff7bc298d in pthread_join (threadid=140737334429440, thread_return=0x0)
at pthread_join.c:90

* 2 Thread 0x7ffff6d34700 (LWP 30387) "greeter_async_s" 0x00007ffff6d6a428 in __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:54

I am copying my entire code here for your information. It is just greeter_async_server.cc with a few modifications.

#include <memory>
#include <iostream>
#include <string>
#include <thread>
#include <unistd.h>

#include <grpcpp/grpcpp.h>
#include <grpc/support/log.h>

#ifdef BAZEL_BUILD
#include "examples/protos/helloworld.grpc.pb.h"
#else
#include "helloworld.grpc.pb.h"
#endif

using grpc::Server;
using grpc::ServerAsyncResponseWriter;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::ServerCompletionQueue;
using grpc::Status;
using helloworld::HelloRequest;
using helloworld::HelloReply;
using helloworld::Greeter;

class ServerImpl final {
public:
~ServerImpl() {
std::cout << "shutdown server" << std::endl;
// Always shutdown the completion queue after the server.
server_->Shutdown();
cq_->Shutdown();
}

// There is no shutdown handling in this code.
void Run() {
std::string server_address("0.0.0.0:50051");

ServerBuilder builder;
// Listen on the given address without any authentication mechanism.
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
// Register "service_" as the instance through which we'll communicate with
// clients. In this case it corresponds to an *asynchronous* service.
builder.RegisterService(&service_);
// Get hold of the completion queue used for the asynchronous communication
// with the gRPC runtime.
cq_ = builder.AddCompletionQueue();
// Finally assemble the server.
server_ = builder.BuildAndStart();
std::cout << "Server listening on " << server_address << std::endl;

// Proceed to the server's main loop.
HandleRpcs();

}

private:
// Class encompassing the state and logic needed to serve a request.
class CallData {
public:
// Take in the "service" instance (in this case representing an asynchronous
// server) and the completion queue "cq" used for asynchronous communication
// with the gRPC runtime.
CallData(Greeter::AsyncService* service, ServerCompletionQueue* cq)
: service_(service), cq_(cq), responder_(&ctx_), status_(CREATE) {
// Invoke the serving logic right away.
Proceed();
}

void Proceed() {
  if (status_ == CREATE) {
    // Make this instance progress to the PROCESS state.
    status_ = PROCESS;

    // As part of the initial CREATE state, we *request* that the system
    // start processing SayHello requests. In this request, "this" acts as
    // the tag uniquely identifying the request (so that different CallData
    // instances can serve different requests concurrently), in this case
    // the memory address of this CallData instance.
    service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_,
                              this);
  } else if (status_ == PROCESS) {
    // Spawn a new CallData instance to serve new clients while we process
    // the one for this CallData. The instance will deallocate itself as
    // part of its FINISH state.
    new CallData(service_, cq_);

    // The actual processing.
    std::string prefix("Hello ");
    reply_.set_message(prefix + request_.name());

    // And we are done! Let the gRPC runtime know we've finished, using the
    // memory address of this instance as the uniquely identifying tag for
    // the event.
    status_ = FINISH;
    responder_.Finish(reply_, Status::OK, this);
  } else {
    GPR_ASSERT(status_ == FINISH);
    // Once in the FINISH state, deallocate ourselves (CallData).
    delete this;
  }
}

private:
// The means of communication with the gRPC runtime for an asynchronous
// server.
Greeter::AsyncService* service_;
// The producer-consumer queue for asynchronous server notifications.
ServerCompletionQueue* cq_;
// Context for the rpc, allowing to tweak aspects of it such as the use
// of compression, authentication, as well as to send metadata back to the
// client.
ServerContext ctx_;

// What we get from the client.
HelloRequest request_;
// What we send back to the client.
HelloReply reply_;
// The means to get back to the client.
ServerAsyncResponseWriter<HelloReply> responder_;

// Let's implement a tiny state machine with the following states.
enum CallStatus { CREATE, PROCESS, FINISH };
CallStatus status_;  // The current serving state.

};

// This can be run in multiple threads if needed.
void HandleRpcs() {
// Spawn a new CallData instance to serve new clients.
new CallData(&service_, cq_.get());
void* tag; // uniquely identifies a request.
bool ok;
while (true) {
// Block waiting to read the next event from the completion queue. The
// event is uniquely identified by its tag, which in this case is the
// memory address of a CallData instance.
// The return value of Next should always be checked. This return value
// tells us whether there is any kind of event or cq_ is shutting down.
if(cq_->Next(&tag, &ok)) {
if (ok == false) {
std::cout << "Next returned ok as false" << std::endl;
delete ((CallData*)tag);
} else {
std::cout << "Next returned ok as true" << std::endl;
static_cast<CallData*>(tag)->Proceed();
}
continue;
}
std::cout << "Queue is drained" << std::endl;
break;
}
}

std::unique_ptr<ServerCompletionQueue> cq_;
Greeter::AsyncService service_;
std::unique_ptr<Server> server_;
};
int main(int argc, char** argv) {
ServerImpl *server = new ServerImpl;
std::thread thread_ = std::thread(&ServerImpl::Run, server);

/* Added a wait so that the server can serve async clients in the meantime
and go for shutdown after the mentioned delay. */
sleep(30);
delete server;
thread_.join();
return 0;
}

Christopher Warrington - MSFT

Jun 19, 2020, 6:25:01 PM
to grpc.io
On Wednesday, June 17, 2020 at 11:11:22 PM UTC-7, sathya M wrote:

> so what is the correct steps for graceful shutdown ?

Take a look at the doc comments for ServerBuilder's AddCompletionQueue
member function [1]. It describes the canonical way to shutdown an async
server.

The assert is because you're trying to destroy a non-empty completion queue.
Completion queues need to be drained before they can be destroyed. The
aforementioned documentation shows a way to do that.

For even more details, read the various documentation comments for all the
types involved. They're good at detailing what owns what and what order
shutdown needs to happen in.

[1]: https://github.com/grpc/grpc/blob/66b045636018505dfa2654e62003d9596733076b/include/grpcpp/server_builder_impl.h#L140-L171

--
Christopher Warrington
Microsoft Corp.

sathya M

Jun 19, 2020, 9:25:28 PM
to Christopher Warrington - MSFT, grpc.io
Thanks for your response.
As you suggested, I have done the shutdown as described in the documentation comment. As I understand that comment, the completion queue gets drained only after the CQ shutdown.
Moreover, I haven't done anything new. I used the greeter_async_server example and made a few modifications, none of them in the shutdown process. The modifications are:
    1. Create the server handle using new: ServerImpl *server = new ServerImpl;
    2. Create a separate std::thread from the main thread, which waits in Next for gRPC events:
      std::thread thread_ = std::thread(&ServerImpl::Run, server);
    3. After a few seconds, delete the server handle.
I have pasted the entire example code in this thread. What puzzles me is that I get the assert failure with the example that gRPC itself provides. What might be the reason? What should be done in order to avoid this kind of failure? It looks like gRPC has some issue in shutting down the CQ.

Has anyone in this group hit a similar failure? I am only testing the example code that ships with gRPC, not my own.


    Christopher Warrington - MSFT

    Jun 19, 2020, 9:55:49 PM
    to grpc.io
    On Friday, June 19, 2020 at 6:25:28 PM UTC-7, sathya M wrote:

    > As you mentioned, I have done a shutdown as per the documents comment
    > given. As per the document given, Completion Queue will get drained only
    > after the CQ shutdown.

    That's not _quite_ what the documentation says. It says that after invoking
    CompletionQueue::Shutdown() that _something else_ *must* drain the CQ before
    the CQ is destroyed.

    Importantly, it does not say that the CQ will be drained.
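
To make that distinction concrete, here is a minimal sketch using only the standard library. ToyQueue, DrainReport and ShutdownThenDrain are hypothetical names invented for illustration; this is not the gRPC class or its implementation, only a toy that mimics the contract described above: Shutdown() marks the queue as closing, and the queued events stay there until some caller pulls them out with Next().

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical toy stand-in for a completion queue. NOT the gRPC class.
class ToyQueue {
 public:
  void Push(int tag) { items_.push_back(tag); }

  // Only marks the queue as shutting down. Nothing is removed here.
  void Shutdown() { shutdown_ = true; }
  bool is_shutdown() const { return shutdown_; }

  // Hands out one queued event per call and returns true.
  // Returns false when nothing is left. (A real blocking queue would wait
  // here instead, unless it had been shut down; this toy never blocks.)
  bool Next(int* tag) {
    if (items_.empty()) return false;
    *tag = items_.front();
    items_.pop_front();
    return true;
  }

  std::size_t pending() const { return items_.size(); }

 private:
  std::deque<int> items_;
  bool shutdown_ = false;
};

struct DrainReport {
  std::size_t pending_after_shutdown;  // events still queued after Shutdown()
  std::size_t drained;                 // events pulled out by the drain loop
  std::size_t pending_after_drain;     // events left once Next() returned false
};

// Shut the queue down, then drain it explicitly, recording each stage.
DrainReport ShutdownThenDrain(ToyQueue& q) {
  DrainReport report{};
  q.Shutdown();
  report.pending_after_shutdown = q.pending();

  int ignored_tag = 0;
  while (q.Next(&ignored_tag)) {
    ++report.drained;
  }
  report.pending_after_drain = q.pending();
  return report;
}
```

In this toy, a queue holding three events still holds three events right after Shutdown(); it only becomes empty, and therefore safe to destroy, once the loop around Next() has run to completion.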


    > what should be done inorder to avoid this kind of failure? It looks like
    > grpc has some issue in shutting down CQ.

    You'll need to make sure that you've actually drained the completion queue
    before deleting it. In the code you posted, I see a possible sequence of
    events that can cause a non-empty CQ to be deleted.

    1. The OS stops running the thread you created to run ServerImpl::Run
    2. main is resumed
    3. `delete server' invokes ServerImpl::~ServerImpl
    4. This calls server_->Shutdown() and then cq_->Shutdown();
    5. After ServerImpl::~ServerImpl is done, the C++ runtime invokes the
       destructors for the ServerImpl member variables, one of which is cq_.
    6. The unique_ptr<CompletionQueue>::~unique_ptr destructor deletes the
       completion queue.
    7. Assertion failure.

    Since ServerImpl::Run isn't running, there's nothing to drain the CQ.

    (Also, the delete server deletes an object while it is still being used on
    another thread. You'll want to make sure that thread_ has finished and been
    joined before deleting server.)

    Try something like this instead. It basically gets rid of ServerImpl. Once
    you have this working, you can package that up into an object if you want.
    (Warning: I wrote this code in my email editor. It may not compile, and it
    may not work, but it should help point you in the right direction.)

    void PumpCompletionQueue(CompletionQueue* cq) {
      while (true) {
        void* tag;
        bool ok;

        if (cq->Next(&tag, &ok)) {
          // handle dispatching tag: this is the CallData stuff
        } else {

          std::cout << "Queue is drained" << std::endl;
          return;
        }
      }
    }

    int main() {
      Greeter::AsyncService service;

      ServerBuilder sb;
      // configure as needed
      sb.RegisterService(&service);
      std::unique_ptr<CompletionQueue> cq = sb.AddCompletionQueue();
      std::unique_ptr<Server> server = sb.BuildAndStart();

      std::thread pumpCqThread { &PumpCompletionQueue, cq.get() };

      sleep(30);

      // start the shutdown process
      server->Shutdown();
      cq->Shutdown();
      pumpCqThread.join(); // ensures the CQ has been drained

      // no more RPCs are being handled & the CQ is empty. The objects can be
      // deleted.
      //
      // These calls to .reset aren't strictly needed, as the unique_ptr
      // destructors will run when main exits, but they're here to make it
      // easier to set breakpoints and step through what's going on in the
      // debugger.
      server.reset();
      cq.reset();

      return 0;
    }

    Hope this helps!
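
On packaging this up into an object: one possible shape, sketched with the standard library only, is to let the owning object hold both the queue and the thread that pumps it, and to have the destructor shut the queue down and join the thread before any member is destroyed. ToyBlockingQueue, ToyServer and RunToyServer are hypothetical names for illustration and are not gRPC types. With a real server, server_->Shutdown() would presumably come before cq_->Shutdown() in the destructor, as in the code above.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>

// Hypothetical blocking queue standing in for a completion queue. NOT gRPC.
class ToyBlockingQueue {
 public:
  void Push(int tag) {
    {
      std::lock_guard<std::mutex> lock(mu_);
      items_.push_back(tag);
    }
    cv_.notify_one();
  }

  void Shutdown() {
    {
      std::lock_guard<std::mutex> lock(mu_);
      shutdown_ = true;
    }
    cv_.notify_all();
  }

  // Blocks until an event is available, or until the queue is shut down.
  // Returns false only when the queue is shut down AND empty.
  bool Next(int* tag) {
    std::unique_lock<std::mutex> lock(mu_);
    cv_.wait(lock, [this] { return shutdown_ || !items_.empty(); });
    if (items_.empty()) return false;
    *tag = items_.front();
    items_.pop_front();
    return true;
  }

  std::size_t pending() {
    std::lock_guard<std::mutex> lock(mu_);
    return items_.size();
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<int> items_;
  bool shutdown_ = false;
};

// Hypothetical owner object. The destructor body shuts the queue down and
// joins the pump thread, so queue_ is destroyed only after it was drained.
class ToyServer {
 public:
  explicit ToyServer(std::size_t* handled)
      : pump_([this, handled] {
          int tag = 0;
          while (queue_.Next(&tag)) ++*handled;  // drain until Next() is false
        }) {}

  ~ToyServer() {
    queue_.Shutdown();              // 1. start the shutdown
    pump_.join();                   // 2. wait until the pump has drained it
    assert(queue_.pending() == 0);  // 3. members are destroyed after this body
  }

  void Push(int tag) { queue_.Push(tag); }

 private:
  ToyBlockingQueue queue_;  // declared first: built before pump_ starts
  std::thread pump_;        // declared last: destroyed before queue_
};

// Runs a ToyServer for `events` events and reports how many were handled.
std::size_t RunToyServer(int events) {
  std::size_t handled = 0;
  {
    ToyServer server(&handled);
    for (int i = 0; i < events; ++i) server.Push(i);
  }  // ~ToyServer(): Shutdown(), join(), then member destruction
  return handled;
}
```

The member order matters in this sketch: queue_ is declared before pump_, so the queue exists before the thread starts using it and outlives the thread on destruction. The join in the destructor body is what guarantees nothing is still calling Next() when the queue goes away.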

    HAMDI KHALID

    Mar 17, 2022, 2:56:55 PM
    to grpc.io
    By
    Hamdi Khalid
    image_84933fc0-9129-4592-808c-42c804bf23df20220307_205515.jpg