C# server streaming interceptor


Mark Nuttall-Smith

Feb 15, 2019, 4:58:41 AM
to grpc.io

Hi,
I'm trying to add a C# interceptor that refreshes an access token if it has expired.

Here is my current code:

    public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(
        TRequest request, ClientInterceptorContext<TRequest, TResponse> context,
        AsyncServerStreamingCallContinuation<TRequest, TResponse> continuation)
    {
        var cancellationToken = context.Options.CancellationToken;

        var token = Task.Run(() => GetAccessTokenAsync(cancellationToken), cancellationToken).Result;

        if (token.IsMissing())
            if (Task.Run(() => RefreshTokensAsync(cancellationToken), cancellationToken).Result == false)
                _logger.Warn("Unable to refresh access token");

        AddOrUpdateHeader(context.Options.Headers, CreateBearerTokenHeader(token));

        try
        {
            return continuation(request, context);
        }
        catch (RpcException e) when (e.StatusCode == StatusCode.Unauthenticated)
        {
            if (Task.Run(() => RefreshTokensAsync(cancellationToken), cancellationToken).Result == false)
                _logger.Warn("Unable to refresh access token");

            AddOrUpdateHeader(context.Options.Headers, CreateBearerTokenHeader(AccessToken));

            return continuation(request, context);
        }
    }

What surprises me is that the RpcException that is thrown is never caught by the exception handler in the interceptor (it is caught by the client, however).

Could anyone suggest what I'm missing? Or point to an example of a server streaming retry handler in C#.

Thanks,
Mark

Benjamin Krämer

Feb 18, 2019, 8:11:33 AM
to grpc.io
Hi Mark,

The problem is most likely that you return the call object from continuation directly and never await anything on it. Nothing has been unwrapped or checked at that point, so the exception has not been thrown yet and your handler never sees it.
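To illustrate the point above without any gRPC dependency, here is a minimal sketch. `FakeStreamingCall` and `DeferredFailureDemo` are hypothetical names; the fake type only stands in for `AsyncServerStreamingCall<TResponse>`, and `InvalidOperationException` stands in for `RpcException`. Starting the call returns immediately, and the failure only surfaces when the stream is read, so a try/catch around the call itself never fires.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for AsyncServerStreamingCall<T>: creating it never
// throws; the failure only surfaces when the stream is read.
public sealed class FakeStreamingCall
{
    public Task<bool> MoveNext() =>
        Task.FromException<bool>(new InvalidOperationException("Unauthenticated"));
}

public static class DeferredFailureDemo
{
    // Mirrors `continuation(request, context)`: returns immediately.
    public static FakeStreamingCall StartCall() => new FakeStreamingCall();

    // Reports which handler observed the failure.
    public static async Task<string> RunAsync()
    {
        FakeStreamingCall call;
        try
        {
            call = StartCall(); // nothing is thrown here
        }
        catch (InvalidOperationException)
        {
            return "interceptor";
        }

        try
        {
            await call.MoveNext(); // the failure is observed here
            return "none";
        }
        catch (InvalidOperationException)
        {
            return "reader";
        }
    }
}

public static class Program
{
    public static async Task Main()
    {
        Console.WriteLine(await DeferredFailureDemo.RunAsync()); // prints "reader"
    }
}
```

The first handler corresponds to the try/catch in the interceptor, the second to the code that consumes the stream on the client.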

You can see an example of how to intercept it (also using async/await) in my OpenTracing interceptor for gRPC: https://github.com/opentracing-contrib/csharp-grpc/tree/master/src/OpenTracing.Contrib.Grpc

The `Interceptors` folder contains the inherited classes, but those simply call into the `Handler` classes in the `Handler` folder. In addition, keep in mind that the exception can also occur on the `ResponseStream` and not just on the initial call.

I will have a look at your exact situation and write an example, since I have to implement OAuth on gRPC anyway.

Benjamin

Mark Nuttall-Smith

Feb 18, 2019, 2:08:07 PM
to grpc.io
Hi Benjamin,

This is fantastic - thanks very much. I'm going to take a look in more detail tomorrow, but should be able to figure it out from here I think.

Good point about catching exceptions on the ResponseStream as well - for the auth use case in particular, however, I think I would only be interested in retrying the initial call, right?

Cheers, Mark

Benjamin Krämer

Mar 1, 2019, 9:12:36 AM
to grpc.io
You might also want to have a look at https://github.com/grpc/grpc/tree/master/src/csharp/Grpc.Auth since that's the cleaner way to achieve what you are trying to do. You can implement an `AsyncAuthInterceptor` that you can then use for `ChannelCredentials`.
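A minimal sketch of that approach, assuming the Grpc.Core package is referenced. `BearerCredentials` and the `getAccessTokenAsync` callback are hypothetical names standing in for whatever token source the application has; `AsyncAuthInterceptor`, `CallCredentials.FromInterceptor` and `ChannelCredentials.Create` are the Grpc.Core pieces being illustrated.

```csharp
using System;
using System.Threading.Tasks;
using Grpc.Core;

public static class BearerCredentials
{
    // getAccessTokenAsync is a hypothetical callback supplied by the application.
    public static ChannelCredentials Create(Func<Task<string>> getAccessTokenAsync)
    {
        // The auth interceptor runs before each call and can add request metadata.
        AsyncAuthInterceptor interceptor = async (context, metadata) =>
        {
            string token = await getAccessTokenAsync().ConfigureAwait(false);
            metadata.Add("authorization", "Bearer " + token);
        };

        // Call credentials are composed with secure channel credentials.
        return ChannelCredentials.Create(
            new SslCredentials(),
            CallCredentials.FromInterceptor(interceptor));
    }
}
```

The result would be passed when creating the channel, for example `new Channel("example.com:443", BearerCredentials.Create(GetTokenAsync))`, where the host and `GetTokenAsync` are placeholders. Note that this only attaches the token; it does not by itself retry a call that the server rejects.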

I still have to check on how to implement the server side for checking/validating the token and making the result available through the call context. But this should at least solve your current problem.

Mark Nuttall-Smith

Apr 2, 2019, 10:09:48 AM
to grpc.io
I'm finally getting back to this after many distractions, and still not able to get things working unfortunately.

Just to note - I was previously using the AsyncAuthInterceptor for adding a token to the call headers, but because this is not a true interceptor, it's not possible to retry if the call fails authentication on the server side.

So, I'm back to trying to figure out how to retry a server streaming call. My question is, how can I know from the response headers if the call was unauthenticated?

Here's a snippet of my code so far:

    var responseContinuation = continuation(request, context);

    try
    {
        // Any way to raise an exception when call is StatusCode=Unauthenticated here?
        responseContinuation.ResponseHeadersAsync.WaitAndUnwrapException();

        return new AsyncServerStreamingCall<TResponse>(
            responseContinuation.ResponseStream,
            responseContinuation.ResponseHeadersAsync,
            responseContinuation.GetStatus,
            responseContinuation.GetTrailers,
            responseContinuation.Dispose);
    }
    catch (RpcException e) when (e.StatusCode == StatusCode.Unauthenticated)
    {
        if (Task.Run(() => RefreshTokensAsync(cancellationToken), cancellationToken).Result == false)
            _logger.Warn("Unable to refresh access token");

        AddOrUpdateHeader(context.Options.Headers, CreateBearerTokenHeader(AccessToken));

        return continuation(request, context);
    }


Any pointers very gratefully received.
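
One possible direction, following the earlier remark that the exception can surface on the `ResponseStream`: wrap the stream and retry when the first read fails. This is a sketch under assumptions, not a tested implementation. `RetryOnUnauthenticatedReader<T>` and the `restartAsync` callback are hypothetical; the callback is assumed to refresh the token, invoke the continuation again and return the new call's `ResponseStream`. The sketch also assumes a Grpc.Core.Api version where `IAsyncStreamReader<T>` consists only of `Current` and `MoveNext(CancellationToken)`, and it does not rely on `ResponseHeadersAsync` faulting for a rejected call.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;

// Wraps a response stream and restarts the call once if the first read
// fails with StatusCode.Unauthenticated.
public sealed class RetryOnUnauthenticatedReader<T> : IAsyncStreamReader<T>
{
    // Hypothetical callback: refresh the token, re-invoke the continuation,
    // and return the ResponseStream of the new call.
    private readonly Func<Task<IAsyncStreamReader<T>>> _restartAsync;
    private IAsyncStreamReader<T> _inner;
    private bool _firstRead = true;

    public RetryOnUnauthenticatedReader(
        IAsyncStreamReader<T> inner,
        Func<Task<IAsyncStreamReader<T>>> restartAsync)
    {
        _inner = inner;
        _restartAsync = restartAsync;
    }

    public T Current => _inner.Current;

    public async Task<bool> MoveNext(CancellationToken cancellationToken)
    {
        bool isFirstRead = _firstRead;
        _firstRead = false;
        try
        {
            return await _inner.MoveNext(cancellationToken).ConfigureAwait(false);
        }
        catch (RpcException e) when (isFirstRead && e.StatusCode == StatusCode.Unauthenticated)
        {
            // Only the first read is retried; later failures propagate to the caller.
            _inner = await _restartAsync().ConfigureAwait(false);
            return await _inner.MoveNext(cancellationToken).ConfigureAwait(false);
        }
    }
}
```

In the interceptor, the wrapper would be passed as the first argument of the `AsyncServerStreamingCall<TResponse>` constructor. The status, trailers and dispose delegates would then need to point at whichever underlying call is current, which this sketch leaves out.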