C++: Unable to create new client stream if server restarted


Sidhartha Thota

Mar 6, 2019, 9:18:42 PM
to grpc.io
Hi,

I am new to gRPC and am using gRPC C++ for my project. I am using a streaming RPC where the client streams its messages and the server listens to them. While streaming is in progress from the client side (say, after a couple of messages have been sent), I restart the server (Ctrl+C, then start the server again), and then the client is unable to establish a new session with the server.

If I am not wrong, a writer is created per session, so I am trying to create a new session by creating a new writer instance from the stub. I see the following error at the client:

E0307 02:09:43.854424957   22379 sync_posix.cc:85]           assertion failed: pthread_mutex_lock(mu) == 0     [Client output attached below]

gRPC version in use: v1.18

-----------------
Client code:
-------------------

Message getMessage(int i) {
  Message mesg;
  mesg.set_protocol_version(i);
  return mesg;
}

class Client {
 public:
  Client(){
  }

  void sendMessages(std::shared_ptr<Channel> channel,
                    std::unique_ptr<Listener::Stub> &stub_,
                    std::unique_ptr<ClientWriter<Message> > &writer) {
    const int kPoints = 10;

    for (int i = 0; i < kPoints; i++) {
      Message message = getMessage(i);
      std::cout << "Waiting to write " << std::endl;
      if (!writer->Write(message)) {
        std::cout << "broken stream. Will create new session" << "\n";
        while (channel->GetState(true) != GRPC_CHANNEL_READY) {
          std::cout << "Retry connection... after 1 sec" << std::endl;
          std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        }
        ClientContext context_retry;
        ReplyMessage reply_message_retry;
        writer = std::move(stub_->receive_resources(&context_retry, &reply_message_retry));
        std::cout << "created new session successfully" << std::endl;
      }
      std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    }
  }
};

int main(int argc, char** argv) {
  std::shared_ptr<Channel> channel;
  channel = grpc::CreateChannel("localhost:50051",
                      grpc::InsecureChannelCredentials());
  std::unique_ptr<Listener::Stub> stub_(Listener::NewStub(channel));
  ReplyMessage reply_message;
  ClientContext context;
  std::unique_ptr<ClientWriter<Message> > writer(stub_->receive_resources(&context, &reply_message));
  Client client;

  std::cout << "-------------- SendMessages --------------" << std::endl;
  client.sendMessages(channel, stub_, writer);
  writer->WritesDone();
  Status status = writer->Finish();
  if (status.ok()) {
    std::cout << "Close message " << reply_message.close_message()
              << std::endl;
  } else {
    std::cout << "RecordRoute rpc failed." << std::endl;
  }

  return 0;
}

---------------------
RPC proto file:
-------------------

package listener;

// Interface exported by the server.
service Listener {
  rpc receive_resources(stream Message) returns (ReplyMessage) {}
}

// Message and ReplyMessage are protos not defined here (Let me know if they are applicable, I can add).

------------------
Server code
-----------------

class ListenerImpl final: public Listener::Service {
  Status receive_resources(ServerContext* context, ServerReader<Message>* reader,
                      ReplyMessage* message) override {
    Message gMessage;
    unsigned int message_count = 0;

    while (reader->Read(&gMessage)) {
      message_count++;
      std::cout << "received" << std::endl;
      std::cout << gMessage.protocol_version() << std::endl;
    }
    std::cout << message_count << std::endl;
    message->set_close_message("close");
    std::cout << "sent close message" << std::endl;
    return Status::OK;
  }
};

void RunServer() {
  std::string server_address("0.0.0.0:50051");
  ListenerImpl service;

  ServerBuilder builder;
  builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
  builder.RegisterService(&service);
  std::unique_ptr<Server> server(builder.BuildAndStart());
  std::cout << "Server listening on " << server_address << std::endl;
  server->Wait();
}

int main(int argc, char** argv) {
  RunServer();
  return 0;
}

-----------------------
Client side output
-----------------------

-------------- SendMessages --------------
Waiting to write 
Waiting to write 
Waiting to write 
Waiting to write 
Waiting to write 
Waiting to write 
Waiting to write 
Waiting to write 
broken stream. Will create new session ---> server stopped and started again
Retry connection... after 1 sec
Retry connection... after 1 sec
Retry connection... after 1 sec
Retry connection... after 1 sec
Retry connection... after 1 sec
Retry connection... after 1 sec
Retry connection... after 1 sec
Retry connection... after 1 sec
created new session successfully
Waiting to write 
E0307 02:09:43.854424957   22379 sync_posix.cc:85]           assertion failed: pthread_mutex_lock(mu) == 0 ------------------> After creating new writer instance, when I try to write again using writer, then this error is thrown
Aborted

-------------------------
I also tried reusing the same ClientContext but received the error below, so I am creating a new ClientContext object for every creation of the writer:

"""
E0307 02:14:13.928433306   23563 client_context.cc:87]       assertion failed: call_ == nullptr
Aborted
"""

I also tried creating a new channel and stub (and also the stub alone) when the channel reports it is in the ready state, but I still see the same issue (E0307 02:09:43.854424957   22379 sync_posix.cc:85]           assertion failed: pthread_mutex_lock(mu) == 0).

Not sure what I am missing.

yas...@google.com

Apr 1, 2019, 2:40:13 PM
to grpc.io
The ClientContext object needs to stay alive for the entire duration of the RPC. The scope of your ClientContext is too limited: context_retry is destroyed at the end of the if block, while the new writer it was created with still references it. I suggest restructuring your code so that the RPC-invoking function is separated from the channel-state checking. Also, gRPC will automatically try to reconnect the channel if there are pending calls, so the state checking is unnecessary.