Overriding channel-layer's group_send and group_add to add persistence


Alex

Mar 23, 2018, 5:56:13 AM
to Django users
I've been trying to add persistence to channel layers, so that each new consumer joining a group is sent the most recent message from that group on connect. My attempt is below. For some reason, the message retrieved in group_add (the line marked ISSUE HERE) is always None. Am I going about this completely wrong? I'd be really grateful for any help.


from channels_redis.core import RedisChannelLayer
from channels.exceptions import ChannelFull
import time


class RedisChannelLayerGroupPersistence(RedisChannelLayer):

    async def group_send(self, group, message):
        """
        Sends a message to the entire group.
        """
        assert self.valid_group_name(group), "Group name not valid"
        # Retrieve list of all channel names
        key = self._group_key(group)
        pers_key = str(key) + "_PERS"
        async with self.connection(self.consistent_hash(group)) as connection:
            # Discard old channels based on group_expiry
            await connection.zremrangebyscore(key, min=0, max=int(time.time()) - self.group_expiry)
            # Return current lot
            channel_names = [
                x.decode("utf8") for x in
                await connection.zrange(key, 0, -1)
            ]
            # TODO: More efficient implementation (lua script per shard?)  try:
            await connection.persist(pers_key)
            await connection.set(pers_key, str(message))
            print("TYPE = ================================================ {}".format(type(str(message))))

        for channel in channel_names:
            try:
                await self.send(channel, message)
            except ChannelFull:
                pass

    async def group_add(self, group, channel):
        """
        Adds the channel name to a group.
        """
        # Check the inputs
        assert self.valid_group_name(group), "Group name not valid"
        assert self.valid_channel_name(channel), "Channel name not valid"
        # Get a connection to the right shard
        group_key = self._group_key(group)
        pers_key = str(group_key) + "_PERS"
        async with self.connection(self.consistent_hash(group)) as connection:
            message = await connection.get(pers_key) #ISSUE HERE ------------------ MESSAGE IS NONE
            # Add to group sorted set with creation time as timestamp
            await connection.zadd(
                group_key,
                time.time(),
                channel,
            )
            # Set expiration to be group_expiry, since everything in
            # it at this point is guaranteed to expire before that
            try:
                await self.send(channel, str(message))
            except ChannelFull:
                pass

        await connection.expire(group_key, self.group_expiry)
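
For reference, the "remember the last message, replay it on join" idea can be sketched independently of channels_redis. The code above stores `str(message)`, which cannot be turned back into the dict that a channel layer message is expected to be, so this sketch swaps in JSON for the round trip. `FakeRedis` and `LastMessageStore` are hypothetical names invented for illustration, the in-memory store only mimics Redis GET/SET, and none of this is claimed to explain the None result.

```python
import asyncio
import json


class FakeRedis:
    """Hypothetical in-memory stand-in for a Redis connection (not a real client)."""

    def __init__(self):
        self._data = {}

    async def set(self, key, value):
        # Redis hands values back as bytes, so encode str values on the way in.
        self._data[key] = value.encode("utf8") if isinstance(value, str) else value

    async def get(self, key):
        # Like Redis GET, return None when the key does not exist.
        return self._data.get(key)


class LastMessageStore:
    """Remembers the latest message per group, serialized as JSON."""

    def __init__(self, connection):
        self.connection = connection

    @staticmethod
    def _key(group):
        # Hypothetical key naming, chosen only for this sketch.
        return "{}:last_message".format(group)

    async def remember(self, group, message):
        await self.connection.set(self._key(group), json.dumps(message))

    async def recall(self, group):
        raw = await self.connection.get(self._key(group))
        if raw is None:
            return None  # nothing has been sent to this group yet
        return json.loads(raw.decode("utf8"))


async def demo():
    store = LastMessageStore(FakeRedis())
    before = await store.recall("chat")
    await store.remember("chat", {"type": "chat.message", "text": "hello"})
    after = await store.recall("chat")
    return before, after


before, after = asyncio.run(demo())
print(before, after)
```

In an overridden layer, `remember` would correspond to the write in group_send and `recall` to the read in group_add, with a `None` check before calling send.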


Andrew Godwin

Mar 23, 2018, 11:58:37 AM
to django...@googlegroups.com
It looks correct at first glance. I would insert a debugger there and manually inspect what the Redis database contains at that point.

Andrew

--
You received this message because you are subscribed to the Google Groups "Django users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to django-users+unsubscribe@googlegroups.com.
To post to this group, send email to django...@googlegroups.com.
Visit this group at https://groups.google.com/group/django-users.
To view this discussion on the web visit https://groups.google.com/d/msgid/django-users/abc8747d-8d80-4ec4-a2ca-d5e91c161c08%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Alex

Mar 23, 2018, 12:01:43 PM
to Django users
Hi,

I've used redis-cli to get the contents of the key, and it has been filled properly, so the information is definitely in Redis under that key. The issue seems to be that message = await connection.get(pers_key) always returns None. One thing I'm certain of is that it's in Redis!

Alex
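
When comparing against redis-cli, it may help to check the exact key name being built. The sketch below assumes `_group_key` returns bytes (an assumption about the installed channels_redis version) and uses a made-up group key. It only shows what `str(key) + "_PERS"` produces in that case, and is not offered as the cause of the None result, since both methods build the key the same way.

```python
def persistence_key(group_key):
    """Build the key the way the post does: str(key) + "_PERS"."""
    return str(group_key) + "_PERS"


def persistence_key_bytes(group_key):
    """Hypothetical alternative that stays in bytes and avoids the b'...' wrapper."""
    if isinstance(group_key, str):
        group_key = group_key.encode("utf8")
    return group_key + b"_PERS"


# Made-up group key; the real prefix depends on the layer's configuration.
group_key = "asgi::group:chat".encode("utf8")

from_str = persistence_key(group_key)
from_bytes = persistence_key_bytes(group_key)

print(from_str)    # b'asgi::group:chat'_PERS
print(from_bytes)  # b'asgi::group:chat_PERS'
```

With bytes input, the first form stores the value under a name that literally starts with `b'`, which is the name to look for in redis-cli.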

Andrew Godwin

Mar 23, 2018, 12:05:16 PM
to django...@googlegroups.com
I would also check that the connection is going to the right server/database. Past that, I can't help you. I'd try doing some things with plain aioredis to see if you can replicate it there.

Andrew


Alex

Mar 23, 2018, 12:07:56 PM
to Django users
That did occur to me, but both methods connect using the line below, so unless they're somehow referring to different groups, I'm not sure how they could be going to different servers/databases...
async with self.connection(self.consistent_hash(group)) as connection:


Alex

Mar 23, 2018, 12:13:22 PM
to Django users
I've just checked, and self.consistent_hash(group) returns '0' in both functions, so it should be connecting to the same db...
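
As a note on reading that result: a minimal sketch of shard selection, assuming a CRC32-based ring similar to what channels_redis is believed to use (the exact implementation may differ by version, and the standalone function here is illustrative). With a single configured host the index is 0 for every input, so seeing 0 in both methods confirms the same shard but says nothing more.

```python
import binascii


def consistent_hash(value, ring_size):
    """Map a value to a shard index in range(ring_size) using CRC32."""
    if isinstance(value, str):
        value = value.encode("utf8")
    bigval = binascii.crc32(value) & 0xFFF  # keep the low 12 bits: 0..4095
    ring_divisor = 4096 / float(ring_size)
    return int(bigval / ring_divisor)


groups = ["chat", "news", "alerts"]

# One host: every group maps to index 0.
single_host = [consistent_hash(g, ring_size=1) for g in groups]

# Several hosts: the index depends on the group name, but is stable per name.
three_hosts = [consistent_hash(g, ring_size=3) for g in groups]

print(single_host)
print(three_hosts)
```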



Alex

Apr 1, 2018, 7:02:58 AM
to Django users
I have a partial solution to this, if anybody is interested. For some reason, creating a new version of the core.py file in which RedisChannelLayer is defined, and editing it directly, gave the desired results. It was something about overriding only part of it, but I have no idea what!