Redis Queue System (ex Amazon SQS)


chameleon95

Apr 10, 2009, 12:21:57 PM
to Redis DB
I am looking at the option to build a Simple Queue System using
Redis.. to take over from a current Amazon SQS setup..

With Redis using a LIST in a FIFO fashion...

With Amazon SQS you set a timer on each queue item on retrieval.. this
timer will suppress this queue item while you process the task.. once
the task is complete you return to delete the queue item.. if you don't
return in time the queue item will reappear and be reissued to a new
requesting client..

When designing software to use Amazon SQS you must take into account
your requesting clients may see queue items more than once.

The question...??? In the same way you are offering EXPIRE on keys is
there a simple solution to add a timer to a LIST item suspending it
for a set time, if we don't return to delete the LIST item on
completion of the task (ie. client failure) then have it reappear in
the same position so a new client can collect and process the task..

Steve..

Salvatore Sanfilippo

Apr 10, 2009, 12:48:37 PM
to redi...@googlegroups.com
On Fri, Apr 10, 2009 at 6:21 PM, chameleon95 <steve...@gmail.com> wrote:
>
> I am looking at the option to build a Simple Queue System using
> Redis.. to take over from a current Amazon SQS setup..
>
> With Redis using a LIST in a FIFO fashion...
>
> With Amazon SQS you set a timer on each queue item on retrieval.. this
> timer will suppress this queue item while you process the task.. once
> the task is complete you return to delete the queue item.. if you dont
> return in time the queue item will reappear and be reissued to a new
> requesting client..

The problem you describe is a general one.

You have items to be processed.
You have clients to process these items.
And you have #FAILure in the nature of the world :) That is: clients may fail.

And if a client fails after it popped the element, who will process it?!
You also require all this to be atomic. If a client gets an
element from the queue and then fails, the element must appear in the
queue again atomically, and only the next client will get it.

> When designing software to use Amazon SQS you must take into account
> your requesting clients may see queue items more than once.

Yep, I guess this may happen if a client takes too long to complete
the task but is not actually dead.

> The question...??? In the same way you are offering EXPIRE on keys is
> there a simple solution to add a timer to a LIST item suspending it
> for a set time, if we don't return to delete the LIST item on
> completion of the task (ie. client failure) then have it reappear in
> the same position so a new client can collect and process the task..

Here EXPIRE is not the solution, unfortunately, since its granularity is
the whole list.
And we currently don't have LOCK/UNLOCK, so it is not possible to
inspect the top element, add it to a temp queue, and then actually pop it:
the sequence is not atomic.

If for your use case the order of processing does not matter, you can use
a whole Redis 'DB' as the queue.
Every time I want to add a new item I select DB0 and increment a counter
to get a unique key, so:

INCR myUniqueId => 100
SET item_100 "... value to be processed ..."

Then a client wants to process something inside the queue, which is
actually the whole DB0.

It uses RANDOMKEY to get a key at random. Now we have the name of a key:

RANDOMKEY => 100

We now use RENAME, which is an atomic operation, to attach time and
status information to our entry:

RENAME item_100 processing_100_<current_unix_time>

If RENAME fails (returns zero) then we know that another client took
this item already, so GOTO RANDOMKEY ;)
Select a new key and so on. If instead RENAME returned 1 we are happy:
we obtained a form of "locking".

We can now process the item. You may wonder what happens if RANDOMKEY
returns a key of the form processing_..., but we will see that in
a moment.

When our processing is done we can just remove
processing_100_<unixtime> and iterate again to process more stuff.
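
The claim step described above can be sketched in Python. This is only a minimal model, not real client code: TinyStore is a hypothetical in-memory stand-in for the handful of commands used, the counter is kept outside the key space for simplicity, and RENAME is modeled as returning 1 or 0 the way the message describes it.

```python
import random


class TinyStore:
    """Hypothetical in-memory stand-in for the few commands used in the thread."""

    def __init__(self):
        self.data = {}    # plays the role of DB0
        self.counter = 0  # plays the role of myUniqueId (assumption: kept out of the key space)

    def incr(self):
        self.counter += 1
        return self.counter

    def set(self, key, value):
        self.data[key] = value

    def randomkey(self):
        return random.choice(list(self.data)) if self.data else None

    def rename(self, old, new):
        # Modeled as described in the thread: 1 on success, 0 if the source key is gone.
        if old not in self.data:
            return 0
        self.data[new] = self.data.pop(old)
        return 1

    def delete(self, key):
        return int(self.data.pop(key, None) is not None)


def enqueue(store, payload):
    """INCR a counter, then SET item_<id> to the payload."""
    item_id = store.incr()
    store.set(f"item_{item_id}", payload)
    return item_id


def claim(store, now, max_tries=100):
    """Pick random keys until one item_<id> is renamed to processing_<id>_<now>."""
    for _ in range(max_tries):
        key = store.randomkey()
        if key is None:
            return None  # nothing in the DB at all
        if not key.startswith("item_"):
            continue  # a processing_ key; recovery is a separate step
        new_key = f"processing_{key[len('item_'):]}_{now}"
        if store.rename(key, new_key) == 1:
            return new_key  # we own the item now
    return None
```

A worker would call claim(), process the value stored under the returned key, and then delete that key. The max_tries bound is an addition of this sketch so the loop terminates when only processing_ keys remain.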

RECOVERY

What happens if our client #Fails?

It can fail only in three different stages:

Fail #1 just after RANDOMKEY. not a problem at all...
Fail #2 just after the RENAME. We will have processing_100_<unixtime>
in the DB forever
Fail #3 just after the entry was processed... so not in time to issue
the DEL on the key.

These failures are simple to handle. Basically, when RANDOMKEY returns
processing_100_<unixtime> the client checks that this entry is not too
old. If it is too old, we try to move it to
processing_100_<new-unix-time> with RENAME. If that fails, another
client arrived before us. And so forth.

A client must be prepared to see the same item multiple times... like
in the case of Amazon, because in Fail #3 the client actually processes
the key but doesn't delete it.
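
The recovery check can be sketched the same way. Here a plain dict stands in for the DB, rename() is a hypothetical helper modeling RENAME as returning 1 or 0, and max_age is an assumed parameter for "too old"; none of these names come from the thread.

```python
def rename(db, old, new):
    """Model of RENAME as used in the thread: 1 on success, 0 if the source is gone."""
    if old not in db:
        return 0
    db[new] = db.pop(old)
    return 1


def reclaim_if_stale(db, key, now, max_age):
    """Take over a processing_<id>_<unixtime> key if it is older than max_age seconds.

    Returns the new key name when this client won the takeover, otherwise None.
    """
    parts = key.split("_")
    if len(parts) != 3 or parts[0] != "processing":
        return None  # not a processing key
    item_id, started = parts[1], int(parts[2])
    if now - started <= max_age:
        return None  # still within its time budget, leave it alone
    new_key = f"processing_{item_id}_{now}"
    # If the rename fails, another client re-claimed the entry before us.
    return new_key if rename(db, key, new_key) == 1 else None
```

Because the timestamp lives in the key name, the takeover both claims the entry and resets its clock in one rename.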

Unfortunately this does not solve the ordering problem, that is, you
have a "key space" and not a "queue". Sometimes that is fine, other
times you are happier processing the keys in FIFO or LIFO fashion;
it depends on the kind of problem.

I think it is possible to find a way to get ordering as well,
actually; I'll think a bit about it. Otherwise we can add some new
primitive, as general as possible, to allow this kind of thing to
be done.

Cheers,
Salvatore

--
Salvatore 'antirez' Sanfilippo
http://invece.org

Salvatore Sanfilippo

Apr 10, 2009, 1:35:38 PM
to redi...@googlegroups.com
On Fri, Apr 10, 2009 at 6:48 PM, Salvatore Sanfilippo <ant...@gmail.com> wrote:

> RANDOMKEY => 100

Sorry, here I meant:

RANDOMKEY => item_100

Jeremy Dunck

Apr 10, 2009, 2:07:20 PM
to redi...@googlegroups.com
On Fri, Apr 10, 2009 at 11:21 AM, chameleon95 <steve...@gmail.com> wrote:
...

>
> When designing software to use Amazon SQS you must take into account
> your requesting clients may see queue items more than once.

Have a look at beanstalkd for a worker queue pretty similar to SQS.

(I like redis, but am not sure it's a good replacement for worker queues.)

Salvatore Sanfilippo

Apr 10, 2009, 2:25:16 PM
to redi...@googlegroups.com

Hello Jeremy,

I agree, there are systems specifically conceived to be worker queues
that will work better out of the box. But on the other hand I think
that Redis should export all the primitives to build such a queue if
the user needs one. This is useful since in many environments there
can be Redis servers holding data, and some need for worker queues,
and maintaining multiple systems can be hard.

The queue I described, even if it appears to use tricks, actually has
provable time and space complexity characteristics, is persistent
(automatically), can be replicated and so on. But the fact that it's not
able to process elements in the order received is not a great thing
indeed. I absolutely want to add a primitive to make this
possible, or to find a way to implement one exploiting just what is
already available.

Probably the simplest primitive that solves all these problems is
LPOPSTORE and RPOPSTORE (random names):

LPOPSTORE key destkey

Pop an element from the list at key and store the value at destkey.
This primitive is probably enough to turn the proposed schema
into one with ordering. It is also faster, since adding a new task takes
only a single LPUSH/RPUSH call. For a worker, taking an element is
a bit more complex: it has to generate a unique key with INCR,
then use LPOPSTORE, process the key, and then DEL it. A special
worker, or from time to time any normal worker, will LPUSH timed-out
keys back into the queue if they are not processed in time.
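
To make the proposal concrete, here is a small Python model of how a worker might use it. LPOPSTORE is only a proposed name in this message, so lpopstore() below is a hypothetical in-memory model and not a real command; the processing_<id>_<time> naming of destkey and the timeout parameter are also assumptions of this sketch.

```python
from collections import deque


class TinyStore:
    """Hypothetical in-memory model with lists, string keys and a counter."""

    def __init__(self):
        self.lists = {}
        self.strings = {}
        self.counter = 0

    def incr(self):
        self.counter += 1
        return self.counter

    def rpush(self, key, value):
        self.lists.setdefault(key, deque()).append(value)

    def lpush(self, key, value):
        self.lists.setdefault(key, deque()).appendleft(value)

    def lpopstore(self, key, destkey):
        """Model of the proposed LPOPSTORE: pop the head of key into destkey in one step."""
        queue = self.lists.get(key)
        if not queue:
            return 0
        self.strings[destkey] = queue.popleft()
        return 1


def take(store, queue, now):
    """Worker side: make a unique destination key, then pop the task into it."""
    dest = f"processing_{store.incr()}_{now}"
    return dest if store.lpopstore(queue, dest) == 1 else None


def requeue_timed_out(store, queue, now, timeout):
    """Sweeper side: push tasks held longer than timeout back to the head of the queue."""
    for key in list(store.strings):
        if not key.startswith("processing_"):
            continue
        started = int(key.rsplit("_", 1)[1])
        if now - started > timeout:
            # Push first, delete second: a crash in between duplicates the task
            # instead of losing it.
            store.lpush(queue, store.strings[key])
            del store.strings[key]
```

After processing, the worker deletes the key returned by take(). With the push-then-delete order in the sweeper, workers still have to tolerate seeing a task twice.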

chameleon95

Apr 11, 2009, 4:59:54 AM
to Redis DB
The idea of using Redis is to simplify my setup.. I am planning on
fine tuning boxes to run Redis/Nginx..

Here is a basic outline of what is in my head..

Queue (LIST): queue::1234
Queue Item (STRING): queue_item::1234::3F2504E0-4F89-11D3-9A0C-0305E82C3301
Queue Timer (SET): queue_timer::00 (05 10 15 20 25 30 35 40 45 50 55)

# Add Queue Item

RPUSH queue::1234 3F2504E0-4F89-11D3-9A0C-0305E82C3301
SET queue_item::1234::3F2504E0-4F89-11D3-9A0C-0305E82C3301 QueueInstruction

Add queue item to tail of list. Set queue item value/instruction.

# Get Queue Item

LPOP queue::1234
SADD queue_timer::35 queue_item::1234::3F2504E0-4F89-11D3-9A0C-0305E82C3301
GET queue_item::1234::3F2504E0-4F89-11D3-9A0C-0305E82C3301

Get single queue item and remove item (atomically) from start of list.
This is my simple timer with a 5 min increment.. say I want a 20 min
timer.. current time 13:13 + 20 min = 13:33, rounded up to the next 5 min
mark (13:35), so I have a 22 min timer set in queue_timer::35..
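
The bucket arithmetic can be written down as a short Python sketch. The function names are made up for illustration, and rounding up to the next 5 min mark is assumed from the 13:33 to 13:35 example.

```python
def timer_bucket(now_minute, timeout_minutes, step=5):
    """Return (bucket minute, effective timeout) for a job picked up at now_minute.

    now_minute is the minute of the current hour (0-59). The due time is
    rounded up to the next multiple of step, so the effective timeout is
    never shorter than the requested one.
    """
    due = now_minute + timeout_minutes
    rounded = -(-due // step) * step  # ceiling to a multiple of step
    return rounded % 60, rounded - now_minute


def timer_key(now_minute, timeout_minutes, step=5):
    """Name of the timer set, e.g. queue_timer::35."""
    bucket, _ = timer_bucket(now_minute, timeout_minutes, step)
    return f"queue_timer::{bucket:02d}"
```

For the example above, timer_bucket(13, 20) gives bucket 35 with an effective timeout of 22 minutes. Since the bucket is only a minute of the hour, timeouts have to stay under 60 minutes.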

# Delete Queue Item

SREM queue_timer::35 queue_item::1234::3F2504E0-4F89-11D3-9A0C-0305E82C3301
DEL queue_item::1234::3F2504E0-4F89-11D3-9A0C-0305E82C3301

Delete from timer and delete key.

# Queue Timer

SREM queue_timer::35 queue_item::1234::3F2504E0-4F89-11D3-9A0C-0305E82C3301
LPUSH queue::1234 3F2504E0-4F89-11D3-9A0C-0305E82C3301

I run a simple background process or cron job every 5 mins to pick up
any remaining keys.. so when 13:35 comes round if the key still exists
in the set for whatever reason we simply remove from timer set and add
back to the start of the queue list so it is picked up by the next
available worker.. This timer setup will work for all jobs less than
60 mins..
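
A rough Python model of that background sweep, with dicts of sets and deques standing in for the Redis SET and LIST keys. The function name and the in-memory structures are assumptions of this sketch; the SREM-then-LPUSH order follows the commands listed above.

```python
from collections import deque


def sweep_bucket(timer_sets, queues, bucket_key):
    """Re-queue every item still listed in the given timer set.

    timer_sets maps a key like queue_timer::35 to a set of
    queue_item::<queue>::<id> members; queues maps queue::<queue> to a deque.
    Returns the ids that were pushed back.
    """
    requeued = []
    for member in sorted(timer_sets.get(bucket_key, set())):
        _, queue_id, item_id = member.split("::")
        # SREM first, then LPUSH, as in the outline. A sweeper crash between
        # the two steps would drop the item; pushing first would instead risk
        # a duplicate.
        timer_sets[bucket_key].discard(member)
        queues.setdefault(f"queue::{queue_id}", deque()).appendleft(item_id)
        requeued.append(item_id)
    return requeued
```

The cron job would call this once per 5 min mark with the bucket whose time has just arrived.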

# Queue Length

LLEN queue::1234

This simply gives me queue length to help me provision my available
worker pool according to demand.


Your LPOPSTORE/RPOPSTORE would definitely be of use. I have thought
about RANDOMKEY but in most use cases FIFO or LIFO is required. I
plan to initially code in PHP and, once the concept is proven, will look to
create a plugin for Nginx (direct Nginx/Redis).

Hope this makes sense.. Any feedback appreciated..

Salvatore Sanfilippo

Apr 12, 2009, 11:21:58 AM
to redi...@googlegroups.com
Hello,

this seems like a smart approach to me. It is probably possible to
simplify it, but the key point of your idea IMHO is that you can
duplicate the element in the queue-adding stage, where it is not a
problem if the client fails after just the first addition, since you
can instruct the workers that an item present in just one place is
invalid, or something like this.

I'll review in depth your implementation in the next couple of days,
my girlfriend may kill me if I work today and I've a son, it's better
to continue to live. Thank you!

Cheers,
Salvatore

Ericson Smith

Apr 12, 2009, 9:03:26 PM
to Redis DB
So we had this similar problem recently. We try to keep things simple
and make other things configurable.

Pieces:
* Queue
* Temporary holding list

You start by putting a unit of work in the queue. Each unit of work
has a definition:
* id
* name
* max_duration
* etc...

When your slave machine pops that job off the queue, it puts it in the
temporary holding list. The job gets certain attributes added to it:

* slave_machine_ip
* process_pid
* start_time
* result

So your dispatcher is constantly watching this stuff. If something in
the holding list exceeds its configurable time, and the process_pid no
longer exists on slave_machine_ip, then you know that the job died --
thus the dispatcher re-queues it. You can query the temporary holding
list for jobs currently running. You can kill a job, because you know
its ip and pid. And the job cleans up after itself when it's done,
because all tasks should run in the process framework.
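
The dispatcher check might look roughly like this in Python. The Job fields follow the attribute names listed above; is_alive is a hypothetical callback standing in for however the dispatcher checks a pid on a remote machine, which the message does not describe.

```python
from dataclasses import dataclass


@dataclass
class Job:
    id: int
    name: str
    max_duration: float          # seconds the job is allowed to run
    slave_machine_ip: str = ""   # filled in when a worker picks the job up
    process_pid: int = 0
    start_time: float = 0.0


def requeue_dead_jobs(queue, holding, now, is_alive):
    """Move jobs that are overdue AND whose process is gone back to the queue.

    queue and holding are plain lists; is_alive(ip, pid) is a caller-supplied
    check (hypothetical) that reports whether the process still exists.
    """
    still_held = []
    for job in holding:
        overdue = now - job.start_time > job.max_duration
        if overdue and not is_alive(job.slave_machine_ip, job.process_pid):
            queue.append(job)  # the job died: hand it to another worker
        else:
            still_held.append(job)
    holding[:] = still_held
```

An overdue job whose process is still running stays in the holding list, which matches the rule that both conditions must hold before re-queueing.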

Simple stuff. It has worked for us for years; only now Redis makes it
faster and we can throw out the DB for all of this, and of course the
list operations being atomic means we don't have to do any fancy table
or record locking.

- Ericson Smith
http://www.funadvice.com
http://www.askmygf.com

chameleon95

Apr 13, 2009, 12:19:39 AM
to Redis DB
Ericson,

Your solution is very simple for an RDBMS where you can simply query
start_time/max_duration to locate all timed-out jobs.. It is very
similar to a solution I have been using for years with MySQL..

With Redis we cannot query like this.. ( start_time + max_duration <=
current_time ) and this is why I thought of the idea of using key
sets based on expire time.. I would be happy with 5 min increments but
this can be changed to 1 min and it would cover most use cases..

Regarding PID & IP.. For my setup this is not necessary, I like the
worker pool to be as dumb and lazy as possible.. the more you need to
check, monitor, etc the harder to scale.. you will end up monitoring
the monitor's monitor.. Imagine 2,000 workers all being polled for PID,
you need another system just to monitor.. Regularly checking logs will
tell you if you need to increase or reduce timeouts for jobs..

All workers are designed to handle seeing the same job more than
once.. All workers are designed to come and go with no configuration..
no handling keys, etc..

Very simple..

Steve..

chameleon95

Apr 15, 2009, 1:22:06 PM
to Redis DB
Salvatore,

I would be grateful for your feedback before I put this together..

Steve..

Salvatore Sanfilippo

Apr 15, 2009, 2:22:04 PM
to redi...@googlegroups.com

Hello!

Sorry for the delay, I did a big release of a new web service in the
last couple of days and it was pretty hard... but tomorrow I can
study your proposal and reply. Sorry again!

Btw this new site, which will have pretty high traffic, is using Redis
as a cache, with a lot of EXPIREs. So in the next few days we'll start to
get a feeling for the performance in a production environment.

Regards,

Salvatore Sanfilippo

Apr 16, 2009, 11:55:51 AM
to redi...@googlegroups.com
On Sat, Apr 11, 2009 at 10:59 AM, chameleon95 <steve...@gmail.com> wrote:
>
> The idea of using Redis is to simplify my setup.. I am planning on
> fine tuning boxes to run Redis/Nginx..

Hello chameleon95. My proposal worked the way it did in order to
ensure that you can't lose items. I think that's not the case with your
proposal, but it gave me the right idea to create one that does.

Let's start with what does not work with yours:


> LPOP queue::1234
> SADD queue_timer::35 queue_item::1234::3F2504E0-4F89-11D3-9A0C-0305E82C3301
> GET queue_item::1234::3F2504E0-4F89-11D3-9A0C-0305E82C3301

If your client dies after the LPOP you lose your item.

What you want instead is to add the same item in multiple places when you
add it to the queue:

INCR itemId => 55
SADD queue:1234:items 55
LPUSH queue:1234 55
SET item:55 <my item data>

Now to get an item a client must perform:

<START>
LINDEX queue:1234 0 => 22 (get the value of the top item to process;
ok, item 22 is at the top of the queue)
SADD queue:1234:processing 22 => if here we got 1 we own the item,
otherwise GOTO <START>
SREM queue:1234:items 22 => if here we got 0 the item was processed in
the meantime! SREM from processing and GOTO <START>.
Otherwise SREM returned 1 too: ok, we can process this item safely,
reading the item data from item:22

Ok, what you need with this schema is to re-push into the queue IDs that
stay too long in queue:1234:processing.
If the same ID stays there for more than N seconds, then you have to
re-push it to the queue:1234 list.

Ok, for this to work well even in case of incomplete additions to
the queue, the client must be smart enough to skip items that can't be
processed because GET item:<id> returns nil and so on. Removing an
item is left as a hacking exercise for the reader ;) it's simple if you
understand how the rest of the schema works.
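
One pass of the claim sequence above, modeled in Python. The dict-based store and the sadd/srem helpers are hypothetical stand-ins that only mimic the 1/0 return values relied on here; the function does a single attempt and leaves the GOTO <START> retry, the re-push of stale IDs and item removal to the caller.

```python
def new_db():
    """Hypothetical in-memory store with a counter, lists, sets and strings."""
    return {"counter": 0, "lists": {}, "sets": {}, "strings": {}}


def sadd(db, key, member):
    members = db["sets"].setdefault(key, set())
    if member in members:
        return 0
    members.add(member)
    return 1


def srem(db, key, member):
    members = db["sets"].get(key, set())
    if member not in members:
        return 0
    members.remove(member)
    return 1


def add_item(db, queue, data):
    """INCR, SADD <queue>:items, LPUSH <queue>, SET item:<id>, in that order."""
    db["counter"] += 1
    item_id = str(db["counter"])
    sadd(db, f"{queue}:items", item_id)
    db["lists"].setdefault(queue, []).insert(0, item_id)  # LPUSH
    db["strings"][f"item:{item_id}"] = data
    return item_id


def try_claim(db, queue):
    """Single attempt at the sequence; returns the claimed id or None."""
    items = db["lists"].get(queue, [])
    if not items:
        return None
    item_id = items[0]  # LINDEX <queue> 0
    if sadd(db, f"{queue}:processing", item_id) == 0:
        return None  # someone else owns it; the caller would GOTO <START>
    if srem(db, f"{queue}:items", item_id) == 0:
        srem(db, f"{queue}:processing", item_id)  # already processed: undo our claim
        return None
    return item_id  # safe to read item:<id> and process it
```

The caller would then read item:<id> and skip the item if the data is missing, as described above.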

Hope this helps.

Btw the trick to spot errors in this kind of stuff is to ask yourself
"what happens if the client dies at this point? And at this point?
What happens to a worker if the client *adding* an item dies after
this command?" And so on.

Cheers,
Salvatore