I'm adding a chat feature to my app. So I've got an RPC method called 'messageStream' that streams chat message to the user in realtime when they arrive.
I'm using redis' pubsub functionality - each user is subscribed to a topic named after their ID. The code is currently as follows:
override def messageStream(request: Empty,
responseObserver: StreamObserver[Message]): Unit = {
val currentUserId = userIdProvider.get()
// register a callback to the redis client that forwards messages to
// each user
val redisClient = RedisPubSub(
channels = Seq(currentUserId.toString),
patterns = Seq(),
onMessage = (m: PubSubMessage) => {
// send the message to the client
responseObserver.onValue(Message.newBuilder().setMessage(m.data).build())
}
)
// todo: block here...
}
I expect this will probably work alright, but I need to find a way to block at the end of the method so that the redis client remains connected. I also need a way to detect if clients disconnect so I can stop the redis client.
If I combine these two requirements I could end up with some elegant code:
override def messageStream(request: Empty,
responseObserver: StreamObserver[Message]): Unit = {
val currentUserId = userIdProvider.get()
val messageStreamResult = Promise[Boolean]()
// register a callback to the redis client that forwards messages to
// each user
val redisClient = RedisPubSub(
channels = Seq(currentUserId.toString),
patterns = Seq(),
onMessage = (m: PubSubMessage) => {
responseObserver.onValue(Message.newBuilder().setMessage(m.data).build())
}
)
// How to install a callback when the client disconnects?
server.clientDisconnects = {
redisClient.unsubscribe(currentUserId.toString)
redisClient.stop()
messageStreamResult.complete(Try(true))
responseObserver.onCompleted()
}
Await.result(messageStreamResult.future, Duration.Inf)
}
Do any callbacks exist like `clientDisconnects` above so I can perform some cleanup action when the client disconnects? Alternatively, what's the best way of detecting whether the client has disconnected?
Thanks