Stucking inside gRPC ClientReaderWriter's Read() function

41 views
Skip to first unread message

narek....@gmail.com

unread,
Oct 26, 2019, 2:24:07 PM10/26/19
to grpc.io

I am working on a C++ project, which uses Google Pub/Sub.

As there is no native support for Google Pub/Sub in C++, I am using it through gRPC. Thus, I have generated corresponding pubsub.grpc.pb.h, pubsub.grpc.pb.cc, pubsub.pb.h and pubsub.pb.cc files via protoc.

I wrote a lightweight wrapper-class, for subscription management. Class basically creates a new thread and starts listening for new messages. Here is the code example (code was built based on this question):


class Consumer
{
public:
   
Consumer();
   
~Consumer();
   
void startConsume();
// ...
    std
::string m_subscriptionName;
    std
::unique_ptr<std::thread> m_thread;
    std
::shared_ptr<grpc::Channel> m_channel;
    std
::unique_ptr<google::pubsub::v1::Subscriber::Stub> m_stub;
    std
::atomic<bool> m_runThread;
};

Consumer::Consumer()
{
    m_channel
= grpc::CreateChannel("pubsub.googleapis.com:443", grpc::GoogleDefaultCredentials());
    m_stub
= google::pubsub::v1::Subscriber::NewStub(m_channel);
    m_subscriptionName
= "something";
}

Consumer::~Consumer()
{
    m_runThread
= false;
   
if (m_thread && m_thread->joinable())
   
{
        m_thread
->join();
   
}
}

void Consumer::startConsume()
{
    m_thread
.reset(new std::thread([this]() {
        m_runThread
= true;
       
while (m_runThread)
       
{
            grpc
::ClientContext context;
            std
::unique_ptr<grpc::ClientReaderWriter<google::pubsub::v1::StreamingPullRequest,
                                                     google
::pubsub::v1::StreamingPullResponse>> stream(m_stub->StreamingPull(&context));
           
// send the initial message
            google
::pubsub::v1::StreamingPullRequest req;
            req
.set_subscription(m_subscriptionName);
            req
.set_stream_ack_deadline_seconds(10);
           
           
// if write passed successfully, start subscription
           
if (!stream->Write(req))
               
continue;

           
// receive messages
            google
::pubsub::v1::StreamingPullResponse response;
           
while (stream->Read(&response))
           
{
                google
::pubsub::v1::StreamingPullRequest ack_request;
               
for (const auto& message : response.received_messages())
               
{
                   
// process messages ...
                    ack_request
.add_ack_ids(message.ack_id());
               
}
                stream
->Write(ack_request);
           
}
       
}
   
}));
}

Several instances of the Consumer class are created within a process, each for different topic.

It seems works fine. However sometimes program stucks on

stream->Read(&response)
code. Debugging showed that thread was stuck inside of Read() function call - the stream does not read anything and does not exit from function either, despite that Pub/Sub buffer is not empty.

After restarting the application, all messages are successfully read. It seems like a deadlock inside of Read().

Is there anything that I am doing wrong? What can cause this behavior?

Yang Gao

unread,
Dec 16, 2019, 12:30:57 PM12/16/19
to grpc.io
You can turn on some debug tracing with environment variable GRPC_TRACE, for example setting it to "http,secure_endpoint,tcp". For a complete list, refer to: https://github.com/grpc/grpc/blob/master/doc/environment_variables.md
Reply all
Reply to author
Forward
0 new messages