Client disconnect callback?

3,113 views
Skip to first unread message

bean...@googlemail.com

unread,
Apr 25, 2015, 12:08:06 PM4/25/15
to grp...@googlegroups.com
I'm adding a chat feature to my app. So I've got an RPC method called 'messageStream' that streams chat message to the user in realtime when they arrive.

I'm using redis' pubsub functionality - each user is subscribed to a topic named after their ID. The code is currently as follows:

override def messageStream(request: Empty,
responseObserver: StreamObserver[Message]): Unit = {

val currentUserId = userIdProvider.get()

// register a callback to the redis client that forwards messages to
// each user
val redisClient = RedisPubSub(
channels = Seq(currentUserId.toString),
patterns = Seq(),
onMessage = (m: PubSubMessage) => {
      // send the message to the client
responseObserver.onValue(Message.newBuilder().setMessage(m.data).build())
}
)

// todo: block here...
}

I expect this will probably work alright, but I need to find a way to block at the end of the method so that the redis client remains connected. I also need a way to detect if clients disconnect so I can stop the redis client.

If I combine these two requirements I could end up with some elegant code:

override def messageStream(request: Empty,
responseObserver: StreamObserver[Message]): Unit = {

val currentUserId = userIdProvider.get()

val messageStreamResult = Promise[Boolean]()

// register a callback to the redis client that forwards messages to
// each user
val redisClient = RedisPubSub(
channels = Seq(currentUserId.toString),
patterns = Seq(),
onMessage = (m: PubSubMessage) => {
      responseObserver.onValue(Message.newBuilder().setMessage(m.data).build())
}
)

  // How to install a callback when the client disconnects?
server.clientDisconnects = {
redisClient.unsubscribe(currentUserId.toString)
redisClient.stop()
messageStreamResult.complete(Try(true))
    responseObserver.onCompleted()
}

Await.result(messageStreamResult.future, Duration.Inf)
}

Do any callbacks exist like `clientDisconnects` above so I can perform some cleanup action when the client disconnects? Alternatively, what's the best way of detecting whether the client has disconnected?

Thanks

Eric Anderson

unread,
Apr 25, 2015, 9:47:16 PM4/25/15
to bean...@googlemail.com, grpc-io
On Sat, Apr 25, 2015 at 9:08 AM, <bean...@googlemail.com> wrote:
override def messageStream(request: Empty,
responseObserver: StreamObserver[Message]): Unit = {
...
Do any callbacks exist like `clientDisconnects` above so I can perform some cleanup action when the client disconnects? Alternatively, what's the best way of detecting whether the client has disconnected?

Because it is a unary request, there isn't a callback that notifies you of client cancellation (any error is considered a cancellation to the server). The next time you call onValue though, an exception will be thrown notifying you of the cancellation. That is enough in some scenarios.

The "advanced" interface provides such notification though. It has onHalfClose (which is when onComplete is normally called), onComplete (when the RPC gracefully completed), and onCancel (if there was an error or if the client cancelled). We don't have code generation for the advanced interface though, so you would need to implement ServerCallHandler for each method in the service, create ServerMethodDefinitions and add them to a ServerServiceDefinition, which is what you pass to the ServerBuilder's addService. That would be verbose and annoying, but simple code. It would probably make sense for you to create an issue for us to add codegen for the advanced interface.

bean...@googlemail.com

unread,
Apr 26, 2015, 5:28:48 AM4/26/15
to grp...@googlegroups.com, bean...@googlemail.com
Thanks Eric, I've created an issue (https://github.com/grpc/grpc-java/issues/339).

Do gRPC clients (specifically the Java client) handle reconnecting to the server if there are connectivity issues? I've got a corresponding method that allows the client to send a stream of chat messages to the server, so will I need to add some sort of error handling to make it reconnect, or is that handled by the framework? And also the same question for the method above, which streams chat messages back down from the server to the client - will the client reconnect seamlessly so that if I expose the requestObserver to my client Java code (Android app) can I rely on just calling onValue, or will the (Android) client code need to reconnect explicitly if network connectivity is temporarily lost?

Thanks again.

Eric Anderson

unread,
Apr 26, 2015, 10:26:37 AM4/26/15
to bean...@googlemail.com, grpc-io
On Sun, Apr 26, 2015 at 2:28 AM, <bean...@googlemail.com> wrote:
Do gRPC clients (specifically the Java client) handle reconnecting to the server if there are connectivity issues?

Yes, in general, and yes for Java. You should expect RPCs that were in-progress when the disconnect occurred to fail, but any new RPCs should automatically use a new connection. That is, RPCs don't migrate from one connection to another, so if the connection an RPC is using fails, then that RPC fails.

Today, Java's reconnection logic is simple: when making a new RPC, if there is not yet a working connection or one being created it creates a new one. In the future we expect to have things like exponential backoff for reconnects.

I've got a corresponding method that allows the client to send a stream of chat messages to the server, so will I need to add some sort of error handling to make it reconnect, or is that handled by the framework?

Yes, you would need to create a new RPC if the previous one fails.

In the future, we would also expect to support some retry behavior that could be enabled for idempotent, single-request RPCs. But that hasn't been designed yet and likely wouldn't solve your need here.

And also the same question for the method above, which streams chat messages back down from the server to the client - will the client reconnect seamlessly so that if I expose the requestObserver to my client Java code (Android app) can I rely on just calling onValue, or will the (Android) client code need to reconnect explicitly if network connectivity is temporarily lost?

Yes, it would need to recreate the RPC if the previous fails.

bean...@googlemail.com

unread,
Apr 26, 2015, 11:51:53 AM4/26/15
to grp...@googlegroups.com, bean...@googlemail.com
Thanks for the clarification Eric.
Reply all
Reply to author
Forward
0 new messages