Go - Reconnection in bidi streaming RPC communication


Amit Saha

Jul 10, 2021, 8:37:38 PM
to grpc.io
Hi all,

I am implementing reconnection logic in my client for a bidi RPC method.

This is similar to what
https://stackoverflow.com/questions/66353603/correct-way-to-perform-a-reconnect-with-grpc-client
seeks to do. The summary version is:

If Recv() returns an error other than io.EOF, reconnect/recreate the
streaming connection.

However, this logic doesn't seem as straightforward for the Send()
method (I think). Unlike Recv(), Send() returns io.EOF in this
scenario, not a "transport is closing" error. Thus, an error from
Send() alone doesn't tell us whether we should create a new stream.

What are the community's thoughts on this?

Thanks,
Amit.

dfa...@google.com

Jul 14, 2021, 1:42:53 PM
to grpc.io
Only your send _or_ recv path should be recreating the stream.  I think I would implement this as:

- goroutine performing Send exits on any error (or encounters the error then blocks on a channel until the next step...)
- goroutine performing Recv re-creates the Send goroutine if needed (or unblocks the above with a directive to exit (RPC done) or resume (stream recreated))

Amit Saha

Jul 14, 2021, 6:53:40 PM
to dfa...@google.com, grpc.io
On Thu, Jul 15, 2021 at 3:43 AM 'dfa...@google.com' via grpc.io
<grp...@googlegroups.com> wrote:
>
> Only your send _or_ recv path should be recreating the stream. I think I would implement this as:
>
> - goroutine performing Send exits on any error (or encounters the error then blocks on a channel until the next step...)
> - goroutine performing Recv re-creates the Send goroutine if needed (or unblocks the above with a directive to exit (RPC done) or resume (stream recreated))

Thanks, I understand what you are proposing. With this
approach, when I try to Send() with the stream disconnected, the
operation will either exit, which is not what I want, or wait for the
Recv() call to signal it to recreate the stream. I must then
write my code so that the Send() path has some way of
preserving the message that it failed to send, and when it gets a
signal from Recv() to recreate the stream, it can then send it.

Did I get that right?




dfa...@google.com

Jul 22, 2021, 12:29:42 PM
to grpc.io
On Wednesday, July 14, 2021 at 3:53:40 PM UTC-7 amits...@gmail.com wrote:
> Thanks, I understand what you are proposing. With this
> approach, when I try to Send() with the stream disconnected, the
> operation will either exit, which is not what I want, or wait for the
> Recv() call to signal it to recreate the stream. I must then
> write my code so that the Send() path has some way of
> preserving the message that it failed to send, and when it gets a
> signal from Recv() to recreate the stream, it can then send it.
>
> Did I get that right?

That's almost exactly what I meant. One difference: in my suggestion, I had the Recv goroutine recreating the stream, which seems simpler since it's Recv that determines whether it needs to recreate it based on the error. In that design, you'd probably push the stream object itself to the Send goroutine via a channel, or close the channel if the stream is not being recreated.
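
A rough sketch of the Send side of that design, with the failed message retained across streams. Everything named here (sendStream, fakeStream, sendAll) is hypothetical, and main plays the role of the Recv goroutine by pushing two streams and then closing the channel.

```go
package main

import (
	"errors"
	"fmt"
)

// sendStream is a hypothetical stand-in for the Send half of a bidi stream.
type sendStream interface {
	Send(msg string) error
}

// fakeStream accepts failAfter messages and then fails every Send.
type fakeStream struct {
	failAfter int
	sent      []string
}

func (f *fakeStream) Send(msg string) error {
	if len(f.sent) >= f.failAfter {
		return errors.New("stream closed")
	}
	f.sent = append(f.sent, msg)
	return nil
}

// sendAll sends msgs in order. The current stream arrives over streams; when
// Send fails, the unsent message is kept and retried on the next stream.
// It returns false if streams is closed before everything was sent.
func sendAll(streams <-chan sendStream, msgs []string) bool {
	s, ok := <-streams
	for _, msg := range msgs {
		for {
			if !ok {
				return false // channel closed: the stream will not be recreated
			}
			if err := s.Send(msg); err == nil {
				break
			}
			s, ok = <-streams // block until the Recv side supplies a new stream
		}
	}
	return true
}

func main() {
	first := &fakeStream{failAfter: 1}
	second := &fakeStream{failAfter: 10}
	streams := make(chan sendStream, 2)
	streams <- first
	streams <- second
	close(streams)
	ok := sendAll(streams, []string{"m1", "m2", "m3"})
	fmt.Println(ok, first.sent, second.sent) // true [m1] [m2 m3]
}
```

Because the retry loop holds on to msg until a Send succeeds, the message that failed on the first stream is the first one delivered on the replacement.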

Amit Saha

Jul 22, 2021, 11:10:43 PM
to dfa...@google.com, grpc.io
Thank you.

Here’s an implementation: https://gist.github.com/amitsaha/cbc74fa75f089dee5df495c7531d16db - will hopefully be useful to somebody else.



Amit Saha

Jul 23, 2021, 4:42:12 AM
to grpc.io
I think this may work:

If we get an io.EOF error from the Send() method, and we call the RecvMsg() method, we can make use of the error value returned by this method to deduce an abnormal termination:

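    err := stream.Send(&r)
    if err == io.EOF {
        // Send reports only io.EOF once the stream has ended;
        // the actual status is available from RecvMsg.
        var m responseType
        if err := stream.RecvMsg(&m); err != nil {
            // Implement stream recreation logic
        }
    }
    ..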

Zach Reyes

Jul 28, 2021, 6:38:04 PM
to grpc.io
Nice, this looks solid to me :).