Hi,
I am new to gRPC and am using gRPC C++ for my project. I use a client-streaming RPC: the client streams its messages and the server reads them. If I restart the server (Ctrl+C, then start it again) while the client is mid-stream, say after it has sent a couple of messages, the client is unable to establish a new session with the server.
If I understand correctly, a writer is created per session, so I try to start a new session by creating a new writer instance from the stub. The client then fails with the following error:
E0307 02:09:43.854424957 22379 sync_posix.cc:85] assertion failed: pthread_mutex_lock(mu) == 0 (full client output is attached below)
gRPC version: v1.18
-----------------
Client code:
-------------------
Message getMessage(int i) {
  Message mesg;
  mesg.set_protocol_version(i);
  return mesg;
}
class Client {
 public:
  Client() {}

  void sendMessages(std::shared_ptr<Channel> channel,
                    std::unique_ptr<Listener::Stub> &stub_,
                    std::unique_ptr<ClientWriter<Message> > &writer) {
    Message message;
    const int kPoints = 10;
    for (int i = 0; i < kPoints; i++) {
      Message message = getMessage(i);
      std::cout << "Waiting to write " << std::endl;
      if (!writer->Write(message)) {
        std::cout << "broken stream. Will create new session" << "\n";
        while (channel->GetState(true) != GRPC_CHANNEL_READY) {
          std::cout << "Retry connection... after 1 sec" << std::endl;
          std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        }
        ClientContext context_retry;
        ReplyMessage reply_message_retry;
        writer = std::move(stub_->receive_resources(&context_retry, &reply_message_retry));
        std::cout << "created new session successfully" << std::endl;
      }
      std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    }
  }
};
int main(int argc, char** argv) {
  std::shared_ptr<Channel> channel;
  channel = grpc::CreateChannel("localhost:50051",
                                grpc::InsecureChannelCredentials());
  std::unique_ptr<Listener::Stub> stub_(Listener::NewStub(channel));
  ReplyMessage reply_message;
  ClientContext context;
  std::unique_ptr<ClientWriter<Message> > writer(stub_->receive_resources(&context, &reply_message));
  Client client;
  std::cout << "-------------- SendMessages --------------" << std::endl;
  client.sendMessages(channel, stub_, writer);
  writer->WritesDone();
  Status status = writer->Finish();
  if (status.ok()) {
    std::cout << "Close message " << reply_message.close_message()
              << std::endl;
  } else {
    std::cout << "receive_resources rpc failed." << std::endl;
  }
  return 0;
}
---------------------
RPC proto file:
-------------------
package listener;

// Interface exported by the server.
service Listener {
  rpc receive_resources(stream Message) returns (ReplyMessage) {}
}
// Message and ReplyMessage are defined elsewhere and omitted here (let me know if they are relevant and I can add them).
------------------
Server code:
-----------------
class ListenerImpl final : public Listener::Service {
  Status receive_resources(ServerContext* context, ServerReader<Message>* reader,
                           ReplyMessage* message) override {
    Message gMessage;
    unsigned int message_count = 0;
    while (reader->Read(&gMessage)) {
      message_count++;
      std::cout << "received" << std::endl;
      std::cout << gMessage.protocol_version() << std::endl;
    }
    std::cout << message_count << std::endl;
    message->set_close_message("close");
    std::cout << "sent close message" << std::endl;
    return Status::OK;
  }
};
void RunServer() {
  // Assumed listen address; the port matches the client's "localhost:50051".
  std::string server_address("0.0.0.0:50051");
  ListenerImpl service;
  ServerBuilder builder;
  builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
  builder.RegisterService(&service);
  std::unique_ptr<Server> server(builder.BuildAndStart());
  std::cout << "Server listening on " << server_address << std::endl;
  server->Wait();
}

int main(int argc, char** argv) {
  RunServer();
  return 0;
}
-----------------------
Client-side output:
-----------------------
-------------- SendMessages --------------
Waiting to write
Waiting to write
Waiting to write
Waiting to write
Waiting to write
Waiting to write
Waiting to write
Waiting to write
broken stream. Will create new session ---> server stopped and started again
Retry connection... after 1 sec
Retry connection... after 1 sec
Retry connection... after 1 sec
Retry connection... after 1 sec
Retry connection... after 1 sec
Retry connection... after 1 sec
Retry connection... after 1 sec
Retry connection... after 1 sec
created new session successfully
Waiting to write
E0307 02:09:43.854424957 22379 sync_posix.cc:85] assertion failed: pthread_mutex_lock(mu) == 0 ---> thrown on the first Write() after the new writer instance is created
Aborted
-------------------------
I also tried reusing the same ClientContext, but that failed with the error below, so I now create a new ClientContext object every time I create a writer:
"""
E0307 02:14:13.928433306 23563 client_context.cc:87] assertion failed: call_ == nullptr
Aborted
"""
I also tried creating a new channel and stub, and creating a new stub alone, once the channel reports that it is in the ready state. I still see the same issue (E0307 02:09:43.854424957 22379 sync_posix.cc:85] assertion failed: pthread_mutex_lock(mu) == 0).
I am not sure what I am missing.
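
Note on a possible cause, offered as a guess rather than a confirmed diagnosis: in the client code above, context_retry and reply_message_retry are declared inside the if block, so they are destroyed at its closing brace while the new writer is still used on the next loop iteration. As far as I know, a ClientWriter keeps a pointer to the ClientContext it was created with, which would be consistent with an assertion on a destroyed mutex. The sketch below shows one way to keep a context and its writer alive together. FakeContext, FakeWriter, Session, and ReopenKeepsContextAlive are hypothetical names used only for illustration; they stand in for the real gRPC types and are not part of gRPC.

```cpp
#include <cassert>
#include <memory>

// Stand-ins for grpc::ClientContext and grpc::ClientWriter<Message>.
// They only count live contexts so the lifetime rule can be checked.
static int g_live_contexts = 0;

struct FakeContext {
  FakeContext() { ++g_live_contexts; }
  ~FakeContext() { --g_live_contexts; }
  FakeContext(const FakeContext&) = delete;
  FakeContext& operator=(const FakeContext&) = delete;
};

struct FakeWriter {
  explicit FakeWriter(FakeContext* ctx) : context(ctx) {}
  // In this stand-in, a write succeeds only while a context is alive.
  bool Write(int /*message*/) {
    return context != nullptr && g_live_contexts > 0;
  }
  FakeContext* context;
};

// Hypothetical helper (not a gRPC API): owns the per-call context together
// with the writer created from it, so the context outlives every Write().
template <typename Context, typename Writer>
class Session {
 public:
  // `open` is any callable taking Context* and returning
  // std::unique_ptr<Writer>.
  template <typename OpenFn>
  void Reopen(OpenFn open) {
    writer_.reset();                 // drop the old writer before its context
    context_.reset(new Context());   // one fresh context per call
    writer_ = open(context_.get());
  }
  Writer* writer() { return writer_.get(); }

 private:
  std::unique_ptr<Context> context_;  // declared first, so destroyed last
  std::unique_ptr<Writer> writer_;
};

// Opens a session, replaces it once (as after a server restart), and
// reports whether a write on the replacement still sees a live context.
bool ReopenKeepsContextAlive() {
  Session<FakeContext, FakeWriter> session;
  auto open = [](FakeContext* ctx) {
    return std::unique_ptr<FakeWriter>(new FakeWriter(ctx));
  };
  session.Reopen(open);
  session.Reopen(open);  // simulated reconnect
  assert(g_live_contexts == 1);
  return session.writer()->Write(42);
}
```

With the real types, the same idea would be a Session<ClientContext, ClientWriter<Message> > reopened with a lambda that calls stub_->receive_resources(ctx, &reply), where the reply object would presumably also have to stay alive until Finish() returns.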