I am designing a subscription API through gRPC in which:
- Request is stream of subscription filters, updated as client sees fit
- Response is stream of messages matching subscription filters
The goal is to have a single thread responsible for polling the subscriptions for updated filters, performing queries to create results sets for the filters, and publishing these results back out to the clients.
Currently, I have something like the following:
rpc Subscribe(stream Request) returns (stream Response);
def Subscribe(self, request_iterator, context):
response_queue = Queue.Queue()
subscription_manager.register_subscription(request_iterator, response_queue)
while True:
response = response_queue.get(timeout=0.5)
yield response
except Queue.Empty:
return
The first issue is that exposure to the stream of requests is provided through an iterator interface which blocks on next(). In order to have a single thread responsible for pulling requests, I need to have access to a non-blocking way to check if any requests are present the subscription and advance to the next subscription if not. In order to ensure that a lack of requests on one subscription doesn't block consumption of requests on another, I currently have to dedicate a thread per subscription each blocking on iteration over the request_iterator and applying updates as they are received.
The second issue is that exposure to the stream of responses is such that the rpc implementation is expected to be a generator of the responses. My assumption is that the above code consumes a server worker thread for the duration of a subscription call.
Overall this translates to 1 thread per subscription for iterating requests, 1 thread per subscription for yielding responses, and 1 thread iterating over the subscription params, querying, and populating the queues. Is there any lower-level access to the streams, i.e. something that can be selected over for the requests, etc?
Thank you.