While implementing asynchronous streaming with gRPC, I could not find any straightforward examples to follow, and I had a hard time getting it to work. Now that I have implemented a "hello world" version of an asynchronous streaming RPC, I thought I would share it.
In this example, the client requests a stream of replies by initiating an asynchronous RPC, and the server responds with 5 replies. After the server has sent all 5 replies, it closes the RPC by calling Finish, and then the client closes the RPC on its side.
Hope it will be helpful.
class ServerImpl final {
public:
~ServerImpl() {
server_->Shutdown();
// Always shutdown the completion queue after the server.
cq_->Shutdown();
}
// There is no shutdown handling in this code.
void Run() {
ServerBuilder builder;
// Listen on the given address without any authentication mechanism.
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
// Register "service_" as the instance through which we'll communicate with
// clients. In this case it corresponds to an *asynchronous* service.
builder.RegisterService(&service_);
// Get hold of the completion queue used for the asynchronous communication
// with the gRPC runtime.
cq_ = builder.AddCompletionQueue();
// Finally assemble the server.
server_ = builder.BuildAndStart();
std::cout << "Server listening on " << server_address << std::endl;
// Proceed to the server's main loop.
HandleRpcs();
}
private:
// Class encompasing the state and logic needed to serve a request.
class CallData {
public:
// Take in the "service" instance (in this case representing an asynchronous
// server) and the completion queue "cq" used for asynchronous communication
// with the gRPC runtime.
CallData(Greeter::AsyncService* service, ServerCompletionQueue* cq)
: service_(service), cq_(cq), repliesSent_(0), responder_(&ctx_), status_(CREATE) {
// Invoke the serving logic right away.
Proceed();
}
void Proceed() {
if (status_ == CREATE) {
// Make this instance progress to the PROCESS state.
status_ = PROCESS;
std::cout << "Creating Call data for new client connections: " << this << std::endl;
// As part of the initial CREATE state, we *request* that the system
// start processing SayHello requests. In this request, "this" acts are
// the tag uniquely identifying the request (so that different CallData
// instances can serve different requests concurrently), in this case
// the memory address of this CallData instance.
service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_,
(void*) this);
} else if (status_ == PROCESS) {
// Spawn a new CallData instance to serve new clients while we process
// the one for this CallData. The instance will deallocate itself as
// part of its FINISH state.
new CallData(service_, cq_);
// The actual processing.
std::string prefix("Hello ");
reply_.set_message(prefix + request_.name() +
std::to_string(repliesSent_ + 1));
std::cout << "Sending reponse: " << this << " : " << reply_.message() << std::endl;
responder_.Write(reply_, this);
status_ = PROCESSING;
repliesSent_++;
} else if (status_ == PROCESSING) {
if (repliesSent_ == MAX_REPLIES) {
// And we are done! Let the gRPC runtime know we've finished, using the
// memory address of this instance as the uniquely identifying tag for
// the event.
status_ = FINISH;
responder_.Finish(Status::OK, this);
} else {
// The actual processing.
std::string prefix("Hello ");
reply_.set_message(prefix + request_.name() + std::to_string(repliesSent_ + 1));
std::cout << "Sending reponse: " << this << " : " << reply_.message() << std::endl;
responder_.Write(reply_, this);
status_ = PROCESSING;
repliesSent_++;
}
} else {
GPR_ASSERT(status_ == FINISH);
std::cout << "Completed RPC for: " << this << std::endl;
// Once in the FINISH state, deallocate ourselves (CallData).
delete this;
}
}
private:
// The means of communication with the gRPC runtime for an asynchronous
// server.
Greeter::AsyncService* service_;
// The producer-consumer queue where for asynchronous server notifications.
ServerCompletionQueue* cq_;
// Context for the rpc, allowing to tweak aspects of it such as the use
// of compression, authentication, as well as to send metadata back to the
// client.
ServerContext ctx_;
// What we get from the client.
HelloRequest request_;
// What we send back to the client.
HelloReply reply_;
uint32_t repliesSent_;
const uint32_t MAX_REPLIES = 5;
// The means to get back to the client.
ServerAsyncWriter<HelloReply> responder_;
// Let's implement a tiny state machine with the following states.
enum CallStatus { CREATE, PROCESS, PROCESSING, FINISH };
CallStatus status_; // The current serving state.
};
// This can be run in multiple threads if needed.
void HandleRpcs() {
// Spawn a new CallData instance to serve new clients.
new CallData(&service_, cq_.get());
void* tag; // uniquely identifies a request.
bool ok;
while (true) {
// Block waiting to read the next event from the completion queue. The
// event is uniquely identified by its tag, which in this case is the
// memory address of a CallData instance.
// The return value of Next should always be checked. This return value
// tells us whether there is any kind of event or cq_ is shutting down.
GPR_ASSERT(cq_->Next(&tag, &ok));
GPR_ASSERT(ok);
static_cast<CallData*>(tag)->Proceed();
}
}
std::unique_ptr<ServerCompletionQueue> cq_;
Greeter::AsyncService service_;
std::unique_ptr<Server> server_;
};
int main(int argc, char** argv) {
ServerImpl server;
server.Run();
return 0;
}
// Asynchronous client for the server-streaming Greeter::SayHello RPC.
//
// SayHello() starts a call; AsyncCompleteRpc() (intended to run on its own
// thread) polls the completion queue and drives each call's state machine
// until the server has finished streaming.
class GreeterClient {
 public:
  explicit GreeterClient(std::shared_ptr<Channel> channel)
      : stub_(Greeter::NewStub(channel)) {}

  // Assembles the client's payload and starts the streaming RPC.
  // The call object allocated here owns itself and is released by
  // AsyncClientCall::HandleResponse once the RPC has completed.
  void SayHello(const std::string& user) {
    HelloRequest request;
    // Data we are sending to the server.
    request.set_name(user);
    // Call object to store rpc data.
    AsyncClientCall* call = new AsyncClientCall;
    // Create the reader first and only then start the call. Starting the
    // call while the reader is still being assigned would let the thread
    // polling the completion queue see the tag before "response_reader" is
    // set.
    call->response_reader =
        stub_->PrepareAsyncSayHello(&call->context, request, &cq_);
    call->response_reader->StartCall(call->Tag());
  }

  // Loops while listening for completion-queue events and forwards each one
  // to the call it belongs to. Returns once the queue has been shut down.
  void AsyncCompleteRpc() {
    void* got_tag = nullptr;
    bool ok = false;
    // Block until the next result is available in the completion queue "cq".
    while (cq_.Next(&got_tag, &ok)) {
      // The tag in this example is the address of the call object, viewed
      // through its ResponseHandler base (see AsyncClientCall::Tag).
      ResponseHandler* responseHandler = static_cast<ResponseHandler*>(got_tag);
      std::cout << "Tag received: " << responseHandler << std::endl;
      // "ok" reports whether the operation identified by the tag succeeded.
      // For a Read it turns false when the server has no more messages.
      std::cout << "Next returned: " << ok << std::endl;
      responseHandler->HandleResponse(ok);
    }
  }

 private:
  // Interface implemented by everything that can be used as a tag on cq_.
  class ResponseHandler {
   public:
    virtual ~ResponseHandler() = default;
    // Handles one completion-queue event. `eventStatus` is the "ok" flag
    // reported for that event. Returns true while the handler is still
    // alive, false once it has destroyed itself.
    virtual bool HandleResponse(bool eventStatus) = 0;
  };

  // Keeps the state and data of one in-flight streaming RPC.
  class AsyncClientCall : public ResponseHandler {
    // CREATE  - call started, waiting for the start event
    // PROCESS - a Read is outstanding
    // FINISH  - Finish() issued, waiting for the final status
    enum CallStatus { CREATE, PROCESS, FINISH };
    CallStatus callStatus_;

   public:
    AsyncClientCall() : callStatus_(CREATE) {}

    // Container for the data we expect from the server.
    HelloReply reply;
    // Context for the client. It could be used to convey extra information to
    // the server and/or tweak certain RPC behaviors.
    ClientContext context;
    // Storage for the status of the RPC upon completion.
    Status status;
    // Reader used to pull the streamed replies and the final status.
    std::unique_ptr<ClientAsyncReaderInterface<HelloReply>> response_reader;

    // Tag under which this call registers its operations. It is produced
    // from the ResponseHandler base pointer because that is the type the
    // completion-queue loop casts the tag back to.
    void* Tag() { return static_cast<ResponseHandler*>(this); }

    // Advances the call's state machine for one completion-queue event.
    // `responseStatus` is the "ok" flag of that event. Returns true while
    // the call is still in progress and false after the call has completed
    // and this object has been deleted; the object must not be touched
    // after a false return.
    bool HandleResponse(bool responseStatus) override {
      switch (callStatus_) {
        case CREATE:
          if (responseStatus) {
            // The call was started; ask for the first reply.
            response_reader->Read(&reply, Tag());
            callStatus_ = PROCESS;
          } else {
            // The call could not be started; collect the final status.
            response_reader->Finish(&status, Tag());
            callStatus_ = FINISH;
          }
          return true;

        case PROCESS:
          if (responseStatus) {
            std::cout << "Greeter received: " << this << " : " << reply.message() << std::endl;
            // Ask for the next reply; only one Read may be outstanding.
            response_reader->Read(&reply, Tag());
          } else {
            // No more replies; collect the final status.
            response_reader->Finish(&status, Tag());
            callStatus_ = FINISH;
          }
          return true;

        case FINISH:
          if (status.ok()) {
            std::cout << "Server Response Completed: " << this << " CallData: " << this << std::endl;
          } else {
            std::cout << "RPC failed" << std::endl;
          }
          // The RPC is over: release the self-owned call object.
          delete this;
          return false;
      }
      // Unreachable for valid states; keeps every path returning a value.
      return true;
    }
  };

  // Out of the passed in Channel comes the stub, stored here, our view of the
  // server's exposed services.
  std::unique_ptr<Greeter::Stub> stub_;
  // The producer-consumer queue we use to communicate asynchronously with the
  // gRPC runtime.
  CompletionQueue cq_;
};
int main(int argc, char** argv) {
// Instantiate the client. It requires a channel, out of which the actual RPCs
// are created. This channel models a connection to an endpoint (in this case,
// localhost at port 50051). We indicate that the channel isn't authenticated
// (use of InsecureChannelCredentials()).
GreeterClient greeter(grpc::CreateChannel(
"localhost:50051", grpc::InsecureChannelCredentials()));
// Spawn reader thread that loops indefinitely
std::thread thread_ = std::thread(&GreeterClient::AsyncCompleteRpc, &greeter);
std::string user("world");
greeter.SayHello(user); // The actual RPC call!
std::cout << "Press control-c to quit" << std::endl << std::endl;
thread_.join(); //blocks forever
return 0;
}