Hi folks!
I have a gRPC server application implemented in C++ and a Java client on the other side. Googling for a few days didn't help much, so I'm hoping for your advice.
The RPC looks like this:
rpc GetStuff(GetStuffRequest) returns (stream Stuff);
So the client sends me a request and right after that I start streaming Stuff. My question is: what can I do on a weak network? For example, the connection can drop every 5 seconds or every hour, while data keeps accumulating on the server side whether the connection is alive or dead. I must guarantee that every Stuff is delivered to the client in real time (I can observe it in the frontend), and if the stream breaks, the data should be collected on the server and delivered right after reconnection.
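To make the requirement concrete, here is a rough sketch of the buffering I have in mind on the server. It is only an illustration, not my real code: ReplayBuffer, Item, and the idea that the client reports the last sequence number it received when it reconnects (for example in GetStuffRequest) are all assumptions.

```cpp
#include <cstddef>
#include <cstdint>
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the generated Stuff message.
struct Item {
    std::uint64_t seq;      // monotonically increasing sequence number
    std::string payload;    // serialized Stuff
};

// Keeps every item until the client confirms it, so a broken stream
// can be resumed from the last item the client actually received.
class ReplayBuffer {
public:
    // Store a new item and return the sequence number assigned to it.
    std::uint64_t push(std::string payload) {
        m_items.push_back(Item{m_nextSeq, std::move(payload)});
        return m_nextSeq++;
    }

    // Drop everything the client has confirmed, up to and including seq.
    void ack(std::uint64_t seq) {
        while (!m_items.empty() && m_items.front().seq <= seq) {
            m_items.pop_front();
        }
    }

    // Items that still have to be sent to a client whose last received
    // item was lastSeq (0 means "nothing received yet").
    std::vector<Item> pendingAfter(std::uint64_t lastSeq) const {
        std::vector<Item> out;
        for (const Item& item : m_items) {
            if (item.seq > lastSeq) {
                out.push_back(item);
            }
        }
        return out;
    }

    std::size_t size() const { return m_items.size(); }

private:
    std::deque<Item> m_items;
    std::uint64_t m_nextSeq = 1;
};
```

On reconnect the server would call ack() and pendingAfter() with the sequence number from the new request and write the returned items before continuing with live data. The buffer is not thread-safe as written; it would need a mutex if the producer and the completion-queue thread both touch it.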
I have run into two major problems.
1. What is the exact sequence for re-establishing the connection? Here is some code:
void start()
{
    grpc::ServerBuilder builder;
    builder.AddListeningPort(m_serverAddress, grpc::InsecureServerCredentials());
    builder.RegisterService(&m_asyncService);
    m_cqGetStuff = builder.AddCompletionQueue();
    m_server = builder.BuildAndStart();

    // Ask gRPC to deliver the first GetStuff call with TAG::start
    m_streamGetStuff.reset(new grpc::ServerAsyncWriter<::de::sensorline::slrc::classax::Stuff>(&m_context));
    m_asyncService.RequestGetStuff(&m_context, &m_requestGetStuff, m_streamGetStuff.get(), m_cqGetStuff.get(), m_cqGetStuff.get(), reinterpret_cast<void*>(TAG::start));
    // Deliver TAG::disc when the call finishes or is cancelled
    m_context.AsyncNotifyWhenDone(reinterpret_cast<void*>(TAG::disc));

    // Process completion queue events on a separate thread
    m_thread.reset(new std::thread([this] { loop(); }));
}
void loop()
{
    while (true)
    {
        void* got_tag = nullptr;
        bool ok = false;
        // Blocks until the next event; returns false once the queue is shut down
        if (!m_cqGetStuff->Next(&got_tag, &ok)) {
            LOGERROR("GetStuff Server stream closed. Quitting");
            break;
        }
        if (TAG::disc == static_cast<TAG>(reinterpret_cast<size_t>(got_tag))) {
            // Client went away: try to accept the next call
            if (m_context.IsCancelled()) {
                m_asyncService.RequestGetStuff(&m_context, &m_requestGetStuff, m_streamGetStuff.get(), m_cqGetStuff.get(), m_cqGetStuff.get(), reinterpret_cast<void*>(TAG::start));
            }
        }
        if (TAG::start == static_cast<TAG>(reinterpret_cast<size_t>(got_tag))) {
            // grab data from database, put it into internal container and make first Write()
        }
        if (TAG::write == static_cast<TAG>(reinterpret_cast<size_t>(got_tag))) {
            // Previous Write() completed: send the next item
            m_streamGetStuff->Write(stuffFromDatabase, reinterpret_cast<void*>(TAG::write));
        }
    }
}
It's simplified a lot, but you can see the idea. With this implementation, after the context is cancelled I never receive TAG::start again, only TAG::write. I've tried many different sequences but haven't managed to get reconnection working. Should I shut down the completion queue and add it again? Or send initial metadata? I've been confused by this topic for quite a long time, so I would appreciate any advice.
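As a side note on the snippet above, the repeated casts between TAG and void* could be factored into two small helpers. This is just a sketch: toTag and fromTag are made-up names, and the enum values shown are an assumption (my real TAG enum is not shown here).

```cpp
#include <cstdint>

// Assumed definition; the underlying type is pointer-sized so the
// round trip through void* is lossless.
enum class TAG : std::uintptr_t { start = 1, write = 2, disc = 3 };

// Encode a TAG as the opaque pointer the completion queue expects.
inline void* toTag(TAG t) {
    return reinterpret_cast<void*>(static_cast<std::uintptr_t>(t));
}

// Decode the pointer returned by CompletionQueue::Next() back to a TAG.
inline TAG fromTag(void* p) {
    return static_cast<TAG>(reinterpret_cast<std::uintptr_t>(p));
}
```

With these, the checks in loop() would read like `if (fromTag(got_tag) == TAG::disc)`.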
2. Previously I used the sync communication model and it worked almost perfectly, but it had to be changed to async, and here is what I observe now. About 9 times out of 10 the client can't connect to the server, even though nothing changed in the network or anywhere else, only this code snippet. Some details:
- The server receives the request from the client with the appropriate data
- The server starts streaming, and I observe that every subsequent event with TAG::write is "ok"
- The stream can keep going for hours
- Nothing reaches the client until I kill the server (Ctrl+C). Right after the process dies, the whole batch of messages that were streamed from the server appears on the client's frontend
Traffic is not really huge: I send 1 message (about 200 bytes) every 2 seconds. The network connection itself is very stable in this case. On the client side I observe that the channel always stays in state CONNECTING. But in the roughly 10% of cases where it does connect, everything works fine.
I'm ready to answer any questions, attach traces, or provide whatever else you ask for. Thanks a lot in advance!