Hi guys,Thanks for the comments and suggestions so far. @Paul, cheers for the heads-up re cloud pubsub, but using a cloud broker is not suitable for my use case.
I found some time to implement a prototype pubsub servicer, and I'd like to share it because I still have a few doubts if I'm doing things right, and a few specific questions.
The servicer has a single bidirectional streaming endpoint. I spawn a new thread in the implementation to process incoming requests, generate a unique id for the connection, do some book-keeping, and send out updates to subscribed clients as they are published.
Specific questions:
1. Is it possible to know when a client has disconnected, so I can release any resources that have been allocated?
2. Would it be more idiomatic to use threadlocals somehow for the book-keeping?
Any other comments and criticism of the code very welcome ;)
# ------------------------------ pubsub.proto
package pubsub;
service PubSub {
rpc subscribe(stream SubscriptionRequest) returns (stream SubscriptionReply);
}
message SubscriptionRequest {
string topic = 1;
}
message SubscriptionReply {
string topic = 1;
string message = 2;
}
# ------------------------------ pubsub_server.py
import queue
import random
import threading
import uuid
from collections import defaultdict
import time
from concurrent import futures
import grpc
import pubsub_pb2
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
class PubSub(pubsub_pb2.PubSubServicer):
def __init__(self):
self.client_q = dict()
self.topics = defaultdict(list)
t = threading.Thread(target=self._fake_publisher)
t.start()
def publish(self, topic, message):
reply = pubsub_pb2.SubscriptionReply(topic=topic,
message=message)
print("Publishing topic {} message {}".format(topic, message))
for clientid in self.topics[topic]:
self.client_q[clientid].put(reply)
def subscribe(self, request_iterator, context):
clientid = uuid.uuid4()
self.client_q[clientid] = queue.Queue()
t = threading.Thread(target=self._process_subscriptions,
args=(clientid, request_iterator))
t.start()
while True:
item = self.client_q[clientid].get()
yield item
self.client_q[clientid].task_done()
# What's the best way to release resources and tidy up
# when the client goes away?
def _process_subscriptions(self, clientid, request_iterator):
for request in request_iterator:
print("Received subscription request from {} for topic {}".format(
clientid, request.topic
))
self.topics[request.topic].append(clientid)
def _fake_publisher(self):
while True:
if len(self.topics):
topic = random.choice(list(self.topics.keys()))
self.publish(topic, str(uuid.uuid4()))
time.sleep(1)
def serve():
server = grpc.server(futures.ThreadPoolExecutor())
pubsub_pb2.add_PubSubServicer_to_server(PubSub(), server)
server.add_insecure_port('[::]:50051')
server.start()
try:
while True:
time.sleep(_ONE_DAY_IN_SECONDS)
except KeyboardInterrupt:
server.stop(0)
if __name__ == '__main__':
serve()
# ------------------------------ pubsub_client.py
import time
import concurrent
import grpc
import pubsub_pb2
def client(stub):
def my_topics():
yield pubsub_pb2.SubscriptionRequest(topic="grpc")
time.sleep(5)
yield pubsub_pb2.SubscriptionRequest(topic="donkeys")
responses = stub.subscribe(my_topics())
for response in responses:
print("Received topic {} message {}".format(
response.topic, response.message))
def run():
channel = grpc.insecure_channel('localhost:50051')
stub = pubsub_pb2.PubSubStub(channel)
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [
executor.submit(client, stub)
]
for future in concurrent.futures.as_completed(futures):
try:
future.result()
except Exception as exc:
print("Generated an exception {}".format(exc))
if __name__ == '__main__':
run()