from channels_redis.core import RedisChannelLayer
from channels.exceptions import ChannelFull
import time
class RedisChannelLayerGroupPersistence(RedisChannelLayer):
    """
    Redis channel layer that remembers the last message sent to each group.

    ``group_send`` stores the most recent message of a group in Redis next
    to the group's membership set; ``group_add`` replays that stored message
    to a channel as soon as it joins, so late joiners receive the group's
    latest state.
    """

    def _persistence_key(self, group):
        """
        Return the Redis key holding the last message sent to ``group``.

        ``group_send`` and ``group_add`` must derive this key identically,
        which is why the derivation lives in one place.
        """
        return str(self._group_key(group)) + "_PERS"

    async def group_send(self, group, message):
        """
        Send ``message`` to every channel in ``group`` and persist it.

        The message is stored in serialized form so ``group_add`` can later
        rebuild the original message and replay it to newly joined channels.
        Channels whose queue is full are skipped (best-effort delivery).

        Raises:
            AssertionError: if ``group`` is not a valid group name.
        """
        assert self.valid_group_name(group), "Group name not valid"
        key = self._group_key(group)
        pers_key = self._persistence_key(group)
        async with self.connection(self.consistent_hash(group)) as connection:
            # Discard old channels based on group_expiry
            await connection.zremrangebyscore(
                key, min=0, max=int(time.time()) - self.group_expiry
            )
            # Snapshot the current members of the group
            channel_names = [
                x.decode("utf8") for x in await connection.zrange(key, 0, -1)
            ]
            # Persist the message with the layer's own serializer rather than
            # str(message): a repr string cannot be turned back into the
            # message that send() expects. A plain SET leaves the key without
            # a TTL, so no separate PERSIST call is needed.
            await connection.set(pers_key, self.serialize(message))
        # TODO: More efficient implementation (lua script per shard?)
        for channel in channel_names:
            try:
                await self.send(channel, message)
            except ChannelFull:
                # Best-effort fan-out: one full channel must not block the rest
                pass

    async def group_add(self, group, channel):
        """
        Add ``channel`` to ``group`` and replay the group's last message to it.

        If no message has been persisted for the group yet, the channel is
        only added and nothing is sent.

        Raises:
            AssertionError: if ``group`` or ``channel`` is not a valid name.
        """
        # Check the inputs
        assert self.valid_group_name(group), "Group name not valid"
        assert self.valid_channel_name(channel), "Channel name not valid"
        group_key = self._group_key(group)
        pers_key = self._persistence_key(group)
        # Get a connection to the right shard
        async with self.connection(self.consistent_hash(group)) as connection:
            # None when nothing has been sent to this group so far
            stored = await connection.get(pers_key)
            # Add to group sorted set with creation time as timestamp
            await connection.zadd(
                group_key,
                time.time(),
                channel,
            )
            # Set expiration to be group_expiry, since everything in
            # it at this point is guaranteed to expire before that.
            # Done before the replay so that a failing send cannot leave
            # the group set without an expiry.
            await connection.expire(group_key, self.group_expiry)
        if stored is None:
            return
        try:
            # Rebuild the original message from its stored form before
            # replaying it to the newly joined channel
            await self.send(channel, self.deserialize(stored))
        except ChannelFull:
            # Replay is best-effort; joining the group has already succeeded
            pass