Deadlines with infinite streams

75 views
Skip to first unread message

Mark Fine

unread,
Mar 18, 2019, 4:41:04 PM3/18/19
to grpc.io
How do people manage deadlines for infinite streams? The documentation on ClientContext set_deadline()


indicates that "This method should only be called before invoking the rpc" - it doesn't seem possible to re-set the deadline throughout the lifecycle of the connection. Are deadlines precluded from infinite streams? How do people determine when they're infinite streams potentially get stalled? Out-of-band detection?

Thanks!
Mark

Carl Mastrangelo

unread,
Mar 18, 2019, 6:40:08 PM3/18/19
to grpc.io
Deadlines apply to the whole RPC.  However, you can make an interceptor that cancels the RPC if a message hasn't been received in a time period.  Without going into too much detail, there are lots of different ways to handle deadlines in the streaming case, so gRPC punts this up to the application.  For the (extremely common) case of Unary calls, the deadline is the right way to make sure the call eventually ends.

Mark Fine

unread,
Apr 9, 2019, 5:58:43 PM4/9/19
to Carl Mastrangelo, grpc.io
Hi Craig!

Thanks for the feedback - I'm trying to take a first pass at going about this by using cq.AyncNext() with a timeout - I wasn't able to find much information on interceptor. I wasn't able to find many examples with AsyncNext() out there, but have something working along the lines of:

void AgentOutputClient::call(const std::string &port) {
  auto chan(create_client(config_, port));
  auto stub(Agent::NewStub(chan));

  grpc::ClientContext context;
  AgentRequest input;
  input.set_name(*onfig_.name);

  grpc::CompletionQueue cq;
  auto data(stub->PrepareAsyncstream_output(&context, input, &cq));

  uint64_t in(0);
  data->StartCall(&in);

  bool ok;
  uint64_t *out;
  if (cq.Next(reinterpret_cast<void **>(&out), &ok)) {
    if (ok) {
      std::cout << in << " " << *out << " OK" << std::endl;
      assert(in == *out);
      bool loop(false);
      while (loop) {
        grpc::CompletionQueue::NextStatus status(cq.AsyncNext(reinterpret_cast<void **>(&out), &ok, std::chrono::system_clock::now() + seconds));
        switch (status) {
        case grpc::CompletionQueue::NextStatus::GOT_EVENT: {
          std::cout << "GOT EVENT " << *out << " " << in << std::endl;
          if (ok) {
            assert(in == *out);
            // do stuff
          } else {
            loop = false;
          }
          break;
        }
        case grpc::CompletionQueue::NextStatus::TIMEOUT: {
          loop = false;
          std::cout << "TIMEOUT" << std::endl;
          break;
        }
        case grpc::CompletionQueue::NextStatus::SHUTDOWN: {
          loop = false;
          std::cout << "SHUTDOWN" << std::endl;
          break;
        }
        }
      }
    }
  }

  grpc::Status status;
  in++;
  data->Finish(&status, &in);
  if (cq.Next(reinterpret_cast<void **>(&out), &ok)) {
    std::cout << in << " " << *out << " " << status.error_code() << std::endl;
    assert(in == *out);
  }
  cq.Shutdown();
  while (cq.Next(reinterpret_cast<void **>(&out), &ok)) {
    std::cout << *out << " DRAIN" << std::endl;
  }
}

This seems to work ok for picking up stalled infinite streams. I'm not sure if things are completely getting cleaned up above in the example - I get the following lines logged out:

E0409 14:32:07.457586000 140735515956096 backup_poller.cc:110]         run_poller: {"created":"@1554845527.457559000","description":"Timer list shutdown","file":"/Users/mark/repos/orion/third_party/grpc/src/core/lib/iomgr/timer_generic.cc","file_line":283}

Thanks for any advice or pointers!

Mark

--
You received this message because you are subscribed to the Google Groups "grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email to grpc-io+u...@googlegroups.com.
To post to this group, send email to grp...@googlegroups.com.
Visit this group at https://groups.google.com/group/grpc-io.
To view this discussion on the web visit https://groups.google.com/d/msgid/grpc-io/b93eeb6d-fac1-4ed7-bb49-679b8a2c7c3b%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

yas...@google.com

unread,
Apr 22, 2019, 3:14:35 PM4/22/19
to grpc.io
From the error, it looks like some operation is happening after grpc has begun or is done shutting down. At a quick glance, it looks like you aren't making sure that tags are received. You would want to call TryCancel() on the context for cancelling the call. That would help cancel operations that are stuck inside grpc waiting on an event.

On a side note - Documentation on interceptors for C++ can be found here - https://github.com/grpc/grpc/blob/master/include/grpcpp/impl/codegen/interceptor.h
To unsubscribe from this group and stop receiving emails from it, send an email to grp...@googlegroups.com.
Reply all
Reply to author
Forward
0 new messages