Duplicate tasks - prevent / identify / merge


Leo Dirac

Oct 5, 2011, 1:11:28 PM
to celery...@googlegroups.com
Is there a way to prevent a task from being run multiple times with the same parameters?  I'm launching tasks to fill a cache when requests come in for certain data.  If two people request the same data item in quick succession -- faster than the task can calculate it and store it in the cache -- then two tasks will be triggered to calculate the same value.  This is wasteful.  I know Gearman can identify such tasks as identical and deliver the same result to both callers.  How could I do this with Celery?  I'd be happy enough to simply not run the second copy of the task and deal with merging the results myself.
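
In code, the pattern is roughly this (just a sketch; the cache dict and expensive_computation are stand-ins for our real cache client and computation):

from celery.task import task

cache = {}  # stand-in for our real memcached/redis client

@task
def compute_value(key):
    # Hypothetical expensive work whose result we want to cache.
    cache[key] = expensive_computation(key)

def handle_request(key):
    value = cache.get(key)
    if value is None:
        # Two near-simultaneous requests both reach this branch before
        # the first task has populated the cache, so the same task is
        # enqueued twice; that second run is the waste described above.
        compute_value.delay(key)
    return value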

--
Leo Dirac
Banyan Branch | Director of Technology


Juarez Bochi

Oct 20, 2011, 8:58:55 AM
to celery...@googlegroups.com
Hello,

I have a very similar situation in my project, but I have not found a solution yet.

Google App Engine's Task Queue has an interesting concept of Task Names: http://code.google.com/appengine/docs/python/taskqueue/overview.html#Task_Names

"In addition to a task's contents, you can declare a task's name. Once a task with name N is written, any subsequent attempts to insert a task named N fail"

It would be very interesting if Celery had this feature too.

Cheers
Juarez

Kayode Odeyemi

Oct 25, 2011, 3:24:13 PM
to celery...@googlegroups.com
On Thu, Oct 20, 2011 at 1:58 PM, Juarez Bochi <jbo...@gmail.com> wrote:
> Hello,
>
> I have a very similar situation in my project, but I have not found a solution yet.
>
> Google App Engine's Task Queue has an interesting concept of Task Names: http://code.google.com/appengine/docs/python/taskqueue/overview.html#Task_Names
>
> "In addition to a task's contents, you can declare a task's name. Once a task with name N is written, any subsequent attempts to insert a task named N fail"

I just posted the same question myself. Didn't see this.

Hoping to see this feature too.

--
Odeyemi 'Kayode O.
http://www.sinati.com. t: @charyorde

Juarez Bochi

Oct 28, 2011, 10:00:20 AM
to celery...@googlegroups.com
I'm willing to implement this feature, but I need some guidelines.

What's the best possible approach? To generate a task UUID using its name and the provided arguments?
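
Something along these lines, maybe (just a sketch; note that simply reusing a task_id does not by itself stop a second message from being delivered):

import uuid

# Arbitrary namespace for deduplication ids (my assumption, not a Celery API).
DEDUPE_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, 'celery-dedupe')

def deterministic_task_id(task_name, args, kwargs):
    # Same task name + same arguments -> same UUID, so duplicates
    # become detectable by id.
    key = '%s|%r|%r' % (task_name, args, sorted(kwargs.items()))
    return str(uuid.uuid5(DEDUPE_NAMESPACE, key))

# Usage:
#   task_id = deterministic_task_id('tasks.fill_cache', (42,), {})
#   fill_cache.apply_async(args=(42,), task_id=task_id)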

Cheers,
Juarez

Paulo Cheque

Oct 28, 2011, 10:36:52 AM
to celery...@googlegroups.com
Maybe celery.app.amqp.TaskPublisher.delay_task?
Maybe celery.app.__init__.App.task?

This feature would be excellent. More ideas?

Ask Solem

Oct 28, 2011, 11:55:39 AM
to celery...@googlegroups.com

On 28 Oct 2011, at 15:00, Juarez Bochi wrote:

> I'm willing to implement this feature, but I need some guidelines.
>
> What's the best possible approach? To generate a task UUID using its name and the provided arguments?
>

I agree, it is a great feature, but it's not that easy to accomplish.

If you want to implement something like this:

if not already_sent():
    task.apply_async()

then that won't work very well.

AMQP does not support out-of-order operations on the queue, and for good reason.
The task may still be in the process of being sent by a client, or by another broker
across the continents, or it may already have been taken out of the queue, just not
executed yet.


The solution you want is for the task itself to test if it's already running:

@task
def x():
    if already_running():
        return


But this requires a distributed mutex. That is easy to do with Redis (or memcached),
but Celery can't depend on either of them, which is why it's not in Celery already.
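
For example, a minimal Redis-based mutex could look like this (a sketch using redis-py; the key naming and timeout are up to you):

import redis

client = redis.Redis()

def acquire_lock(name, expires=300):
    # SET with nx=True only creates the key if it does not already
    # exist; the expiry guards against the lock leaking if a worker
    # dies while holding it.
    return bool(client.set(name, 'locked', nx=True, ex=expires))

def release_lock(name):
    client.delete(name)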


I have been meaning to add support for it, and my idea has been something like this:


We could have a decorator called @synchronized:

@task
@synchronized()
def mytask():
    pass


Implemented like this:

from celery.task import task
from functools import wraps


def synchronized(backend=None):
    # Box the backend so the inner closure can rebind it.
    _backend = [backend]

    def _inner(task):
        # Fall back to the task's own result backend when none is given.
        _backend[0] = _backend[0] or task.backend
        orig = task.run

        @wraps(task.run)
        def _syn(*args, **kwargs):
            # Hold the lock for the duration of the task body.
            _backend[0].acquire_synlock(task, *args, **kwargs)
            try:
                return orig(*args, **kwargs)
            finally:
                _backend[0].release_synlock(task, *args, **kwargs)

        task.run = _syn
        return task

    return _inner


Any result backend that wants to support the synchronized decorator
must then implement the acquire_synlock() and release_synlock() methods.

An example implementation of a memcached lock is here:
http://docs.celeryproject.org/en/latest/cookbook/tasks.html#ensuring-a-task-is-only-executed-one-at-a-time
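
As a rough idea of what that contract could look like on top of a memcached-style client (a sketch only; self.cache, the key scheme, and the timeout are assumptions, not existing Celery APIs):

import hashlib

class SynchronizedBackendMixin(object):
    SYNLOCK_EXPIRES = 60 * 5  # assumed lock timeout, in seconds

    def _synlock_key(self, task, *args, **kwargs):
        payload = '%s|%r|%r' % (task.name, args, sorted(kwargs.items()))
        return 'synlock-' + hashlib.md5(payload.encode('utf-8')).hexdigest()

    def acquire_synlock(self, task, *args, **kwargs):
        # memcached's add() is atomic: it succeeds only if the key is
        # not already set, which is exactly the mutex we need.
        key = self._synlock_key(task, *args, **kwargs)
        if not self.cache.add(key, '1', self.SYNLOCK_EXPIRES):
            raise RuntimeError('%s is already running' % task.name)

    def release_synlock(self, task, *args, **kwargs):
        self.cache.delete(self._synlock_key(task, *args, **kwargs))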

If you would like to implement this I can help with any further
questions you may have!

--
Ask Solem
twitter.com/asksol | +44 (0)7713357179

Paulo Cheque

Oct 28, 2011, 12:29:42 PM
to celery...@googlegroups.com
=== Comment 1:

There is this implementation of a @synchronized too:
https://github.com/gthb/celery/commit/407b53f128b9954a2dc0983d7c396dc751117c33

Example:
@synchronized
@task
def my_task(): pass

But I tried it here and it did not work properly. I ran celeryd with concurrency=2 and started sending duplicate tasks, and I saw that two identical tasks were being processed at the same time (easy to reproduce with a function that prints a value). Any ideas?


=== Comment 2:

What do you think about using the apply_async method with ETAs or countdowns for every scheduled task?



Thanks in advance
Paulo

Ask Solem

Oct 28, 2011, 6:57:28 PM
to celery...@googlegroups.com

On 28 Oct 2011, at 17:29, Paulo Cheque wrote:

> === Comment 1:
>
> There is this implementation of a @synchronized too:
> https://github.com/gthb/celery/commit/407b53f128b9954a2dc0983d7c396dc751117c33
>
> Example:
> @synchronized
> @task
> def my_task(): pass
>
> But I tried it here and it did not work properly. I ran celeryd with concurrency=2 and started sending duplicate tasks, and I saw that two identical tasks were being processed at the same time (easy to reproduce with a function that prints a value). Any ideas?
>

The problem with this implementation is that it only works within a single worker
(i.e. the lock is not effective across celeryd instances).

There is a use for this implementation as well, but it wouldn't be enough
for a generic 'synchronized' implementation.
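
To make that concrete, a lock kept in worker memory behaves something like this (an illustration, not the code from that commit):

import threading

_running = set()                  # lives inside one worker process only
_registry_lock = threading.Lock()

def already_running(task_name):
    # This only sees tasks running in *this* celeryd process; a second
    # worker on another machine has its own _running set, so identical
    # tasks slip through exactly as observed above.
    with _registry_lock:
        if task_name in _running:
            return True
        _running.add(task_name)
        return False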


>
> === Comment 2:
>
> What do you think about using the apply_async method with ETAs or countdowns for every scheduled task?
>
>

I'm not sure I understand what you mean. Every scheduled task?
