Thanks William for reply,
I need your help debugging the issue which I am facing with this implementation.
I extremely apologies as I am going to write more details of what I am facing so that you will get the complete context. This is a C++ implementation.
I have tweaked, greeter hello example to perform async streaming operation. What I have done is to initiate a stream request from client and wait on read/finish on the client side. On server side, when it gets request, it will start sending the stream of replies, it will send 5 replies by calling write operation and finally it will call "finish" operation.
On client side, I have registered the call data pointer as tag to perform read operation and finish operation. As I expect to perform multiple reads, each time the read event is triggered, I will print the response message and again call Read to get read to receive next read event (The last Read call cause a stale pointer in completion queue, I will explain in the next line).
On server side, after it sends the 5th response, it will call Finish, on client side, the finish tag gets called, where I will say RPC is completed and delete the object, but the problem here is that last Read is registered with stale pointer and this is causing core dump, with message call to pure virtual function.
Here is my code:
Protocol Buffer:
// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (stream HelloReply) {}
}
// The request message containing the user's name.
message HelloRequest {
string name = 1;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}
Client Side code:
class GreeterClient {
public:
explicit GreeterClient(std::shared_ptr<Channel> channel)
: stub_(Greeter::NewStub(channel)) {}
// Assembles the client's payload and sends it to the server.
void SayHello(const std::string& user) {
HelloRequest request;
// Data we are sending to the server.
request.set_name(user);
// Call object to store rpc data
AsyncClientCall* call = new AsyncClientCall;
AsyncClientCallCloser* callCloser = new AsyncClientCallCloser(call);
// stub_->AsyncSayHello() performs the RPC call, returning an instance to
// store in "call". Because we are using the asynchronous API, we need to
// hold on to the "call" instance in order to get updates on the ongoing RPC.
call->response_reader = stub_->AsyncSayHello(&call->context, request, &cq_, (void *)call);
// Request that, upon completion of the RPC, "reply" be updated with the
// server's response; "status" with the indication of whether the operation
// was successful. Tag the request with the memory address of the call object.
call->response_reader->Read(&call->reply, (void*)call);
// Request that, upon completion of the RPC, "reply" be updated with the
// server's response; "status" with the indication of whether the operation
// was successful. Tag the request with the memory address of the call object.
call->response_reader->Finish(&callCloser->status, (void*)callCloser); /// When I get finish, I will delete callClosure and also the call object.
}
// Loop while listening for completed responses.
// Prints out the response from the server.
void AsyncCompleteRpc() {
void* got_tag;
bool ok = false;
// Block until the next result is available in the completion queue "cq".
while (cq_.Next(&got_tag, &ok)) {
// The tag in this example is the memory location of the call object
ResponseHandler* responseHandler = static_cast<ResponseHandler*>(got_tag);
// Verify that the request was completed successfully. Note that "ok"
// corresponds solely to the request for updates introduced by Finish().
GPR_ASSERT(ok);
if (responseHandler->RequestSent()) {
responseHandler->HandleResponse();
} else {
std::cout << "Hello Request sent successfully" << std::endl;
responseHandler->SetRequestSent();
}
}
}
private:
class ResponseHandler {
public:
virtual bool HandleResponse() = 0;
virtual bool RequestSent() {return true;} ;
virtual void SetRequestSent() {};
};
// struct for keeping state and data information
class AsyncClientCall: public ResponseHandler {
// Request sent event
bool requestSent;
public:
AsyncClientCall(): requestSent(false) {}
// Container for the data we expect from the server.
HelloReply reply;
// Context for the client. It could be used to convey extra information to
// the server and/or tweak certain RPC behaviors.
ClientContext context;
// Storage for the status of the RPC upon completion.
Status status;
//std::unique_ptr<ClientAsyncResponseReader<HelloReply>> response_reader;
std::unique_ptr<ClientAsyncReaderInterface<HelloReply>> response_reader;
bool HandleResponse() override {
std::cout << "Greeter received: " << reply.message() << std::endl;
response_reader->Read(&reply, (void*)this); // Here I read the response and call Read API to get event on next Read, but the last Read registration will remain as stale entry,the object gets deleted when server sends Finish, then I get crash on processing of completion queue
}
bool RequestSent() override {
return requestSent;
}
void SetRequestSent() override {
requestSent = true;
}
};
// struct for keeping state and data information
class AsyncClientCallCloser: public ResponseHandler {
private:
AsyncClientCall *m_call;
public:
AsyncClientCallCloser(AsyncClientCall *call): m_call(call) {}
// Storage for the status of the RPC upon completion.
Status status;
bool HandleResponse() override {
if (status.ok()) {
std::cout << "Server Response Completed" << std::endl;
}
else {
std::cout << "RPC failed" << std::endl;
}
delete m_call;
delete this;
}
};
// Out of the passed in Channel comes the stub, stored here, our view of the
// server's exposed services.
std::unique_ptr<Greeter::Stub> stub_;
// The producer-consumer queue we use to communicate asynchronously with the
// gRPC runtime.
CompletionQueue cq_;
};
int main(int argc, char** argv) {
// Instantiate the client. It requires a channel, out of which the actual RPCs
// are created. This channel models a connection to an endpoint (in this case,
// localhost at port 50051). We indicate that the channel isn't authenticated
// (use of InsecureChannelCredentials()).
GreeterClient greeter(grpc::CreateChannel(
"localhost:50051", grpc::InsecureChannelCredentials()));
// Spawn reader thread that loops indefinitely
std::thread thread_ = std::thread(&GreeterClient::AsyncCompleteRpc, &greeter);
std::string user("world");
greeter.SayHello(user); // The actual RPC call!
std::cout << "Press control-c to quit" << std::endl << std::endl;
thread_.join(); //blocks forever
return 0;
}
Server Side Code:
class ServerImpl final {
public:
~ServerImpl() {
server_->Shutdown();
// Always shutdown the completion queue after the server.
cq_->Shutdown();
}
// There is no shutdown handling in this code.
void Run() {
ServerBuilder builder;
// Listen on the given address without any authentication mechanism.
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
// Register "service_" as the instance through which we'll communicate with
// clients. In this case it corresponds to an *asynchronous* service.
builder.RegisterService(&service_);
// Get hold of the completion queue used for the asynchronous communication
// with the gRPC runtime.
cq_ = builder.AddCompletionQueue();
// Finally assemble the server.
server_ = builder.BuildAndStart();
std::cout << "Server listening on " << server_address << std::endl;
// Proceed to the server's main loop.
HandleRpcs();
}
private:
// Class encompasing the state and logic needed to serve a request.
class CallData {
public:
// Take in the "service" instance (in this case representing an asynchronous
// server) and the completion queue "cq" used for asynchronous communication
// with the gRPC runtime.
CallData(Greeter::AsyncService* service, ServerCompletionQueue* cq)
: service_(service), cq_(cq), repliesSent_(0), responder_(&ctx_), status_(CREATE) {
// Invoke the serving logic right away.
Proceed();
}
void Proceed() {
if (status_ == CREATE) {
// Make this instance progress to the PROCESS state.
status_ = PROCESS;
std::cout << "Creating Call data for new client connections: " << this << std::endl;
// As part of the initial CREATE state, we *request* that the system
// start processing SayHello requests. In this request, "this" acts are
// the tag uniquely identifying the request (so that different CallData
// instances can serve different requests concurrently), in this case
// the memory address of this CallData instance.
service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_,
(void*) this);
} else if (status_ == PROCESS) {
// Spawn a new CallData instance to serve new clients while we process
// the one for this CallData. The instance will deallocate itself as
// part of its FINISH state.
new CallData(service_, cq_);
// The actual processing.
std::string prefix("Hello ");
reply_.set_message(prefix + request_.name() +
std::to_string(repliesSent_ + 1));
std::cout << "Sending reponse: " << this << " : " << reply_.message() << std::endl;
responder_.Write(reply_, this);
status_ = PROCESSING;
repliesSent_++;
} else if (status_ == PROCESSING) {
if (repliesSent_ == MAX_REPLIES) {
// And we are done! Let the gRPC runtime know we've finished, using the
// memory address of this instance as the uniquely identifying tag for
// the event.
status_ = FINISH;
responder_.Finish(Status::OK, this);
} else {
// The actual processing.
std::string prefix("Hello ");
reply_.set_message(prefix + request_.name() + std::to_string(repliesSent_ + 1));
std::cout << "Sending reponse: " << this << " : " << reply_.message() << std::endl;
responder_.Write(reply_, this);
status_ = PROCESSING;
repliesSent_++;
}
} else {
GPR_ASSERT(status_ == FINISH);
std::cout << "Completed RPC for: " << this << std::endl;
// Once in the FINISH state, deallocate ourselves (CallData).
delete this;
}
}
private:
// The means of communication with the gRPC runtime for an asynchronous
// server.
Greeter::AsyncService* service_;
// The producer-consumer queue where for asynchronous server notifications.
ServerCompletionQueue* cq_;
// Context for the rpc, allowing to tweak aspects of it such as the use
// of compression, authentication, as well as to send metadata back to the
// client.
ServerContext ctx_;
// What we get from the client.
HelloRequest request_;
// What we send back to the client.
HelloReply reply_;
uint32_t repliesSent_;
const uint32_t MAX_REPLIES = 5;
// The means to get back to the client.
ServerAsyncWriter<HelloReply> responder_;
// Let's implement a tiny state machine with the following states.
enum CallStatus { CREATE, PROCESS, PROCESSING, FINISH };
CallStatus status_; // The current serving state.
};
// This can be run in multiple threads if needed.
void HandleRpcs() {
// Spawn a new CallData instance to serve new clients.
new CallData(&service_, cq_.get());
void* tag; // uniquely identifies a request.
bool ok;
while (true) {
// Block waiting to read the next event from the completion queue. The
// event is uniquely identified by its tag, which in this case is the
// memory address of a CallData instance.
// The return value of Next should always be checked. This return value
// tells us whether there is any kind of event or cq_ is shutting down.
GPR_ASSERT(cq_->Next(&tag, &ok));
GPR_ASSERT(ok);
static_cast<CallData*>(tag)->Proceed();
}
}
std::unique_ptr<ServerCompletionQueue> cq_;
Greeter::AsyncService service_;
std::unique_ptr<Server> server_;
};
int main(int argc, char** argv) {
ServerImpl server;
server.Run();
return 0;
}
Output:
Server Side:
kuldeep@ubuntu:~/grpc/grpc/examples/cpp/helloworldstream$ ./greeter_async_server
Creating Call data for new client connections: 0xb0b5e0
Creating Call data for new client connections: 0xb07970
Sending reponse: 0xb0b5e0 : Hello world1
Sending reponse: 0xb0b5e0 : Hello world2
Sending reponse: 0xb0b5e0 : Hello world3
Sending reponse: 0xb0b5e0 : Hello world4
Sending reponse: 0xb0b5e0 : Hello world5
Completed RPC for: 0xb0b5e0
Client Side:
kuldeep@ubuntu:~/grpc/grpc/examples/cpp/helloworldstream$ ./greeter_async_client2
Press control-c to quit
Hello Request sent successfully
Greeter received: Hello world1
Greeter received: Hello world2
Greeter received: Hello world3
Greeter received: Hello world4
Greeter received: Hello world5
Server Response Completed
pure virtual method called
terminate called without an active exception
Aborted (core dumped)
(gdb) bt
#0 0x00007ffff6b4bc37 in __GI_raise (sig=sig@entry=6)
at ../nptl/sysdeps/unix/sysv/linux/raise.c:56
#1 0x00007ffff6b4f028 in __GI_abort () at abort.c:89
#2 0x00007ffff7153535 in __gnu_cxx::__verbose_terminate_handler() ()
from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
#3 0x00007ffff71516d6 in ?? () from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
#4 0x00007ffff7151703 in std::terminate() ()
from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
#5 0x00007ffff71521bf in __cxa_pure_virtual ()
from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
#6 0x00007ffff7bb6e83 in grpc::CompletionQueue::AsyncNextInternal(void**, bool*, gpr_timespec) () from /usr/local/lib/libgrpc++.so.3
#7 0x000000000041368f in grpc::CompletionQueue::Next (this=0x7fffffffde28,
tag=0x7ffff63e7d40, ok=0x7ffff63e7d3f)
at /usr/local/include/grpc++/impl/codegen/completion_queue.h:153
#8 0x0000000000413b83 in GreeterClient::AsyncCompleteRpc (this=0x7fffffffde20)
at greeter_async_client2.cc:92
#9 0x0000000000415b0b in std::_Mem_fn<void (GreeterClient::*)()>::operator()<, void>(GreeterClient*) const (this=0x6258f8, __object=0x7fffffffde20)
at /usr/include/c++/4.8/functional:601
#10 0x0000000000415a5b in std::_Bind_simple<std::_Mem_fn<void (GreeterClient::*)()> (GreeterClient*)>::_M_invoke<0ul>(std::_Index_tuple<0ul>) (this=0x6258f0)
at /usr/include/c++/4.8/functional:1732
#11 0x0000000000415963 in std::_Bind_simple<std::_Mem_fn<void (GreeterClient::*)()> (GreeterClient*)>::operator()() (this=0x6258f0)
at /usr/include/c++/4.8/functional:1720
#12 0x00000000004158fc in std::thread::_Impl<std::_Bind_simple<std::_Mem_fn<void (GreeterClient::*)()> (GreeterClient*)> >::_M_run() (this=0x6258d8)
at /usr/include/c++/4.8/thread:115
#13 0x00007ffff71a4a60 in ?? () from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
#14 0x00007ffff63f1184 in start_thread (arg=0x7ffff63e8700)
at pthread_create.c:312
#15 0x00007ffff6c12bed in clone ()
at ../sysdeps/unix/sysv/linux/x86_64/clone.S:111