Callback task state propagation

Praveen Gollakota

Mar 22, 2012, 7:39:14 AM
to celery...@googlegroups.com
Hello!

I have a task that calls a subtask, which in turn calls another subtask, and so on. I would like to be able to query the main task and find out which subtask is currently running.

Let me make this clearer with an example taken straight from the Celery documentation:

from celery.task import task
from celery.task.sets import subtask

# myhttplib, myparser and PageInfo (a Django model) are placeholders
# from the docs example, not real modules.

@task(ignore_result=True)
def update_page_info(url):
    # fetch_page -> parse_page -> store_page_info
    fetch_page.delay(url, callback=subtask(parse_page,
                                callback=subtask(store_page_info)))

@task(ignore_result=True)
def fetch_page(url, callback=None):
    page = myhttplib.get(url)
    if callback:
        # The callback may have been serialized with JSON,
        # so best practice is to convert the subtask dict back
        # into a subtask object.
        subtask(callback).delay(url, page)

@task(ignore_result=True)
def parse_page(url, page, callback=None):
    info = myparser.parse_document(page)
    if callback:
        subtask(callback).delay(url, info)

@task(ignore_result=True)
def store_page_info(url, info):
    # Django's create() accepts keyword arguments only
    # (the field names url/info are assumed).
    PageInfo.objects.create(url=url, info=info)
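The comment in fetch_page about JSON is the subtle part. As far as I know, a Celery subtask is a dict subclass (task name, args, kwargs), so after a round trip through JSON the worker receives a plain dict and has to wrap it again. The sketch below models only that idea with a hypothetical `Signature` class and the standard `json` module; it does not use Celery, and task names are plain strings rather than task objects.

```python
import json


class Signature(dict):
    """Hypothetical stand-in for a Celery subtask: a plain dict holding the
    task name, positional args and keyword args, so it is JSON-serializable."""

    def __init__(self, task, args=(), kwargs=None):
        if isinstance(task, dict):
            # Re-wrap a dict that lost its type (e.g. after json.loads).
            super().__init__(task)
        else:
            super().__init__(task=task, args=list(args), kwargs=dict(kwargs or {}))

    def call_spec(self, *extra_args):
        """Return (task name, args, kwargs) for a call; extra args go first."""
        return self["task"], list(extra_args) + self["args"], self["kwargs"]


# fetch_page's callback as built in update_page_info, with a nested callback.
callback = Signature("parse_page", kwargs={"callback": Signature("store_page_info")})

# Simulate the message crossing the broker as JSON: the subclass is lost.
wire = json.loads(json.dumps(callback))

# Re-wrap before use, as the comment in fetch_page recommends.
restored = Signature(wire)
name, args, kwargs = restored.call_spec("http://example.com", "<html>")
print(type(wire).__name__, name, args)
```

The nested callback inside `kwargs` is still a plain dict after the round trip, which is why parse_page wraps its own callback again before calling it.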

Now when I call the update_page_info task and check its state, I would like to get custom states such as FETCHING, PARSING and STORING, depending on which subtask is currently processing. I looked in several places, but it was not clear how to accomplish this.
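One way to get this behaviour (an alternative, not the approach suggested in the reply) is for every stage to write a custom state onto the root task's id instead of its own. Celery's `Task.update_state` accepts `task_id`, `state` and `meta` arguments, so in practice this would mean passing the root id down the chain. The sketch below only models the idea: the in-memory `BACKEND` dict stands in for a result backend, and `root_id`, `HISTORY` and the stage bodies are hypothetical. The stages also run synchronously here rather than on workers.

```python
# Hypothetical in-memory stand-in for a result backend: task_id -> (state, meta).
BACKEND = {}
HISTORY = []  # every state written, in order, for inspection


def update_state(task_id, state, meta=None):
    """Record a custom state for task_id, overwriting the previous one."""
    BACKEND[task_id] = (state, meta or {})
    HISTORY.append(state)


def store_page_info(root_id, url, info):
    update_state(root_id, "STORING", {"url": url})
    return {"url": url, "info": info}  # stand-in for PageInfo.objects.create


def parse_page(root_id, url, page):
    update_state(root_id, "PARSING", {"url": url})
    info = page.upper()  # stand-in for myparser.parse_document
    return store_page_info(root_id, url, info)


def fetch_page(root_id, url):
    update_state(root_id, "FETCHING", {"url": url})
    page = "<html>" + url + "</html>"  # stand-in for myhttplib.get
    return parse_page(root_id, url, page)


def update_page_info(root_id, url):
    stored = fetch_page(root_id, url)
    update_state(root_id, "SUCCESS")
    return stored


update_page_info("root-1", "http://example.com")
print(HISTORY)  # ['FETCHING', 'PARSING', 'STORING', 'SUCCESS']
```

With real workers, a client polling the root task would simply see whichever state was written last.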

Any help is much appreciated!

Thanks!
Praveen.

Ask Solem

Mar 22, 2012, 12:39:09 PM
to celery...@googlegroups.com

On 22 Mar 2012, at 11:39, Praveen Gollakota wrote:
> Now when I call the update_page_info task, and check its state, I would like to be able to get custom states like FETCHING, PARSING, STORING, etc. depending on which subtask is processing currently. I tried looking up in multiple places, but it was not clear how to accomplish this.

You should have the tasks return the AsyncResult/TaskSetResult of the subtasks they start;
that way you can traverse the chain of subsequent tasks.
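A rough model of this suggestion: each task returns the result handle of the subtask it launched, so a client holding the root result can walk down the chain until it reaches a task that has not finished, then map that task's name to a stage such as PARSING. `FakeResult` and `STAGE_NAMES` are hypothetical stand-ins for AsyncResult and a lookup table. Note that the example tasks in the question use ignore_result=True, which stops Celery from storing return values, so they would need to keep their results for this to work.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class FakeResult:
    """Hypothetical stand-in for an AsyncResult: task name, state, and the
    task's return value, which may itself be a FakeResult."""
    task_name: str
    state: str = "PENDING"
    result: Any = None


# Hypothetical mapping from task name to the custom stage to report.
STAGE_NAMES = {
    "fetch_page": "FETCHING",
    "parse_page": "PARSING",
    "store_page_info": "STORING",
}


def current_stage(result: FakeResult) -> str:
    """Walk down the chain of returned results to the first unfinished task."""
    node = result
    while node.state == "SUCCESS" and isinstance(node.result, FakeResult):
        node = node.result
    if node.state == "SUCCESS":
        return "DONE"  # the last task in the chain has finished
    return STAGE_NAMES.get(node.task_name, node.state)


# update_page_info and fetch_page have returned; parse_page is still running.
parse = FakeResult("parse_page", state="STARTED")
fetch = FakeResult("fetch_page", state="SUCCESS", result=parse)
root = FakeResult("update_page_info", state="SUCCESS", result=fetch)
print(current_stage(root))  # PARSING
```

Because each task returns as soon as it has launched the next one, the root finishes almost immediately; the chain of returned handles is what lets the client see further down.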

One example is AsyncResult.collect, which is a new method only available in the development
version: https://github.com/ask/celery/blob/master/celery/result.py#L98-140

Returning the results this way will also work with the stable and older versions.
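AsyncResult.collect is only linked here, not described, so the following is a simplified model of a collect-style traversal rather than Celery's code. It yields a (result, value) pair for a result and then follows any result, or list of results (loosely standing in for a TaskSetResult), that the task returned. Unlike a real result handle, the hypothetical `FakeResult` already holds its value, so nothing waits on a backend.

```python
from dataclasses import dataclass
from typing import Any, Iterator, Tuple


@dataclass
class FakeResult:
    """Hypothetical stand-in for an AsyncResult whose value is already known."""
    task_name: str
    result: Any = None


def collect(result: FakeResult) -> Iterator[Tuple[FakeResult, Any]]:
    """Yield (result, value) for this result, then recurse depth-first into
    any FakeResult, or list of FakeResults, that the task returned."""
    value = result.result
    yield result, value
    children = value if isinstance(value, list) else [value]
    for child in children:
        if isinstance(child, FakeResult):
            yield from collect(child)


store = FakeResult("store_page_info", result="stored")
parse = FakeResult("parse_page", result=store)
fetch = FakeResult("fetch_page", result=parse)
root = FakeResult("update_page_info", result=fetch)

for res, value in collect(root):
    print(res.task_name)
```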

--
Ask Solem
twitter.com/asksol | +44 (0)7713357179