Closing a stream with a long living context

Michael Bond

Feb 6, 2017, 10:29:14 AM
to grpc.io
Hey, I'm trying to make sure I'm doing this correctly.

Right now I'm having issues closing streams that were started with a long-lived context that gets passed around.

In this example, "ctx" is passed to many goroutines. I want to keep "ctx" around, but passing it to "grpcStream" seems to keep the stream from actually closing. What I did below fixed the issue, but I wanted to know whether passing a child context and cancelling it is required for the stream to actually close. Is CloseSend() not sufficient if the context is still alive?

log.Println("stream starting")
streamCtx, cancel := context.WithCancel(ctx)
defer cancel()
stream, err := grpcStream(streamCtx, otherArgs)
if err != nil {
    errCh <- err
    return
}
defer stream.CloseSend()
defer log.Println("closing stream")


Thanks!

Josh Humphries

Feb 6, 2017, 10:37:15 AM
to Michael Bond, grpc.io
On the client, the CloseSend method is "half-closing" the stream. So it closes the request/upload half of the stream. The stream remains open until the server closes the other half: the response/download part of the stream. Cancelling the stream also closes it (as would the channel being disconnected or the call timing out).



----

Josh Humphries

FullStory  |  Atlanta, GA

Software Engineer

j...@fullstory.com


--
You received this message because you are subscribed to the Google Groups "grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email to grpc-io+unsubscribe@googlegroups.com.
To post to this group, send email to grp...@googlegroups.com.
Visit this group at https://groups.google.com/group/grpc-io.
To view this discussion on the web visit https://groups.google.com/d/msgid/grpc-io/85470940-69e3-4f4c-aed2-31a3242841a3%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Josh Humphries

Feb 6, 2017, 10:50:04 AM
to Michael Bond, grpc.io
Perhaps more helpful: in your code example, you would then consume the responses by calling Recv() on the stream until it returns an error (io.EOF on successful end of stream or some other error if the call fails). Even if you are not expecting any response data from the server, you want to call Recv() in order to learn the ultimate disposition of the call (did it result in an error in the server or was it processed successfully?).

log.Println("stream starting")
streamCtx, cancel := context.WithCancel(ctx)
defer cancel()
stream, err := grpcStream(streamCtx, otherArgs)
if err != nil {
    errCh <- err
    return
}
defer stream.CloseSend()
defer log.Println("closing stream")
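for {
    // The message is discarded here; real code would process it.
    _, err := stream.Recv()
    if err == io.EOF {
        // The server closed its half of the stream cleanly.
        break
    }
    if err != nil {
        errCh <- err
        return
    }
}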



Michael Bond

Feb 6, 2017, 1:19:03 PM
to grpc.io, kemper...@gmail.com, j...@fullstory.com
Thanks for the quick reply. I should have specified that the code in the original post is a snippet; there's receiving logic underneath it.

Some more details. In this case I have a callback function on the server (written in Python) that needs to be executed to free resources, and closing the sending portion does not seem to trigger it. Also, the client dictates all connections here: the server simply pours a stream of data to the client until the client no longer needs that particular data. So, to be more specific with my question: how would I fully close the stream from the client's side?

Josh Humphries

Feb 6, 2017, 1:24:33 PM
to Michael Bond, grpc.io
I'm not sure I follow 100%, but the Python code should have similar logic where it receives the request messages. When the client half-closes the stream, the server gets EOF when it tries to receive (or, if the Python APIs were async/push like the Java APIs, an "end of stream" notification). Is that where you are doing the cleanup?
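
Half-closing can be modelled without gRPC. This is only an analogy, assuming a request stream is represented by a Go channel: closing the channel plays the role of CloseSend(), and `serve` is a hypothetical handler that cleans up once it sees the end of the request stream.

```go
package main

import "fmt"

// serve is a hypothetical handler for a call with a request stream. It reads
// requests until the request channel is closed (the analogue of the client
// half-closing), then runs cleanup and reports how many requests it saw.
func serve(requests <-chan string, cleanup func()) int {
	n := 0
	for range requests {
		n++
	}
	// Reaching this point corresponds to the server seeing end of stream.
	cleanup()
	return n
}

func main() {
	requests := make(chan string)
	done := make(chan int)
	cleaned := false

	go func() {
		done <- serve(requests, func() { cleaned = true })
	}()

	requests <- "one"
	requests <- "two"
	close(requests) // analogue of CloseSend(): no more requests will follow

	seen := <-done // receiving here also makes the cleanup flag safe to read
	fmt.Println("requests seen:", seen, "cleanup ran:", cleaned)
}
```

The cleanup runs only because the handler is actively reading requests; a handler that never reads the request side has nothing that would notice the half-close.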

Michael Bond

Feb 6, 2017, 1:49:12 PM
to grpc.io, kemper...@gmail.com, j...@fullstory.com
This is a unary-to-stream RPC, not stream-to-stream, if that changes anything. So would calling CloseSend() send a message to the Python server? Would I then just need to handle that message to make sure the callbacks are executed?

Basic flow:
Go client opens stream to Python server with args
Python server dumps data back to the Go client while the client reads it
Go client no longer needs that particular data and closes the stream
Python server executes callback functions to clean up resources <- this is what wasn't happening with CloseSend() until I added a child context and cancelled it

Francesco Lazzarino

Feb 6, 2017, 2:20:33 PM
to grpc.io, kemper...@gmail.com, j...@fullstory.com
The Go and Python docs seem to indicate that the implied contract for response streaming is that terminating the data stream is the sole responsibility of the server.

Given that, a client deciding it has consumed enough doesn't imply that the stream has been fully consumed, just that the client wants to move on.

This makes me think that the call level (the context), not the stream level, is the appropriate place to terminate from a client. Is it possible that CloseSend() exists for implementation purposes and not to close the stream from the client side?


Josh Humphries

Feb 6, 2017, 4:26:23 PM
to Michael Bond, grpc.io
On Mon, Feb 6, 2017 at 1:49 PM, Michael Bond <kemper...@gmail.com> wrote:
> This is a unary to stream, not stream to stream rpc, if that changes anything.

Do you mean server-streaming (that is, unary request message, streaming response)? If so, calling stream.CloseSend() has no effect, because the request half of the stream is already closed. Take a look at the generated code for a server-streaming method and you'll see that the generated stub calls SendMsg() and CloseSend() for you: https://github.com/grpc/grpc-go/blob/883bfc7bc8feeb7d90501b977e1c23447b9ff136/test/grpc_testing/test.pb.go#L416

> So would calling CloseSend() send a message to the Python server? Then would I just need to handle said message to make sure the callbacks are executed?
>
> Basic flow:
> Go client opens stream to Python server with args
> Python server dumps back data to Go client while the client reads it
> Go client no longer needs particular data and closes the stream

If the client no longer cares about the response stream and has already finished sending request messages, then cancelling the context is the appropriate action to take.

> Python server executes callback functions to clean up resources <- this is what currently wasn't happening with CloseSend() until I added a child context and cancelled it

I still don't quite understand how you've got this wired up. A code sample of your Python server code might help. But, in any event, it sounds like you genuinely need the client to cancel.

Another possibility would be to use bidi streaming and have the server poll for request messages, doing the cleanup when it reaches the end of the request stream. But it sounds like that might only add needless complexity to the server endpoint.

Michael Bond

Feb 6, 2017, 5:11:37 PM
to grpc.io, kemper...@gmail.com, j...@fullstory.com
I did mean server-streaming, sorry for the confusion. And awesome, that's exactly what I was trying to figure out, thanks!

As for the python part, here is a snippet:

    def GrpcService(self, request, context):
        arg = request.arg
        data_stream = some_function(arg)

        # callback to free up resources
        context.add_callback(data_stream.close)

        for data in data_stream:
            # indefinitely serve data; assumes each item is already a response message
            yield data

The add_callback() is from http://www.grpc.io/grpc/python/grpc.html?highlight=add_callback#grpc.RpcContext.add_callback