[grpc-go] How to properly use context.Done()?


Gennady Karev

Oct 19, 2017, 3:20:05 PM
to grpc.io
Hi all!

grpc-go version 1.7.0
If the client is disconnected by a network error, the server must close the pub/sub connection. I know about ctx.Done(), but I don't know how to use it properly in my case. Can somebody explain, please?


func (a *API) Notifications(in *empty.Empty, stream pb.Service_NotificationsServer) error {
	ctx := stream.Context()
	_, ok := user.FromContext(ctx)
	if !ok {
		return grpc.Errorf(codes.Unauthenticated, "user not found")
	}

	pubsub := a.redisClient.Subscribe("notifications")
	defer pubsub.Close()

	for {
		msg, err := pubsub.ReceiveMessage()
		if err != nil {
			grpclog.Warningf("Notifications: pubsub error: %v", err)
			return grpc.Errorf(codes.Internal, "pubsub error %v", err)
		}

		notification := &pb.Notification{}
		err = json.Unmarshal([]byte(msg.Payload), notification)
		if err != nil {
			grpclog.Warningf("Notifications: parse error: %v", err)
			continue
		}
		if err := stream.Send(notification); err != nil {
			grpclog.Warningf("Notifications: %v", err)
			return err
		}
		grpclog.Infof("Notifications: send msg %v", notification)
	}
}


Menghan Li

Oct 24, 2017, 7:15:54 PM
to grpc.io
If there's a way for the redisClient to create a pubsub client with a given context, you should call that function with the stream context. All cancellation and timeout will be propagated automatically.

If that's not possible, you can move the for loop into a goroutine. In the service handler goroutine, wait on the stream context and close your pubsub client when the stream is done.
Something like:

ctx := stream.Context()
pubsub := subscribe()

go func() {
  for { pubsub.Receive() ... }
}()

<-ctx.Done()
pubsub.Close()

Thanks,
Menghan