Thanks for the feedback - I'm taking a first pass at this by using cq.AsyncNext() with a timeout, since I wasn't able to find much information on interceptors. I also couldn't find many examples of AsyncNext() out there, but I have something working along the lines of:
/// Starts the asynchronous `stream_output` call against the given port and
/// polls its completion queue with a deadline so that a stream which stops
/// producing events is detected instead of blocking forever.
///
/// Every operation is tagged with the address of the local counter `in`, so
/// each tag returned by the queue points back at that counter.
///
/// @param port  Forwarded to create_client() together with config_ to build
///              the channel.
void AgentOutputClient::call(const std::string &port) {
  auto chan(create_client(config_, port));
  auto stub(Agent::NewStub(chan));
  grpc::ClientContext context;
  AgentRequest input;
  // NOTE(review): the original text read `*onfig_.name`, which does not
  // compile; restored to the `config_` member used above. Confirm that `name`
  // really needs the dereference.
  input.set_name(*config_.name);
  grpc::CompletionQueue cq;
  auto data(stub->PrepareAsyncstream_output(&context, input, &cq));

  uint64_t in(0);
  data->StartCall(&in);

  bool ok{false};
  uint64_t *out{nullptr};
  if (cq.Next(reinterpret_cast<void **>(&out), &ok)) {
    if (ok) {
      std::cout << in << " " << *out << " OK" << std::endl;
      assert(in == *out);

      // Must start out true: with `false` the polling loop below never ran and
      // the timeout handling was dead code.
      bool loop(true);
      while (loop) {
        // NOTE(review): `seconds` is not declared in this function; presumably
        // a std::chrono duration visible from the enclosing scope - confirm.
        grpc::CompletionQueue::NextStatus status(
            cq.AsyncNext(reinterpret_cast<void **>(&out), &ok,
                         std::chrono::system_clock::now() + seconds));
        switch (status) {
          case grpc::CompletionQueue::NextStatus::GOT_EVENT: {
            std::cout << "GOT EVENT " << *out << " " << in << std::endl;
            if (ok) {
              assert(in == *out);
              // do stuff
              // NOTE(review): no Read() is requested anywhere in this snippet;
              // presumably the next read is issued here - confirm.
            } else {
              // The operation failed; stop polling and collect the status.
              loop = false;
            }
            break;
          }
          case grpc::CompletionQueue::NextStatus::TIMEOUT: {
            loop = false;
            std::cout << "TIMEOUT" << std::endl;
            // Nothing arrived before the deadline: cancel the call so that the
            // Finish() below can complete instead of waiting on a stalled
            // stream.
            context.TryCancel();
            break;
          }
          case grpc::CompletionQueue::NextStatus::SHUTDOWN: {
            loop = false;
            std::cout << "SHUTDOWN" << std::endl;
            break;
          }
        }
      }
    }
  }

  // Ask for the final status of the call and wait for that request to complete.
  grpc::Status status;
  in++;
  data->Finish(&status, &in);
  if (cq.Next(reinterpret_cast<void **>(&out), &ok)) {
    std::cout << in << " " << *out << " " << status.error_code() << std::endl;
    assert(in == *out);
  }

  // Shut the queue down and drain whatever events are still pending.
  cq.Shutdown();
  while (cq.Next(reinterpret_cast<void **>(&out), &ok)) {
    std::cout << *out << " DRAIN" << std::endl;
  }
}
This seems to work OK for picking up stalled infinite streams. I'm not sure whether everything is being cleaned up completely in the example above - I get the following lines logged: