Python Stream

eaksea87

Jul 18, 2017, 12:26:27 PM
to grpc.io
I am designing a subscription API through gRPC in which:
  • The request is a stream of subscription filters, updated as the client sees fit
  • The response is a stream of messages matching the subscription filters
The goal is to have a single thread responsible for polling the subscriptions for updated filters, performing queries to create result sets for the filters, and publishing these results back out to the clients.

Currently, I have something like the following:

rpc Subscribe(stream Request) returns (stream Response);

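import Queue  # Python 2 module name; it is "queue" in Python 3

def Subscribe(self, request_iterator, context):
    # Responses for this call are delivered through a per-call queue.
    response_queue = Queue.Queue()
    subscription_manager.register_subscription(request_iterator, response_queue)
    while True:
        try:
            response = response_queue.get(timeout=0.5)
        except Queue.Empty:
            # No response arrived within 0.5 s: end the response stream.
            return
        yield response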

The first issue is that the stream of requests is exposed through an iterator interface that blocks on next(). To have a single thread responsible for pulling requests, I need a non-blocking way to check whether any requests are present on a subscription and to advance to the next subscription if not. To ensure that a lack of requests on one subscription doesn't block consumption of requests on another, I currently have to dedicate a thread per subscription, each blocking on iteration over the request_iterator and applying updates as they are received.
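The thread-per-subscription workaround described above could be sketched roughly as follows. This is only an illustration, not gRPC API: pump_requests, drain_updates, the shared updates queue, and the filters dict are hypothetical names, and the code uses the Python 3 queue module rather than the Python 2 Queue module. Each pump thread blocks on its own request iterator and forwards updates to one shared queue, which the single polling thread can then drain without blocking. Dropping a subscription's filter when its request stream ends is an assumption made for the sketch.

```python
import queue
import threading


def pump_requests(subscription_id, request_iterator, updates):
    """Drain one blocking request iterator into a shared queue (hypothetical helper)."""
    def run():
        # next() blocks here, but only on this subscription's own thread.
        for request in request_iterator:
            updates.put((subscription_id, request))
        # None marks the end of this subscription's request stream.
        updates.put((subscription_id, None))

    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    return thread


def drain_updates(updates, filters):
    """Apply all pending filter updates without blocking (single polling thread)."""
    while True:
        try:
            subscription_id, request = updates.get_nowait()
        except queue.Empty:
            return filters  # nothing more pending right now
        if request is None:
            # Assumption: a finished request stream means the filter is dropped.
            filters.pop(subscription_id, None)
        else:
            filters[subscription_id] = request  # the latest filter wins
```

The polling thread would call drain_updates(updates, filters) at the top of each cycle and then run its queries against the current contents of filters. This does not remove the per-subscription thread; it only keeps the blocking confined to the pump threads.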

The second issue is that the stream of responses is exposed by requiring the RPC implementation to be a generator of the responses. My assumption is that the above code consumes a server worker thread for the duration of a subscription call.
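To make the response side concrete, here is a minimal sketch of the generator body pulled out into a standalone function, written for Python 3. The names stream_responses and poll_seconds, and the use of None as an end-of-stream marker, are assumptions for illustration; context.is_active() is the only gRPC call relied on. Unlike the handler above, this variant keeps waiting on an empty queue instead of returning, and stops once the call is no longer active. If the assumption about worker threads holds, whichever thread iterates this generator spends the whole call blocked in get().

```python
import queue


def stream_responses(response_queue, context, poll_seconds=0.5):
    """Yield queued responses until the call ends (hypothetical helper)."""
    # context only needs an is_active() method, as a gRPC servicer context has.
    while context.is_active():
        try:
            # The thread iterating this generator blocks here between responses.
            response = response_queue.get(timeout=poll_seconds)
        except queue.Empty:
            continue  # nothing yet: re-check that the client is still connected
        if response is None:
            return  # assumed end-of-stream marker put by the publishing thread
        yield response
```

Inside the servicer, Subscribe could then end with "return stream_responses(response_queue, context)" after registering the subscription.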

Overall this translates to 1 thread per subscription for iterating requests, 1 thread per subscription for yielding responses, and 1 thread iterating over the subscription params, querying, and populating the queues. Is there any lower-level access to the streams, e.g. something that can be selected over for the requests?

Thank you.

Nathaniel Manista

Jul 19, 2017, 11:46:13 AM
to eaksea87, grpc.io
You've hit the nail very much on the head: there is not at this time any supported means of accessing request messages from the request stream without using a thread (be it the thread in which your server-side application code was called or another thread under your control) to "pull" them. We'd like to add this capability but it would be something that would happen in the months-to-quarters time frame.
-Nathaniel