I've been trying (and struggling) to create a type of discussion between the Server and Client, where the Client sends the initial request, and in some cases, the Server will require additional information from the Client, before it can complete the overall request.
Something like...
Server: Here's the collated data.
Currently, I'm able to get the Server to respond initially (Name?), but on the subsequent response (Age?), it aborts abruptly. The idea here is that occasionally the server may need more information from the client, which we cannot send across in the original message. Furthermore, while the data on the client side is hardcoded here, that's not the case in the actual implementation. I'm just trying to simplify it here.
class ServerImpl final {
public:
~ServerImpl() {
server_->Shutdown();
// Always shutdown the completion queue after the server.
cq_->Shutdown();
}
// There is no shutdown handling in this code.
void Run() {
ServerBuilder builder;
// Listen on the given address without any authentication mechanism.
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
// Register "service_" as the instance through which we'll communicate with
// clients. In this case it corresponds to an *asynchronous* service.
builder.RegisterService(&service_);
// Get hold of the completion queue used for the asynchronous communication
// with the gRPC runtime.
cq_ = builder.AddCompletionQueue();
// Finally assemble the server.
server_ = builder.BuildAndStart();
std::cout << "Server listening on " << server_address << std::endl;
// Proceed to the server's main loop.
HandleRpcs();
}
private:
// Class encompasing the state and logic needed to serve a request.
class CallData : ICallback
{
public:
// Take in the "service" instance (in this case representing an asynchronous
// server) and the completion queue "cq" used for asynchronous communication
// with the gRPC runtime.
CallData(HelloConversationSupplierService::AsyncService* service, ServerCompletionQueue* cq)
: service_(service), cq_(cq), stream_(&ctx_), status_(CREATE) {
// Invoke the serving logic right away.
Proceed();
}
void Proceed(ModelManagerPartBrepCreationEngineImpl* brepCreationEngine)
{
switch (status_)
{
case START:
{
service_->RequestSayHello(&ctx_, &stream_, cq_, cq_, this);
status_ = PROCESS;
break;
}
case PROCESS:
{
new CallData(service_, cq_);
stream_.Read(&request_, this);
status_ = READ;
break;
}
case READ:
{
if(request_.name() == MethodName::Start)
Start(&response, this);
stream_.Write(_response, this);
status_ = WRITE;
break;
}
case WRITE:
{
stream_.Finish(Status::OK, this);
status_ = FINISH;
break;
}
case FINISH:
{
delete this;
}
}
}
void Start(SayHelloResponse response, CallData* calldata)
{
PersonRef person = engine_->Start(calldata); // I know "engine_ isn't defined elsewhere, but for simplicity, it is a separate process which handles all the heavy work. GRPC is just used to talk back and forth. CallData implements the callbacks the engine may use. (Occasionally, engine won't need anymore data and will return a fully defined PersonRef on its own)
response.set_username(person.userName);
response.set_age(person.Age);
response.set_name(MethodName::Complete);
}
private:
HelloConversationSupplierService::AsyncService* service_;
ServerCompletionQueue* cq_;
ServerContext ctx_;
// What we get from the client.
HelloRequest request_;
// What we send back to the client.
HelloReply reply_;
// The means to get back to the client.
ServerAsyncReaderWriter<HelloRequest,HelloReply> stream_;
// Let's implement a tiny state machine with the following states.
enum CallStatus { START, PROCESS, READ, WRITE, FINISH };
CallStatus status_; // The current serving state.
};
// This can be run in multiple threads if needed.
void HandleRpcs() {
// Spawn a new CallData instance to serve new clients.
new CallData(&service_, cq_.get());
void* tag; // uniquely identifies a request.
bool ok;
while (true) {
// Block waiting to read the next event from the completion queue. The
// event is uniquely identified by its tag, which in this case is the
// memory address of a CallData instance.
// The return value of Next should always be checked. This return value
// tells us whether there is any kind of event or cq_ is shutting down.
GPR_ASSERT(cq_->Next(&tag, &ok));
GPR_ASSERT(ok);
static_cast<CallData*>(tag)->Proceed();
}
}
std::unique_ptr<ServerCompletionQueue> cq_;
HelloConversationSupplierService::AsyncService service_;
std::unique_ptr<Server> server_;
};