I've implemented a Server-Sent Events API in my Django app to stream real-time updates from my backend to the browser. The backend is a Redis pubsub. My Django view looks like this:
def event_stream(request):
    """
    Stream worker events out to browser.
    """
    listener = events.Listener(
        settings.EVENTS_PUBSUB_URL,
        channels=[settings.EVENTS_PUBSUB_CHANNEL],
        buffer_key=settings.EVENTS_BUFFER_KEY,
        last_event_id=request.META.get('HTTP_LAST_EVENT_ID')
    )
    return http.HttpResponse(listener, mimetype='text/event-stream')
And the events.Listener class that I'm returning as an iterator looks like this:
class Listener(object):
    def __init__(self, rcon_or_url, channels, buffer_key=None,
                 last_event_id=None):
        if isinstance(rcon_or_url, redis.StrictRedis):
            self.rcon = rcon_or_url
        elif isinstance(rcon_or_url, basestring):
            self.rcon = redis.StrictRedis(**utils.parse_redis_url(rcon_or_url))
        self.channels = channels
        self.buffer_key = buffer_key
        self.last_event_id = last_event_id
        self.pubsub = self.rcon.pubsub()
        self.pubsub.subscribe(channels)

    def __iter__(self):
        # If we've been initted with a buffer key, then get all the events off
        # that and spew them out before blocking on the pubsub.
        if self.buffer_key:
            buffered_events = self.rcon.lrange(self.buffer_key, 0, -1)
            for msg in reversed(list(buffered_events)):
                if (self.last_event_id and json.loads(msg)['id'] ==
                        self.last_event_id):
                    break
                yield to_sse({'data': msg})
        try:
            for msg in self.pubsub.listen():
                if msg['type'] == 'message':
                    yield to_sse(msg)
        finally:
            logging.info('Closing pubsub')
            self.pubsub.close()
            self.rcon.connection_pool.disconnect()
I'm able to successfully stream events out to the browser with this setup. However, it seems that the disconnect calls in the listener's "finally" block never actually get called. I assume the generator is still blocked inside pubsub.listen(), waiting for messages to come from the pubsub. As clients disconnect and reconnect, I can see the number of connections to my Redis instance climbing and never going down. Once it gets to around 1000, Redis starts freaking out and consuming all the available CPU.
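One thing that may be worth checking, sketched here under assumptions: the WSGI spec (PEP 3333) has the server call close() on the response iterable if it defines one, but the Listener above only has __iter__, so its "finally" runs only when the generator is exhausted, closed, or garbage-collected. The wrapper below is hypothetical (ClosingStream, source and cleanup are illustrative names, not part of Django or redis-py), and whether Django's HttpResponse forwards close() to the wrapped object would need to be verified for the Django version in use.

```python
class ClosingStream(object):
    """Hypothetical iterable with an explicit close() hook for cleanup."""

    def __init__(self, source, cleanup):
        self.source = source      # any iterable of already-formatted SSE frames
        self.cleanup = cleanup    # callable that releases the Redis connection(s)
        self.closed = False

    def __iter__(self):
        for frame in self.source:
            yield frame

    def close(self):
        # Idempotent, so a second close() call does not clean up twice.
        if not self.closed:
            self.closed = True
            self.cleanup()
```

On its own this does not interrupt a generator that is blocked waiting on the pubsub; it only gives the server a place to trigger cleanup once it stops iterating.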
I would like to be able to detect when the client is no longer listening and close the Redis connection(s) at that time.
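A minimal sketch of one possible approach, not a confirmed fix: poll for messages with a timeout instead of blocking indefinitely, and send an SSE comment line as a heartbeat when nothing arrives. Lines starting with a colon are ignored by EventSource clients, but the write gives the server a chance to notice a dead socket and close the generator, which is what lets the "finally" run. Here poll and cleanup are hypothetical callables; with redis-py, poll might wrap pubsub.get_message(timeout=...), which is an assumption about the installed version.

```python
def heartbeat_stream(poll, cleanup):
    """Yield SSE frames from poll(), with a comment heartbeat when idle.

    poll    -- hypothetical callable: returns a message string, or None
               if nothing arrived before its timeout expired
    cleanup -- hypothetical callable: releases the pubsub/Redis connection
    """
    try:
        while True:
            msg = poll()
            if msg is None:
                # SSE comment line: ignored by the browser, but forces a write.
                yield ': keepalive\n\n'
            else:
                yield 'data: %s\n\n' % msg
    finally:
        # Runs when the generator is closed (GeneratorExit) or poll() raises.
        cleanup()
```

The design choice here is that the generator never blocks longer than the poll timeout, so a disconnected client is detected within roughly one timeout interval rather than only when the next real message is published.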
Things I've tried or thought about:
Final wrinkle: In production I'm using Gevent so I can get away with keeping a lot of connections open at once. However, this connection leak issue occurs whether I'm using plain old 'manage.py runserver', or Gevent monkeypatched runserver, or Gunicorn's gevent workers.
(Cross-posted at http://stackoverflow.com/questions/12853067/django-cleaning-up-redis-connection-after-client-disconnects-from-stream. Answer either here or there... I'll make sure the other gets updated for posterity's sake.)
Any help would be greatly appreciated!
Thanks,
Brent