I was wondering if anyone could advise on the best Redis data structure
design for the following use case:
1) server receives message blocks
2) N workers consume these message blocks
3) each message block contains a user id
4) for a given user id, message blocks must be processed in the order
in which they were received by the server
i.e. if a worker is in the middle of processing a message block for
user id FOO, then no other message for FOO can be processed until
that one is complete (otherwise the second message could finish
before the first)
Thanks,
Aaron
--
You received this message because you are subscribed to the Google Groups "Redis DB" group.
To post to this group, send email to redi...@googlegroups.com.
To unsubscribe from this group, send email to redis-db+u...@googlegroups.com.
For more options, visit this group at http://groups.google.com/group/redis-db?hl=en.
I'm not sure what the benefit of having the master queue is. Couldn't
the event receiving process just write directly to each user-queue?
Also, Aaron, how long does processing of these blocks take, relatively?
And is it a fairly constant amount of time per block, or highly
variable? Are the number of messages per user relatively
well-distributed, or heavily skewed to a couple of users? How many
users, and how many messages/minute for each user?
--
-- Josh Berkus
PostgreSQL Experts Inc.
http://www.pgexperts.com
Yes, the more I think about it, the more it looks like the server
should immediately partition the messages by user id.
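That partitioning step can be pictured with a small sketch (hypothetical names, not from the thread; a plain dict of lists stands in for the per-user Redis lists, which would be RPUSH targets in practice):

```python
from collections import defaultdict, namedtuple

Message = namedtuple('Message', 'user_id payload')

# Stand-in for one Redis list per user (key 'queue:user:<id>').
user_queues = defaultdict(list)

def receive(msg):
    # RPUSH equivalent: appending preserves per-user arrival order,
    # while blocks for different users can still be consumed concurrently.
    user_queues['queue:user:' + msg.user_id].append(msg.payload)

receive(Message('FOO', 'block1'))
receive(Message('BAR', 'block2'))
receive(Message('FOO', 'block3'))
```

With real Redis the receiver would RPUSH into `queue:user:<id>` and workers would LPOP, which gives the same per-user FIFO guarantee.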
The user ids actually represent patients in a large hospital; new ones
are being created, and old ones are being updated.
So, there are potentially millions of ids, but only a small subset are
active at a given point in time.
Messages are coming in every few milliseconds, but I'm not sure of the
relative time needed to process them.
And the block size is highly variable.
> # If the item is already set to the CONVENIENTLY_SIZED_FLOAT,
> # this will not decrement the double.
> conn.zincrby(PRIORITY_QUEUE, item.user_id, -1)
Why is that?
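For what it's worth (this is a property of IEEE 754 doubles, which Redis uses for sorted-set scores, rather than anything Redis-specific): 18014398509481984.0 is 2**54, and a double has only a 53-bit significand, so at that magnitude 2**54 - 1 is not representable and rounds back to 2**54. A quick check in plain Python:

```python
# CONVENIENTLY_SIZED_FLOAT is 2**54.  An IEEE 754 double carries a
# 53-bit significand, so above 2**53 consecutive integers can no
# longer be represented: subtracting 1 rounds back to the same value.
x = 18014398509481984.0   # 2**54
print(x == 2.0 ** 54)     # True
print(x - 1 == x)         # True: the decrement is absorbed
```

So once a user's score is pinned at the sentinel, ZINCRBY by -1 leaves it unchanged, which is exactly what the comment relies on.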
Thanks, Josiah. Am in the process of grokking the code.
On Wed, Jan 12, 2011 at 3:32 PM, Josiah Carlson
I made some small mods:
import time

# handle_work_item and distributed_lock are assumed to be
# defined elsewhere in the application.

PRIORITY_QUEUE = 'queue:priority'
WORK_QUEUE = 'queue:work'
USER_QUEUE = 'queue:user:'
CONVENIENTLY_SIZED_FLOAT = 18014398509481984.0

def worker(conn):
    while 1:
        # get the next user to work on; BLPOP returns a (key, value) pair
        user_id = conn.blpop(WORK_QUEUE)[1]
        user_queue = USER_QUEUE + user_id
        # actually process the user's pending items, in arrival order
        while conn.llen(user_queue) > 0:
            work_item = conn.lpop(user_queue)
            handle_work_item(work_item)
        # set the user to be valid to work on again
        with distributed_lock(user_id):
            llen = conn.llen(user_queue)
            if not llen:
                conn.zrem(PRIORITY_QUEUE, user_id)
            else:
                conn.zadd(PRIORITY_QUEUE, user_id, 1)

def master(conn):
    pipe = conn.pipeline(True)
    while 1:
        # find some user with pending work (lowest score first)
        smallest = conn.zrange(PRIORITY_QUEUE, 0, 0, withscores=True)
        if not smallest or smallest[0][1] == CONVENIENTLY_SIZED_FLOAT:
            time.sleep(.1)
            continue
        # mark the user as in-flight and hand it to a worker
        user_id = smallest[0][0]
        pipe.zadd(PRIORITY_QUEUE, user_id, CONVENIENTLY_SIZED_FLOAT)
        pipe.rpush(WORK_QUEUE, user_id)
        pipe.execute()

def add_work(conn, item):
    with distributed_lock(item.user_id):
        conn.rpush(USER_QUEUE + item.user_id, item)
        # If the item is already set to the CONVENIENTLY_SIZED_FLOAT,
        # this will not decrement the double.
        conn.zincrby(PRIORITY_QUEUE, item.user_id, -1)
On Wed, Jan 12, 2011 at 2:53 PM, Josiah Carlson
<josiah....@gmail.com> wrote:
OK, I think I understand how this works.
One potential problem: a heavily updated user may starve all the
other users, because it will always have the lowest score. In my use
case, I need to make sure that all users are processed in a timely
fashion.
To get a true priority queue, I could have a counter that increments
with each item added to the queue, and use this
counter as the score.
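The counter idea can be sketched like this (hypothetical, runnable without a Redis server: itertools.count stands in for an INCR key and a plain dict for the ZADD/ZRANGE sorted set):

```python
import itertools

# Monotonic counter: each newly enqueued user gets the next sequence
# number as its score, so lowest-score-first order equals arrival order.
counter = itertools.count(1)
scores = {}  # stand-in for the PRIORITY_QUEUE sorted set

def add_work(user_id):
    # Only assign a score the first time; an already-queued user keeps
    # its original arrival position (ZADD NX equivalent).
    scores.setdefault(user_id, next(counter))

def next_user():
    # Lowest score first, i.e. oldest arrival (ZRANGE 0 0 equivalent).
    return min(scores, key=scores.get) if scores else None

add_work('FOO')
add_work('BAR')
add_work('FOO')   # already queued; keeps its earlier position
```

Because the counter only ever grows, a heavily updated user cannot sink below users that arrived before it, which avoids the starvation problem above.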
On Thu, Jan 13, 2011 at 12:46 AM, Josiah Carlson
<josiah....@gmail.com> wrote: