Hello, I have a very similar situation on my project, but I could not find a solution yet. Google App Engine's Task Queue has an interesting concept of Task Names: http://code.google.com/appengine/docs/python/taskqueue/overview.html#Task_Names
"In addition to a task's contents, you can declare a task's name. Once a task with name N is written, any subsequent attempts to insert a task named N fail."
> I'm willing to implement this feature, but I need some guidelines.
>
> What's the best possible approach? To generate a task UUID using its name and the provided arguments?
>
I agree, it is a great feature, but it's not that easy to accomplish.
If you want to implement something like this:
    if not already_sent():
        task.apply_async()

then that won't work very well.
AMQP does not support out-of-order operations on the queue, and for good reason.
The task may be in the process of being sent by a client, or by another broker
across the continents, or it may already have been taken off the queue but not
executed yet, so the "already sent?" check is always racing against the queue.
The solution you want is for the task itself to test if it's already running:
    @task
    def x():
        if already_running():
            return
But this requires a distributed mutex, which is easily done using Redis (or memcached).
Celery can't depend on either of these, which is why it's not in Celery already.
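As a sketch, such a distributed mutex can be built on any store that offers an atomic "add only if absent" operation (memcached's add(), Redis SETNX). The FakeCache class below is just an in-memory stand-in so the example is self-contained; in a real deployment you would use an actual memcached or Redis client instead:

```python
import time

class FakeCache:
    """In-memory stand-in for a memcached/Redis client (illustration only)."""
    def __init__(self):
        self._data = {}

    def add(self, key, value, expires):
        # Atomic "set only if not already set", like memcached's add().
        if key in self._data and self._data[key][1] > time.time():
            return False
        self._data[key] = (value, time.time() + expires)
        return True

    def delete(self, key):
        self._data.pop(key, None)

cache = FakeCache()
LOCK_EXPIRE = 60 * 5  # expire locks so a crashed worker can't hold one forever

def acquire_lock(task_name):
    # True means we got the lock; False means someone else holds it.
    return cache.add("lock:%s" % task_name, "locked", LOCK_EXPIRE)

def release_lock(task_name):
    cache.delete("lock:%s" % task_name)

assert acquire_lock("mytask")        # first caller gets the lock
assert not acquire_lock("mytask")    # a concurrent caller is refused
release_lock("mytask")
```

The expiry time is the important design choice: without it, a worker that dies while holding the lock would block that task forever.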
I have been meaning to add support for it, and my idea has been something like this:
We could have a decorator called @synchronized:
    @task
    @synchronized()
    def mytask():
        pass
Implemented like this:
    from functools import wraps

    from celery.task import task

    def synchronized(backend=None):
        _backend = [backend]

        def _inner(task):
            _backend[0] = _backend[0] or task.backend
            orig = task.run

            @wraps(task.run)
            def _syn(*args, **kwargs):
                _backend[0].acquire_synlock(task, *args, **kwargs)
                try:
                    return orig(*args, **kwargs)
                finally:
                    _backend[0].release_synlock(task, *args, **kwargs)

            task.run = _syn
            return task
        return _inner
Any result backend that wants to support the @synchronized
decorator must then implement the methods acquire_synlock() + release_synlock().
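For illustration, here is how a backend might implement those two methods on top of an atomic cache add. The method names come from the sketch above; the mixin, the self.client attribute, and its add()/delete() interface are all assumptions for this example, not an existing Celery API:

```python
import hashlib

class SynLockMixin:
    """Hypothetical backend mixin providing the synlock API sketched above.

    Assumes ``self.client`` is a memcached-style client with atomic
    add(key, value, expires) and delete(key); purely illustrative.
    """
    LOCK_EXPIRES = 60 * 5  # seconds; stale locks die with crashed workers

    def _synlock_key(self, task, *args, **kwargs):
        # One lock per task name + argument fingerprint, so the same task
        # may still run concurrently with *different* arguments.
        fingerprint = hashlib.md5(
            repr((args, sorted(kwargs.items()))).encode()
        ).hexdigest()
        return "synlock:%s:%s" % (task.name, fingerprint)

    def acquire_synlock(self, task, *args, **kwargs):
        key = self._synlock_key(task, *args, **kwargs)
        if not self.client.add(key, "1", self.LOCK_EXPIRES):
            raise RuntimeError("%s is already running" % task.name)

    def release_synlock(self, task, *args, **kwargs):
        self.client.delete(self._synlock_key(task, *args, **kwargs))
```

Hashing the arguments into the key means the mutex matches App Engine's per-name semantics only if you treat the name + arguments pair as the task's identity; lock on task.name alone if you want one instance per task regardless of arguments.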
An example implementation of a memcached lock is here:
http://docs.celeryproject.org/en/latest/cookbook/tasks.html#ensuring-a-task-is-only-executed-one-at-a-time
If you would like to implement this I can help with any further
questions you may have!
--
Ask Solem
twitter.com/asksol | +44 (0)7713357179
> === Comment 1:
>
> There is this implementation of a @synchronized too:
> https://github.com/gthb/celery/commit/407b53f128b9954a2dc0983d7c396dc751117c33
>
> Example:
> @synchronized
> @task
> def my_task(): pass
>
> But I tried it here and it did not work properly. I ran celeryd with concurrency=2 and started sending duplicate tasks, and I realized that two identical tasks were being processed at the same time (easy to reproduce with tasks that print a value). Any ideas?
>
The problem with that implementation is that it only works
within one worker (i.e. the lock is not effective across celeryd instances).
There is a use for this implementation as well, but it wouldn't be enough
for a generic 'synchronized' implementation.
>
> === Comment 2:
>
> What do you think about using the "apply_async" method with ETAs or countdowns in every scheduled task?
>
>
I'm not sure I understand what you mean. Every scheduled task?