How does the scheduler implement its ETA/countdown behavior on top of RabbitMQ?


ppearcy

Dec 28, 2010, 7:37:29 PM
to celery-users
Hello,
Been playing around with Celery and digging through source code and
docs. If someone had a few minutes, I was curious about the
implementation details of the ETA behavior on top of RabbitMQ (which
does not have any support for ETA).

It appears that a scheduler is used that checks items in a wait queue
(?) and fires once the ETA has passed. Internally, it has a sleep
interval and maintains an ETA ordered list of pending items.

I found some of these details here:
http://celeryproject.org/docs/internals/reference/celery.worker.scheduler.html

and also by taking a look at the change log:
http://celeryproject.org/docs/changelog.html

Specifically, how is an ordered list maintained with RabbitMQ? For
instance, for each sleep interval, is the entire wait queue evaluated
and then either placed into the ready queue or back onto the wait
queue? Or does Celery somehow enforce ordering on top of RabbitMQ? If
it enforces ordering, how does it do this?

Any details or even a pointer to the source code that has this logic
would be great.

Many thanks,
Paul

Ask Solem

Dec 30, 2010, 6:52:38 AM
to celery...@googlegroups.com


Hi Paul,

I'll try to describe it in brief. The worker.scheduler module is actually long gone now;
most of its functionality has been replaced by timer2: https://github.com/ask/timer2
But timer2 is just the internal mechanism that applies a timed function. I'm assuming what
you're really interested in is how the messaging part works.

From the comments in celery.worker.consumer: http://ask.github.com/celery/internals/reference/celery.worker.consumer.html

"
* If the task has an ETA/countdown, the task is moved to the `eta_schedule`
  so the :class:`timer2.Timer` can schedule it at its
  deadline. Tasks without an eta are moved immediately to the `ready_queue`,
  so they can be picked up by the :class:`~celery.worker.controllers.Mediator`
  to be sent to the pool.

* When a task with an ETA is received the QoS prefetch count is also
  incremented, so another message can be reserved. When the ETA is met
  the prefetch count is decremented again, though this cannot happen
  immediately because amqplib doesn't support doing broker requests
  across threads. Instead the current prefetch count is kept as a
  shared counter, so as soon as :meth:`~Consumer.consume_messages`
  detects that the value has changed it will send out the actual
  QoS event to the broker.
"

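The shared-counter idea in the second bullet might look roughly like the sketch below. It is only an illustration under stated assumptions, not Celery's code: the class name `SharedPrefetchCounter`, its methods, and the `send_qos` callback are all hypothetical. Any thread may change the desired prefetch count, but only the consumer thread calls `flush()`, so the broker request never crosses threads.

```python
import threading


class SharedPrefetchCounter:
    """Hypothetical stand-in for the shared prefetch counter (not Celery's API)."""

    def __init__(self, initial):
        self._lock = threading.Lock()
        self.value = initial    # prefetch count the worker currently wants
        self.applied = initial  # prefetch count last sent to the broker

    def increment(self):
        # Safe to call from any thread, e.g. when an ETA task is received.
        with self._lock:
            self.value += 1

    def decrement(self):
        # Safe to call from any thread, e.g. from the timer when an ETA is met.
        with self._lock:
            self.value -= 1

    def flush(self, send_qos):
        """Call only from the consumer thread.

        If the wanted value differs from the last applied one, pass it to
        `send_qos` (a hypothetical callback that would issue the real QoS
        request) and return True. Otherwise return False.
        """
        with self._lock:
            wanted = self.value
            if wanted == self.applied:
                return False
            self.applied = wanted
        send_qos(wanted)
        return True
```

In this sketch the consumer loop would call `flush()` on each pass, so a decrement made by the timer thread reaches the broker slightly later rather than immediately, which matches the delay the comment describes.
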
So it's pretty simple: whenever a task with an ETA is received we increment the
prefetch_count, and when the task is processed we decrement it again. We
can keep the ETA tasks in memory for as long as we like, since the message
will simply be redelivered if the connection is lost before the message is acked.
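
To make the flow concrete, here is a minimal in-memory sketch of it. It is an illustration only, not the actual worker or timer2 code: the class `EtaWorkerSketch`, its attributes, and the choice of a heap ordered by ETA are assumptions. The point it shows is that the ordering lives in the worker's memory, not in RabbitMQ.

```python
import heapq
import itertools


class EtaWorkerSketch:
    """Hypothetical model of the ETA handling described above."""

    def __init__(self, prefetch_count):
        self.prefetch_count = prefetch_count
        self.ready_queue = []          # tasks that may run now
        self._eta_heap = []            # (eta, seq, task), earliest ETA first
        self._seq = itertools.count()  # tie-breaker for equal ETAs

    def on_message(self, task, eta=None):
        if eta is None:
            # No ETA: hand the task over immediately.
            self.ready_queue.append(task)
            return
        # ETA task: hold it in memory (unacked) and raise the prefetch
        # count so the broker can deliver another message meanwhile.
        heapq.heappush(self._eta_heap, (eta, next(self._seq), task))
        self.prefetch_count += 1

    def tick(self, now):
        """Move every task whose ETA has passed to the ready queue."""
        moved = 0
        while self._eta_heap and self._eta_heap[0][0] <= now:
            _, _, task = heapq.heappop(self._eta_heap)
            self.ready_queue.append(task)
            self.prefetch_count -= 1
            moved += 1
        return moved
```

Messages can arrive in any order; the heap sorts them locally. If the worker died, nothing here would need to be persisted, because the held messages were never acked and the broker would deliver them again.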

Hope this helps,
--
{Ask Solem,
+47 98435213 | twitter.com/asksol }.

ppearcy

Dec 31, 2010, 1:37:00 AM
to celery-users
Many thanks for the details and code pointer.
