Cleaning up redis connection after client disconnects from streaming response


Brent Tubbs

Oct 12, 2012, 6:21:51 AM
to django...@googlegroups.com

I've implemented a Server Sent Event API in my Django app to stream realtime updates from my backend to the browser. The backend is a Redis pubsub. My Django view looks like this:

def event_stream(request):
    """
    Stream worker events out to browser.
    """
    listener = events.Listener(
        settings.EVENTS_PUBSUB_URL,
        channels=[settings.EVENTS_PUBSUB_CHANNEL],
        buffer_key=settings.EVENTS_BUFFER_KEY,
        last_event_id=request.META.get('HTTP_LAST_EVENT_ID')
    )
    return http.HttpResponse(listener, mimetype='text/event-stream')

And the events.Listener class that I'm returning as an iterator looks like this:

class Listener(object):
    def __init__(self, rcon_or_url, channels, buffer_key=None,
                 last_event_id=None):
        if isinstance(rcon_or_url, redis.StrictRedis):
            self.rcon = rcon_or_url
        elif isinstance(rcon_or_url, basestring):
            self.rcon = redis.StrictRedis(**utils.parse_redis_url(rcon_or_url))
        self.channels = channels
        self.buffer_key = buffer_key
        self.last_event_id = last_event_id
        self.pubsub = self.rcon.pubsub()
        self.pubsub.subscribe(channels)

    def __iter__(self):
        # If we've been initted with a buffer key, then get all the events off
        # that and spew them out before blocking on the pubsub.
        if self.buffer_key:
            buffered_events = self.rcon.lrange(self.buffer_key, 0, -1)
            for msg in reversed(list(buffered_events)):
                if (self.last_event_id and json.loads(msg)['id'] ==
                        self.last_event_id):
                    break
                yield to_sse({'data': msg})
        try:
            for msg in self.pubsub.listen():
                if msg['type'] == 'message':
                    yield to_sse(msg)
        finally:
            logging.info('Closing pubsub')
            self.pubsub.close()
            self.rcon.connection_pool.disconnect()

I'm able to successfully stream events out to the browser with this setup. However, the disconnect calls in the listener's "finally" never actually run. I assume the iterators are still camped out waiting for messages from the pubsub. As clients disconnect and reconnect, I can see the number of connections to my Redis instance climbing and never going down. Once it gets to around 1000, Redis starts freaking out and consuming all the available CPU.

I would like to be able to detect when the client is no longer listening and close the Redis connection(s) at that time.

Things I've tried or thought about:

  1. A connection pool. But as the redis-py README states, "It is not safe to pass PubSub or Pipeline objects between threads."
  2. A middleware to handle the connections, or maybe just disconnections. This won't work because a middleware's process_response() method gets called too early (before http headers are even sent to the client). I need something called when the client disconnects while I'm in the middle of streaming content to them.
  3. The request_finished and got_request_exception signals. The first, like process_response() in a middleware, seems to fire too soon. The second doesn't get called when a client disconnects mid-stream.

Final wrinkle: In production I'm using Gevent so I can get away with keeping a lot of connections open at once. However, this connection leak issue occurs whether I'm using plain old 'manage.py runserver', or Gevent monkeypatched runserver, or Gunicorn's gevent workers.

(Cross-posted at http://stackoverflow.com/questions/12853067/django-cleaning-up-redis-connection-after-client-disconnects-from-stream.  Answer either here or there... I'll make sure the other gets updated for posterity's sake.)

Any help would be greatly appreciated!

Thanks,

Brent

Brent Tubbs

Oct 15, 2012, 2:40:31 AM
to django...@googlegroups.com
After a lot of banging on things and reading framework code, I've found what I
think is the right answer to this question.

1. According to the WSGI PEP, if your application returns an iterator with a
close() method, it should be called by the WSGI server once the response has
finished. Django supports this too. That's a natural place to do the Redis
connection cleanup that I need.
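Concretely, that means wrapping the stream in an iterator that exposes a close() method and doing the teardown there rather than in a finally block. A minimal sketch (StreamingBody is a hypothetical name, and the source here is just a stand-in for the real pubsub-backed iterator; a compliant WSGI server calls close() for you when the response finishes):

```python
class StreamingBody(object):
    """Response iterator whose close() releases backing resources."""

    def __init__(self, source):
        self.source = source  # any iterable of chunks (e.g. SSE frames)
        self.closed = False

    def __iter__(self):
        for chunk in self.source:
            yield chunk

    def close(self):
        # Called by the WSGI server (and Django) once the response has
        # finished -- the right place to close the pubsub and disconnect
        # the Redis connection pool.
        self.closed = True


# Simulate what a compliant WSGI server does with the response body:
body = StreamingBody(iter([b"data: hi\n\n"]))
chunks = list(body)
body.close()
```

In the real Listener, close() would hold the `self.pubsub.close()` and `self.rcon.connection_pool.disconnect()` calls that currently live in the finally block.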

2. There's a bug in Python's wsgiref implementation, and by extension in
Django's 'runserver', that causes close() to be skipped if the client
disconnects from the server mid-stream. See http://bugs.python.org/issue16220.
I've submitted a patch.

3. Even if the server honors close(), it won't be called until a write to the
client actually fails. If your iterator is blocked waiting on the pubsub and
not sending anything, close() won't be called. I've worked around this by
sending a no-op message into the pubsub each time a client connects. That way
when a browser does a normal reconnect, the now-defunct threads will try to
write to their closed connections, throw an exception, then get cleaned up when
the server calls close(). The SSE spec says that any line beginning with a
colon is a comment that should be ignored, so I'm just sending ":\n" as my
no-op message to flush out stale clients.
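A sketch of the ping trick, assuming a redis-py connection and channel name as in the original code (on_client_connect is a hypothetical hook name; the publish is what each newly connecting client triggers, and the comment check just illustrates why ":\n" is invisible to browsers):

```python
NOOP = ":\n"  # per the SSE spec, lines starting with ":" are comments

def on_client_connect(rcon, channel):
    # Hypothetical hook run for each new connection: publish a no-op so
    # that listeners whose clients have gone away attempt a write, raise,
    # and get cleaned up when the server calls close() on the iterator.
    rcon.publish(channel, NOOP)

def is_sse_comment(line):
    # Browsers silently drop comment lines, so the ping never reaches
    # application-level event handlers.
    return line.startswith(":")
```

The no-op costs one pubsub message per connect but bounds the number of stale connections at roughly the number of clients that were connected since the last reconnect.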

Sample code for doing this is posted on the Stack Overflow page where I also
posted this question.
