chain subtask raise exception to be captured

arco

Nov 29, 2019, 5:56:56 AM
to celery-users
Hi,

I currently have the following setup:

A main_task has a chain of subtasks.

The desired functionality is that once a subtask completes, its progress is logged.
If a subtask fails, the exception is logged and the chain is marked as failed.

The only way I have found to achieve this is by using link_error on each subtask:

@shared_task
def main_task():
    # each subtask links to an update task to update the chain progress
    sub1 = task1.si()
    sub1.link(update_progress.si())

    jobs = chain(sub1, sub2...)
    jobs.link(success_callback.si())
    jobs.link_error(failure_callback.si())




If sub1 raises an exception, it won't be caught by failure_callback unless I add

    sub1.link_error(sub_error.si())

and

@shared_task
def task1():
    # check for error
    if True:
        raise Exception()



For example, if an exception is raised from task2, the order when I run this setup is:
main_task 
task1
update_progress
task2
main_task success
task1 success
update_progress success
sub_error call
failure_callback call

I would expect that if a chain contains a task that fails, the chain's link_error would catch it. Is that not the case?
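To make the expected behaviour concrete, here is a minimal plain-Python model of it. This is only a sketch of the semantics described above, not Celery's implementation: `run_chain`, `on_progress`, and `on_error` are hypothetical names, and the steps are ordinary callables rather than Celery signatures.

```python
from typing import Callable, List, Tuple


def run_chain(
    steps: List[Callable[[], object]],
    on_progress: Callable[[str], None],
    on_error: Callable[[str, Exception], None],
) -> Tuple[List[str], bool]:
    """Run steps in order, stopping at the first one that raises.

    Returns the names of the completed steps and whether the whole
    chain succeeded. (Hypothetical helper, not part of Celery.)
    """
    completed: List[str] = []
    for step in steps:
        name = step.__name__
        try:
            step()
        except Exception as exc:
            # One error handler for the whole chain: report and stop.
            on_error(name, exc)
            return completed, False
        completed.append(name)
        # Progress is reported once per successfully completed step.
        on_progress(name)
    return completed, True
```

In this model a failure in the second step reports progress for the first step only, calls the error handler once, and never runs the remaining steps.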

I'm kinda new to Python, but I think a decorator would be appropriate, since I have chains with tens of subtasks.
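A decorator along these lines might look like the sketch below. It is plain Python with no Celery dependency; `log_outcome` and the logger name `chain_progress` are made up for illustration. The assumption is that it would be applied directly beneath `@shared_task`, so that Celery registers the wrapped function.

```python
import functools
import logging

# Hypothetical logger name, chosen for this example only.
logger = logging.getLogger("chain_progress")


def log_outcome(func):
    """Log completion or failure of the wrapped function, then re-raise."""

    @functools.wraps(func)  # keep the original __name__ and __module__
    def wrapper(*args, **kwargs):
        try:
            result = func(*args, **kwargs)
        except Exception:
            # logger.exception records the message together with the traceback.
            logger.exception("subtask %s failed", func.__name__)
            raise  # re-raise so the caller still sees the failure
        logger.info("subtask %s finished", func.__name__)
        return result

    return wrapper
```

Re-raising matters here: if the wrapper swallowed the exception, the subtask would look successful and the rest of the chain would carry on.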


arco

Dec 1, 2019, 10:57:23 AM
to celery-users
I've found that this can be achieved more efficiently by giving the subtasks a custom base class:

from celery import Task, chain, shared_task


class CallbackTask(Task):
    """subclass celery.Task"""

    def on_success(self, retval, task_id, args, kwargs):
        # run by the worker when the task executes successfully
        pass

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        # run by the worker when the task fails
        pass

    def update_state(self, task_id, state, meta):
        # placeholder: with `pass`, the PROGRESS states set below are not stored
        pass


@shared_task(base=CallbackTask)
def main_task(*args):
    chain(sub_task1.si(), sub_task2.si(), sub_task3.si())()


@shared_task(bind=True, base=CallbackTask)
def sub_task1(self, *args):
    self.update_state(task_id=self.request.id,
                      state="PROGRESS", meta={'progress': 1})
    return True


@shared_task(bind=True, base=CallbackTask)
def sub_task2(self, *args):
    self.update_state(task_id=self.request.id,
                      state="PROGRESS", meta={'progress': 2})
    return True


@shared_task(bind=True, base=CallbackTask)
def sub_task3(self, *args):
    # fail on purpose to exercise on_failure
    Ex = ValueError()
    Ex.strerror = "Value must be within 1 and 10."
    raise Ex