Re: [grpc-io] Synchronize gRPC threads / calls using shared_futures and share data between threads in C++


Jeff Steger

May 13, 2020, 5:58:23 PM
to lange....@gmail.com, grpc.io
I think the use case you are describing is better implemented with a condition variable. All threads wait on the condition variable, and NewCommandMsg signals all of them. The threads then execute and can wait on the same condition again. This is how the producer/consumer pattern works, which is essentially what you are implementing.
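Roughly like this (an untested sketch; CommandBroadcaster and the generation counter are my own illustration, not part of your code or the gRPC API):

#include <condition_variable>
#include <cstdint>
#include <mutex>

// Hypothetical helper: one instance shared by all handler threads.
// RegisterCommand threads block in waitForNext(); NewCommandMsg calls
// publish(). The generation counter distinguishes "a new command arrived"
// from spurious wakeups and lets every waiter see each command exactly once.
class CommandBroadcaster {
public:
    // Blocks until a command newer than lastSeen is published,
    // then updates lastSeen and returns a copy of the command.
    CommandMessage waitForNext(std::uint64_t& lastSeen) {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [&] { return generation_ > lastSeen; });
        lastSeen = generation_;
        return current_;  // copied while still holding the lock
    }

    // Stores the command, bumps the generation, and wakes all waiters.
    void publish(const CommandMessage& msg) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            current_ = msg;
            ++generation_;
        }
        cv_.notify_all();
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    CommandMessage current_;
    std::uint64_t generation_ = 0;
};

Each RegisterCommand thread keeps a local lastSeen (starting at 0) and loops over waitForNext() followed by writer->Write(). In practice you would use cv_.wait_for with a timeout instead of wait, so the loop can periodically check context->IsCancelled().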

On Wed, May 13, 2020 at 4:36 PM <lange....@gmail.com> wrote:
Hey,
for a project I am starting to learn how gRPC works. To that end, I implemented the following setup:

A C++ server using the sync API offers two RPCs: RegisterCommand (server-streaming) and NewCommandMsg (unary, blocking). This is the .proto definition:

service Command {
    rpc RegisterCommand (CommandRequest) returns (stream CommandMessage) {}
    rpc NewCommandMsg (CommandMessage) returns (google.protobuf.Empty) {}
}
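For context, this is roughly how my clients consume the stream (a sketch, assuming the generated Command::Stub and the usual gRPC client setup):

// Sketch of the client side; stub is a Command::Stub created from a channel.
grpc::ClientContext ctx;
CommandRequest request;

std::unique_ptr<grpc::ClientReader<CommandMessage>> reader =
    stub->RegisterCommand(&ctx, request);

CommandMessage msg;
while (reader->Read(&msg)) {
    // handle each broadcast command as it arrives
}
grpc::Status status = reader->Finish();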
What am I trying to achieve?

Multiple clients shall call RegisterCommand, and the server shall block inside the handler until a call to NewCommandMsg happens (I guarantee that only a single call happens at a time). When NewCommandMsg is called, its CommandMessage argument shall be delivered to every RegisterCommand thread (I understand each call is handled in its own thread), each thread shall be unblocked, and the CommandMessage shall be written to the stream. After that, the RegisterCommand threads shall block again and wait for the next call to NewCommandMsg. Later, NewCommandMsg will be replaced by a single non-gRPC thread.

What have I done so far?

I read a lot about (shared) futures, promises, mutexes, and condition variables in C++ and implemented the following code.

#include <future>
#include <iostream>
#include <utility>
// gRPC headers and the generated service/message headers (Command,
// CommandRequest, CommandMessage) are assumed to be included as usual.

class CommandServiceImpl final : public Command::Service {
    // To my understanding, these are shared by all handler threads
    std::promise<CommandMessage> newCommandPromise;
    std::shared_future<CommandMessage> newCommandFuture = this->newCommandPromise.get_future();

    // To my understanding, this runs in its own thread for each client
    Status RegisterCommand(ServerContext* context, const CommandRequest* request, ServerWriter<CommandMessage>* writer) override {
        // Each thread gets its own copy of the shared future
        std::shared_future<CommandMessage> future = this->newCommandFuture;
        while (!context->IsCancelled()) {
            future.wait();
            (void)future.get();
            std::cout << "distributing command" << std::endl;
            // actual writing would happen here
        }

        return Status::CANCELLED;
    }

    // To my understanding, this runs in its own thread for each call
    Status NewCommandMsg(ServerContext* context, const CommandMessage* request, google::protobuf::Empty* response) override {
        std::promise<CommandMessage> promise = std::move(this->newCommandPromise);

        std::cout << "new command received" << std::endl;
        promise.set_value(*request);

        // Provide a new promise for the next call.
        // In my evaluation phase, I guarantee that only one client at a time calls NewCommandMsg.
        this->newCommandPromise = std::promise<CommandMessage>();

        return Status::OK;
    }
};
What happens with that code

After one or more concurrent calls to RegisterCommand, the server blocks, and after a call to NewCommandMsg, future.wait() unblocks, which is expected. After that, however, future.wait() is always non-blocking, so the threads spin in an infinite loop. Instead, each iteration should run exactly once and then wait for new data to become available.
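A minimal standalone example (no gRPC involved) shows the same one-shot behavior I am seeing:

#include <future>
#include <iostream>

int main() {
    std::promise<int> p;
    std::shared_future<int> f = p.get_future().share();

    p.set_value(42);
    f.wait();                            // returns immediately, state is ready
    std::cout << f.get() << std::endl;   // 42

    // The shared state stays ready forever: wait() never blocks again and
    // get() keeps returning the same value. The future cannot be "re-armed"
    // for a second set_value on the same promise.
    f.wait();
    std::cout << f.get() << std::endl;   // 42 again, without blocking
    return 0;
}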

It seems that it is not possible to "reuse" an existing future. Any ideas on how to achieve my goal?
