need advice on redis data structure design

Aaron Boxer

Jan 12, 2011, 12:56:02 PM
to redi...@googlegroups.com
Hello!

I was wondering if anyone can advise on the best redis data structure
design for the following use case:

1) server receives message blocks
2) N workers consume these message blocks
3) each message block contains a user id
4) for a given user id, message blocks must be processed in the order
that they were received by the server,
i.e. if a worker is in the middle of processing a message block for
user id FOO, then no other messages for FOO can be processed until the
first one is complete (otherwise the second message could be processed
before the first)

Thanks,
Aaron

Demis Bellot

Jan 12, 2011, 1:08:58 PM
to redi...@googlegroups.com
Hey Aaron,

I'll throw out a quick solution to start a discussion on the topic.

I would start by having all messages end up in the same Queue (which can be either a LIST or a ZSET sorted by time).
I would then have a 'master process' read from this queue and populate separate worker Queues (i.e. LISTs) based on hash(userid) % number of workers.

Each worker is assigned its own Queue and just processes that.
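
A minimal sketch of this routing in Python with redis-py (the key names, NUM_WORKERS, and extract_user_id() are illustrative, not part of the proposal; it assumes a client created with decode_responses=True so replies come back as strings):

import zlib
import redis

NUM_WORKERS = 4                      # illustrative worker count
INCOMING = 'queue:incoming'          # central queue the server RPUSHes into
WORKER_QUEUE = 'queue:worker:%d'     # one LIST per worker

def route(conn):
    while True:
        # BLPOP blocks until a message arrives and returns (key, value)
        _, raw = conn.blpop(INCOMING)
        user_id = extract_user_id(raw)   # hypothetical message parser
        # a stable hash, so a given user always lands on the same worker
        # and per-user ordering is preserved
        slot = zlib.crc32(user_id.encode()) % NUM_WORKERS
        conn.rpush(WORKER_QUEUE % slot, raw)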

Interested to hear others' approaches...



--
- Demis


Josh Berkus

Jan 12, 2011, 1:49:19 PM
to redi...@googlegroups.com
On 1/12/11 10:08 AM, Demis Bellot wrote:
> Hey Aaron,
>
> I'll throw out a quick solution to start a discussion on the topic.
>
> I would start by having all messages end up in the same Queue
> (which can be either a LIST or a ZSET sorted by time).
> I would then have a 'master process' read from this queue and populate
> separate worker Queues (i.e. LISTs) based on hash(userid) % number of
> workers.
>
> Each worker is assigned its own Queue and just processes that.

I'm not sure what the benefit of having the master queue is. Couldn't
the event-receiving process just write directly to each user-queue?

Also, Aaron, how long does processing of these blocks take, relatively?
And is it a fairly constant amount of time per block, or highly
variable? Is the number of messages per user relatively
well-distributed, or heavily skewed toward a couple of users? How many
users, and how many messages per minute for each user?

--
-- Josh Berkus
PostgreSQL Experts Inc.
http://www.pgexperts.com

Andy Lawman

Jan 12, 2011, 2:06:46 PM
to redi...@googlegroups.com
For the server: I'd have one message queue per userid and would use a list to represent it - just xPUSH each message onto the list. Use either LPUSH or RPUSH - it doesn't matter, but use RPOP or LPOP respectively later on to give queue behaviour. I'd also maintain a set of all userids that have messages pending - just SADD the userid to it.

For the consumer: I'd maintain the userids whose queues are currently being worked on in a hash. To find a queue to work on, I'd use SRANDMEMBER on the set and then use HSETNX on the hash and check the result. If the userid was added to the hash, then process its queue with xPOP until it's empty; if it wasn't, use SRANDMEMBER again to get a different userid and repeat. Once the queue has been worked off, remove it from the hash with HDEL.

As the userid remains in the set, it will be visited again in due course. This means that the race between the hash being updated to reflect that the queue is now empty and a new item being added to the list by the server doesn't matter. However, this does have the cost of visiting empty queues, which may be inefficient depending on your workload. (This can be solved with transactions.) Again, depending on your workload, it may be better to read 1 message at a time (or at most m) from a queue so that no consumer is hogged by a single queue.
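
A rough sketch of the above scheme in redis-py (key names and handle_message() are placeholders; again assuming decode_responses=True):

PENDING_SET = 'users:pending'   # userids with messages queued
BUSY_HASH = 'users:busy'        # userids currently claimed by a worker
USER_QUEUE = 'queue:user:'

def enqueue(conn, user_id, message):
    conn.rpush(USER_QUEUE + user_id, message)   # RPUSH + LPOP = FIFO
    conn.sadd(PENDING_SET, user_id)

def consume_once(conn):
    user_id = conn.srandmember(PENDING_SET)
    if user_id is None:
        return                      # nothing pending at all
    # HSETNX returns 1 only if we claimed the userid; if another worker
    # already owns it, return and let the caller pick again
    if not conn.hsetnx(BUSY_HASH, user_id, 1):
        return
    queue = USER_QUEUE + user_id
    while True:
        message = conn.lpop(queue)
        if message is None:
            break                   # queue worked off
        handle_message(message)     # placeholder for real processing
    conn.hdel(BUSY_HASH, user_id)   # release the claim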

Andy.

Aaron Boxer

Jan 12, 2011, 2:34:54 PM
to redi...@googlegroups.com
Thanks, Demis. I like the hash approach, except: what happens if a
worker dies, or a new one is added?

Aaron Boxer

Jan 12, 2011, 2:39:16 PM
to redi...@googlegroups.com
Thanks, Josh.

Yes, the more I think about it, the more it looks like the server
should immediately partition the messages by user id.

The user ids actually represent patients in a large hospital; new ones
are being created, and old ones are being updated.
So, there are potentially millions of ids, but only a small subset is
active at any given point in time.
Messages are coming in every few milliseconds, but I'm not sure of the
relative time needed to process them.
And the block size is highly variable.

Aaron Boxer

Jan 12, 2011, 2:58:01 PM
to redi...@googlegroups.com
Thanks Andy. I can see the design taking shape here.
I will probably need to use transactions to avoid visiting empty queues.
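
One possible shape for that transaction, using redis-py's WATCH/MULTI optimistic-locking pattern and the key names from the sketch above (release_if_empty() is an illustrative helper, not from the thread): the userid is dropped from the pending set only if its queue is still empty at EXEC time.

import redis

def release_if_empty(conn, user_id):
    queue = USER_QUEUE + user_id
    with conn.pipeline() as pipe:
        while True:
            try:
                pipe.watch(queue)
                if pipe.llen(queue) > 0:
                    pipe.unwatch()
                    return False    # new work arrived; keep the userid
                pipe.multi()
                pipe.srem(PENDING_SET, user_id)
                pipe.hdel(BUSY_HASH, user_id)
                pipe.execute()
                return True
            except redis.WatchError:
                continue            # queue changed under us; retry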

Josiah Carlson

Jan 12, 2011, 2:53:54 PM
to redi...@googlegroups.com
This solution is similar to the others, though it should be better behaved (it doesn't rely on random selection).

I'm going to include code here instead of a description, because it's easier to just code it. I will include comments...

import time

PRIORITY_QUEUE = 'queue:priority'
WORK_QUEUE = 'queue:work'
USER_QUEUE = 'queue:user:'
CONVENIENTLY_SIZED_FLOAT = 18014398509481984.0

def worker(conn):
    while 1:
        # get the next user to work on
        user_id = conn.blpop(WORK_QUEUE)[1]  # BLPOP returns (key, value)
        user_queue = USER_QUEUE + user_id
        # actually process the user
        while conn.llen(user_queue) > 0:
            work_item = conn.lpop(user_queue)
            handle_work_item(work_item)
        # set the user to be valid to work on again
        with distributed_lock(user_id):
            llen = conn.llen(user_queue)
            if not llen:
                conn.zrem(PRIORITY_QUEUE, user_id)
            else:
                conn.zadd(PRIORITY_QUEUE, user_id, -llen)

def master(conn):
    pipe = conn.pipeline(True)
    while 1:
        # find some item with work
        smallest = conn.zrange(PRIORITY_QUEUE, 0, 0, withscores=True)
        if not smallest or smallest[0][1] > 0:
            time.sleep(.1)
            continue
        # set the work item up for processing
        user_id = smallest[0][0]
        pipe.zadd(PRIORITY_QUEUE, user_id, CONVENIENTLY_SIZED_FLOAT)
        pipe.rpush(WORK_QUEUE, user_id)
        pipe.execute()

def add_work(conn, item):
    with distributed_lock(user_id):
        conn.rpush(USER_QUEUE + item.user_id, item)
        # If the item is already set to the CONVENIENTLY_SIZED_FLOAT,
        # this will not decrement the double.
        conn.zincrby(PRIORITY_QUEUE, item.user_id, -1)


Assuming that you have implemented a distributed lock and your work function, this should work without race conditions, should prioritize heavily-updated users, and should clean up after itself.
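
The distributed lock itself is assumed rather than shown; one naive sketch uses SETNX plus an expiry (the names and the timeout are illustrative, and this simple version can delete a lock it no longer owns if processing outlives the timeout):

import time
from contextlib import contextmanager

@contextmanager
def distributed_lock(conn, name, timeout=10):
    key = 'lock:' + str(name)
    while not conn.setnx(key, 1):
        time.sleep(.01)             # someone else holds it; spin
    conn.expire(key, timeout)       # safety net in case the holder dies
    try:
        yield
    finally:
        conn.delete(key)

(The code above calls it as distributed_lock(user_id), so in practice the connection would be bound some other way.)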

Regards,
 - Josiah

Josiah Carlson

Jan 12, 2011, 3:02:41 PM
to redi...@googlegroups.com
The lock line in add_work() above should read "with distributed_lock(item.user_id):"

 - Josiah

Aaron Boxer

Jan 12, 2011, 3:20:33 PM
to redi...@googlegroups.com
Thanks, Josiah. Am in the process of grokking the code.

>         # If the item is already set to the CONVENIENTLY_SIZED_FLOAT,
>         # this will not decrement the double.
>         conn.zincrby(PRIORITY_QUEUE, item.user_id, -1)

Why is that?

Josiah Carlson

Jan 12, 2011, 3:32:10 PM
to redi...@googlegroups.com
On Wed, Jan 12, 2011 at 12:20 PM, Aaron Boxer <box...@gmail.com> wrote:
> Thanks, Josiah. Am in the process of grokking the code.

Read add_work(), then master(), then worker(). That should help you understand better.

>         # If the item is already set to the CONVENIENTLY_SIZED_FLOAT,
>         # this will not decrement the double.
>         conn.zincrby(PRIORITY_QUEUE, item.user_id, -1)

> Why is that?

Precision. An IEEE FP double (as used on 64-bit compilations of Redis) has 53 significant bits of precision (52 explicit bits plus the implicit leading '1') for normalized numbers; for FP singles it's 24 significant bits. If you set the number sufficiently high, decrementing by 1 doesn't change the value...

In Python (which uses doubles for its floating-point representation)...
>>> v = float(2**53)
>>> v, v-1
(9007199254740992.0, 9007199254740991.0)
>>> v = float(2**54)
>>> v, v-1
(18014398509481984.0, 18014398509481984.0)

Aaron Boxer

Jan 12, 2011, 4:08:52 PM
to redi...@googlegroups.com
Thanks. Very devious :)


Aaron Boxer

Jan 12, 2011, 9:37:24 PM
to redi...@googlegroups.com
OK, I think I understand how this works.
One potential problem: a heavily-updated user may starve all other
users, because it will always have the lowest score. In my use case,
I need to make sure that all users are processed in a timely fashion.

I made some small mods:


import time

PRIORITY_QUEUE = 'queue:priority'
WORK_QUEUE = 'queue:work'
USER_QUEUE = 'queue:user:'
CONVENIENTLY_SIZED_FLOAT = 18014398509481984.0

def worker(conn):
    while 1:
        # get the next user to work on
        user_id = conn.blpop(WORK_QUEUE)[1]
        user_queue = USER_QUEUE + user_id
        # actually process the user
        while conn.llen(user_queue) > 0:
            work_item = conn.lpop(user_queue)
            handle_work_item(work_item)
        # set the user to be valid to work on again
        with distributed_lock(user_id):
            llen = conn.llen(user_queue)
            if not llen:
                conn.zrem(PRIORITY_QUEUE, user_id)
            else:
                conn.zadd(PRIORITY_QUEUE, user_id, 1)

def master(conn):
    pipe = conn.pipeline(True)
    while 1:
        # find some item with work
        smallest = conn.zrange(PRIORITY_QUEUE, 0, 0, withscores=True)
        if not smallest or smallest[0][1] == CONVENIENTLY_SIZED_FLOAT:
            time.sleep(.1)
            continue
        # set the work item up for processing
        user_id = smallest[0][0]
        pipe.zadd(PRIORITY_QUEUE, user_id, CONVENIENTLY_SIZED_FLOAT)
        pipe.rpush(WORK_QUEUE, user_id)
        pipe.execute()

def add_work(conn, item):
    with distributed_lock(item.user_id):
        conn.rpush(USER_QUEUE + item.user_id, item)
        # If the item is already set to the CONVENIENTLY_SIZED_FLOAT,
        # this will not decrement the double.
        conn.zincrby(PRIORITY_QUEUE, item.user_id, -1)


To get a true priority queue, I could have a counter that increments
with each item added to the queue, and use this counter as the score.
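
A sketch of that counter idea against the code above (SEQUENCE and add_work_fifo() are illustrative names; the zadd argument order follows the rest of the thread):

SEQUENCE = 'queue:sequence'   # global arrival counter

def add_work_fifo(conn, item):
    with distributed_lock(item.user_id):
        conn.rpush(USER_QUEUE + item.user_id, item)
        # set the score only for the user's first pending item, so it
        # reflects the arrival order of their *oldest* message and
        # master() services users first-come, first-served
        if conn.zscore(PRIORITY_QUEUE, item.user_id) is None:
            conn.zadd(PRIORITY_QUEUE, item.user_id, conn.incr(SEQUENCE))

The worker's re-queue branch would likewise need to write a fresh conn.incr(SEQUENCE) as the score instead of a fixed value.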


Josiah Carlson

Jan 13, 2011, 12:46:57 AM
to redi...@googlegroups.com
On Wed, Jan 12, 2011 at 6:37 PM, Aaron Boxer <box...@gmail.com> wrote:
> OK, I think I understand how this works.
> One potential problem: a heavily-updated user may starve all other
> users, because it will always have the lowest score. In my use case,
> I need to make sure that all users are processed in a timely fashion.

As long as the number of workers is greater than the number of users that are continually being updated (in your use-case, I doubt that it is an issue), you shouldn't run into the problem. However, if you want to get a bit better round-robin behavior, see my modification below...

import time

PRIORITY_QUEUE = 'queue:priority'
WORK_QUEUE = 'queue:work'
USER_QUEUE = 'queue:user:'
CONVENIENTLY_SIZED_FLOAT = 18014398509481984.0

def worker(conn):
    while 1:
        # get the next user to work on
        user_id = conn.blpop(WORK_QUEUE)[1]
        user_queue = USER_QUEUE + user_id
        # actually process the user
        while conn.llen(user_queue) > 0:
            work_item = conn.lpop(user_queue)
            handle_work_item(work_item)
        # set the user to be valid to work on again
        with distributed_lock(user_id):
            llen = conn.llen(user_queue)
            if not llen:
                conn.zrem(PRIORITY_QUEUE, user_id)
            else:
                # De-prioritize the heavy user.
                conn.zadd(PRIORITY_QUEUE, user_id, llen)

def master(conn):
    pipe = conn.pipeline(True)
    while 1:
        # find some item with work
        smallest = conn.zrange(PRIORITY_QUEUE, 0, 0, withscores=True)
        if not smallest or smallest[0][1] == CONVENIENTLY_SIZED_FLOAT:
            time.sleep(.1)
            continue
        # set the work item up for processing
        user_id = smallest[0][0]
        pipe.zadd(PRIORITY_QUEUE, user_id, CONVENIENTLY_SIZED_FLOAT)
        pipe.rpush(WORK_QUEUE, user_id)
        pipe.execute()

def add_work(conn, item):
    with distributed_lock(item.user_id):
        conn.rpush(USER_QUEUE + item.user_id, item)
        # If the item is already set to the CONVENIENTLY_SIZED_FLOAT,
        # this will not decrement the double.
        conn.zincrby(PRIORITY_QUEUE, item.user_id, -1)

 
> To get a true priority queue, I could have a counter that increments
> with each item added to the queue, and use this counter as the score.

That's actually exactly what the original version does, except that it doesn't alter the priority while a user is being worked on. That is, when a user is not being worked on, the "priority" is decremented (starting at 0). The more items that are added, the lower the score, and the higher the priority.

Only when a user is currently being worked on do we stop adjusting the score, and with my modification above, if someone is getting a huge amount of changes (enough to sneak at least one item in between an "llen" and the acquisition of the distributed lock), they are de-prioritized by as much as they have added. Depending on how much you want to de-prioritize the user, you could change the last value in worker() to be 1 (as you had it), log(llen), llen (as I have it above), llen*log(llen), llen**1.5, or even llen**2, depending on how harsh you want to be. But again, because of your particular use-case, I doubt it will come up often enough to matter much.

For the general case (arbitrary work-queues), I'd recommend somewhere in the log(llen) ... llen**1.5 range.
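
As a sketch of that knob (the helper and its names are illustrative), the re-queue score in worker() becomes a single tunable:

import math

def requeue_score(llen, harshness='llen'):
    # larger score = lower priority, so harsher functions push heavy
    # users further back in line
    return {
        'gentle': 1,                  # the 1 suggested above
        'log':    math.log(llen + 1),
        'llen':   llen,               # the modification above
        'heavy':  llen ** 1.5,
    }[harshness]

# in worker():  conn.zadd(PRIORITY_QUEUE, user_id, requeue_score(llen))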

Aaron Boxer

Jan 13, 2011, 10:58:49 AM
to redi...@googlegroups.com
Cool. Thanks a lot! This will be very helpful.

Aaron Boxer

Jan 28, 2011, 6:19:59 PM
to redi...@googlegroups.com
Thanks again; I've implemented a version of this in C#:

https://github.com/mythz/ServiceStack.Redis/blob/master/src/ServiceStack.Redis/Support/Queue/Implementation/RedisSequentialWorkQueue.cs


Josiah Carlson

Jan 28, 2011, 6:38:09 PM
to redi...@googlegroups.com
Looks good :)

 - Josiah