A task is marked as Pending even though all its dependencies are Done


Ran Tavory

Feb 20, 2014, 3:06:49 AM2/20/14
to luigi...@googlegroups.com
Hi, do you know what might be the explanation for this? 
http://cl.ly/image/0o3K2P1t1u3C

The TriggerDaily task is very short; it just figures out which accounts need to run their daily tasks and triggers them (it triggers AllDailyTask with the account ID as a parameter). 

In most cases this works OK: when all dependent tasks are done, the TriggerDaily task is marked as Done as well. 
However, I've noticed that in some cases, even when all dependent tasks are done, the triggering task isn't marked as Done and stays Pending. I'm not sure what might be causing it. This doesn't go away with a page refresh or anything (i.e. it isn't just a transitional state for a split second). It stays like this; I don't know until when, probably until some timeout, but for a very long time. 

This does not seem to affect the correct execution of tasks and does not trigger error emails or such, but it does add clutter.  

--

Erik Bernhardsson

Feb 20, 2014, 10:11:05 AM2/20/14
to Ran Tavory, luigi...@googlegroups.com
Without really thinking it through, just throwing out a hypothesis: if the last task doesn't have an output() or complete() method implemented, then maybe that's why it's not marked as done.


--
You received this message because you are subscribed to the Google Groups "Luigi" group.
To unsubscribe from this group and stop receiving emails from it, send an email to luigi-user+...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.



--
Erik Bernhardsson
Engineering Manager, Spotify, New York

Ran Tavory

Feb 20, 2014, 2:02:29 PM2/20/14
to Erik Bernhardsson, luigi...@googlegroups.com
Hi Erik,
In most cases the same task does get marked as completed. But in some cases it doesn't. 
Here's how it looks more or less:

class TriggerDaily(luigi.Task):
  did_run = False

  def requires(self):
    # Do things... and yield deps
    self.did_run = True

  def complete(self):
    return self.did_run


Do you think it might help if I changed it to add a run() method, something like this?

class TriggerDaily(luigi.Task):
  did_run = False

  def requires(self):
    # Do things... and yield deps

  def run(self):
    self.did_run = True

  def complete(self):
    return self.did_run



Joe Crobak

Feb 20, 2014, 2:56:46 PM2/20/14
to Ran Tavory, Erik Bernhardsson, luigi...@googlegroups.com
I suspect that the second task would work better, but you should probably be using WrapperTask, based upon the code you've posted. In that case, you only have to define `requires`.


Although it depends on what you're doing in `requires`: if that's non-deterministic, it could confuse the scheduler.
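For reference, the semantics being suggested here (a WrapperTask counts as complete exactly when every one of its requirements is complete) can be sketched in plain Python without luigi; all class names below are hypothetical stand-ins, not luigi's actual implementation:

```python
class Target:
    """Stand-in for a luigi target: exists() reports whether it was written."""
    def __init__(self):
        self.written = False

    def exists(self):
        return self.written


class DepTask:
    """Stand-in for a dependency task with a target-backed complete()."""
    def __init__(self):
        self.target = Target()

    def complete(self):
        return self.target.exists()

    def run(self):
        self.target.written = True


class Wrapper:
    """Stand-in for a WrapperTask: done exactly when all requirements are done."""
    def __init__(self, deps):
        self.deps = deps

    def requires(self):
        return self.deps

    def complete(self):
        # No output() or run() needed; completeness is derived from the deps.
        return all(d.complete() for d in self.requires())


deps = [DepTask() for _ in range(3)]
wrapper = Wrapper(deps)
before = wrapper.complete()   # False: nothing has run yet
for d in deps:
    d.run()
after = wrapper.complete()    # True: every requirement is now complete
```

With this scheme there is no separate "did the wrapper run" state to get out of sync, which is exactly the failure mode being discussed.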

Erik Bernhardsson

Feb 20, 2014, 3:05:26 PM2/20/14
to Ran Tavory, luigi...@googlegroups.com
I'm not sure if it matters... maybe the problem is that you run it with multiple workers and the complete() method isn't synced across processes (as it would be if you used any file-system-based target).

Would be great to isolate the perpetrator and add a unit test for this



Ran Tavory

Feb 21, 2014, 3:50:03 AM2/21/14
to Erik Bernhardsson, luigi...@googlegroups.com
OK, it's probably a combination of a few things. 
First, the requires() method is time-based, so indeed it might return different values if called multiple times: it looks at the current time and then queries the database to find account IDs for which it is midnight now, so an hour later it may return different values. I think the right way to go is to move this logic into the constructor so that it runs only once.
Second, the multi-process explanation might also stand, so I'll use a local file target to signal the exists/complete state of this task.

A question: how do you implement a Task constructor?
I have not been able to find the correct formula... I know it's a silly question, but whatever I tried didn't work.

I've tried different things such as this, but I fail because of luigi's parameters, which I'm not sure how to pass to the super constructor. I guess there's some magic I haven't been able to follow...

def __init__(self, *args, **kwargs):
  super(Me, self).__init__(args, kwargs) 


Thanks!

Erik Bernhardsson

Feb 21, 2014, 9:40:47 AM2/21/14
to Ran Tavory, luigi...@googlegroups.com
On Fri, Feb 21, 2014 at 3:50 AM, Ran Tavory <ran...@gmail.com> wrote:
OK, it's probably a combination of a few things. 
First, the requires() method is time-based, so indeed it might return different values if called multiple times: it looks at the current time and then queries the database to find account IDs for which it is midnight now, so an hour later it may return different values. I think the right way to go is to move this logic into the constructor so that it runs only once.

OK – I think it might make more sense if the task's requires doesn't change. You can still have default parameters that are dynamic:

import datetime

import luigi

class MyTask(luigi.Task):
    date = luigi.DateParameter(default=datetime.date.today())
    # ...
 
Second, the multi-process explanation might also stand, so I'll use a local file target to signal the exists/complete state of this task.

A question: how do you implement a Task constructor?
I have not been able to find the correct formula... I know it's a silly question, but whatever I tried didn't work.

I've tried different things such as this, but I fail because of luigi's parameters, which I'm not sure how to pass to the super constructor. I guess there's some magic I haven't been able to follow...

def __init__(self, *args, **kwargs):
  super(Me, self).__init__(args, kwargs) 


This is how you do it in Python (kind of ugly btw):

def __init__(self, *args, **kwargs):
  super(Me, self).__init__(*args, **kwargs) 
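A minimal illustration of why the * and ** unpacking matters (plain Python, no luigi; the class names and arguments are hypothetical):

```python
class Base:
    """Stand-in for luigi.Task's __init__, which expects named parameters."""
    def __init__(self, date, account):
        self.date = date
        self.account = account


class Me(Base):
    def __init__(self, *args, **kwargs):
        # Unpack with * and ** so Base sees the original arguments.
        # super(Me, self).__init__(args, kwargs) would instead pass Base
        # one tuple and one dict as its two positional parameters.
        super(Me, self).__init__(*args, **kwargs)


m = Me('2014-02-21', account='acme')
```

Without the unpacking, `date` would be bound to the whole tuple `('2014-02-21',)` and `account` to the dict `{'account': 'acme'}`, which is why luigi's parameter handling breaks.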

Ran Tavory

Feb 23, 2014, 4:17:33 AM2/23/14
to Erik Bernhardsson, luigi...@googlegroups.com
Hi, so I made fixes according to your suggestions, but I still see cases where all dependent tasks are marked as Done while the parent task is still Pending. 


Here's what I did:

1. Added a constructor to the TriggerDaily task; in this constructor I run the time-based query against the database and store the result in self.requirements (an array). Then the requires() method just returns this array. That should take care of the non-deterministic return value of requires(), I assume.

2. Instead of using an in-memory member variable (self.did_run) I use a local file system target and simply write "Done" to that target. This works well in some cases, but again, in some cases the task isn't Done even though all its dependencies are Done, and in those cases the file system target doesn't get written. 
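The two changes above can be sketched in plain Python (no luigi; the query function and paths are hypothetical stand-ins for the real database query and target):

```python
import os
import tempfile


class TriggerDaily:
    def __init__(self, query_accounts, marker_dir):
        # Change 1: run the time-based query exactly once, at construction
        # time, so requires() returns the same value on every call.
        self.requirements = list(query_accounts())
        self.marker = os.path.join(marker_dir, 'TriggerDaily.done')

    def requires(self):
        return self.requirements

    def run(self):
        # Change 2: write a marker file instead of setting an in-memory flag.
        with open(self.marker, 'w') as f:
            f.write('Done')

    def complete(self):
        # Any worker process can observe the marker file, unlike a member
        # variable that only exists in one process's memory.
        return os.path.exists(self.marker)


calls = []

def fake_query():
    # Hypothetical stand-in for the midnight-accounts database query.
    calls.append(1)
    return ['acct-1', 'acct-2']


t = TriggerDaily(fake_query, tempfile.mkdtemp())
first = t.requires()
second = t.requires()
was_complete = t.complete()   # False: marker not written yet
t.run()
```

Repeated requires() calls return the same list and the query runs only once, while complete() flips from False to True only after run() writes the marker.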

Where else should I look?

I don't mind sharing the code; there aren't many secrets in it. I just hope it doesn't add clutter: https://gist.github.com/rantav/22e682946f5e57a6cd59

Thanks


Erik Bernhardsson

Feb 23, 2014, 12:51:02 PM2/23/14
to Ran Tavory, luigi...@googlegroups.com
Sorry – no idea what's going on here. Would be awesome if you could isolate it somehow.

I know that in some rare cases, if the worker hits high CPU usage, the keep-alive ping to the scheduler will fail, and the scheduler will start ignoring later calls. Haven't seen that bug in a while though; just mentioning it.

Ran Tavory

Mar 3, 2014, 4:03:01 AM3/3/14
to Erik Bernhardsson, luigi...@googlegroups.com
I have a theory now - let's see if I'm correct...

A task may be marked as pending indefinitely in the following scenario:
 - Task runs once and then fails. Maybe even twice, and fails twice. etc.
 - The worker never reschedules the task again. Or it did reschedule but it failed again. 
 - In scheduler.py line 145 a failed task is marked as pending if retry_delay > 0 (the default is 900s) and its retry time is still in the future. 

So the result is that the task remains pending - regardless of whether it has dependencies that are now complete or does not have dependencies at all. 

Is this correct?

What's gotten me confused is that I assumed there's automated retry. I found the parameter retry_delay, which belongs to the server, so I assumed this somehow magically also takes care of worker retries. 
What I now realize (hope it's correct) is that the retry mechanism has two sides. One is the server side which means "The server *allows* retry only after retry_delay time had passed" but this isn't enough - the worker also needs to *want to retry* by polling the server until it's allowed to run the task again. 
Is this correct?

What's gotten me confused even more (and that's beside the point) is that in my case I did have retries, by cron every half hour, but they were limited to up to 2h from the initial run, so if a task failed and more than 2 hours had passed since its first run, there would not be a retry. That's why retries worked in most cases (by cron) but failed in others (where more than 2h had passed).

If my conclusion is correct then the right thing for me to do is keep retrying up to 24h (business logic). 

What I am missing is a max_retry parameter. Is there such a thing? I don't want my tasks to keep trying to run and failing more than, say, 3 times. If I just stop trying (from the worker) then the failed task would seem pending, and that's confusing. I want it to be Failed and not Pending. Maybe I could get that by setting retry_delay = 0, but is there a way to combine retry_delay > 0 with max_attempts?

Elias Freider

Mar 3, 2014, 5:15:52 AM3/3/14
to Ran Tavory, Erik Bernhardsson, luigi...@googlegroups.com
See inline responses:

On Mon, Mar 3, 2014 at 10:03 AM, Ran Tavory <ran...@gmail.com> wrote:
I have a theory now - let's see if I'm correct...

A task may be marked as pending indefinitely in the following scenario:
 - Task runs once and then fails. Maybe even twice, and fails twice. etc.
 - The worker never reschedules the task again. Or it did reschedule but it failed again. 
 - In scheduler.py line 145 a failed task is marked as pending if retry_delay > 0 (the default is 900s) and its retry time is still in the future. 

So the result is that the task remains pending - regardless of whether it has dependencies that are now complete or does not have dependencies at all. 

Is this correct?
 
It's correct. A task that is marked as failed will be marked as pending again automatically after retry_delay seconds (the retry variable for the task in the scheduler is set to current time + retry_delay when it fails). It won't immediately be set to pending though if that's what you mean.
The idea behind this was to avoid having a failing task being repeatedly executed by the same worker, but still letting it be re-tried every once in a while. Something more sophisticated with some kind of increasing backoff would be nice to have but we haven't prioritized it.
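That retry gate can be sketched in a few lines of plain Python (class and field names are hypothetical; the real logic lives in luigi's scheduler.py):

```python
FAILED, PENDING = 'FAILED', 'PENDING'


class SchedTask:
    """Toy model of the scheduler-side retry window for one task."""
    def __init__(self, retry_delay=900):
        self.status = PENDING
        self.retry_at = None
        self.retry_delay = retry_delay

    def fail(self, now):
        # On failure, the task is blocked until now + retry_delay.
        self.status = FAILED
        self.retry_at = now + self.retry_delay

    def runnable(self, now):
        # The server *allows* a retry only after retry_delay has elapsed;
        # a worker still has to ask for the task to actually re-run it.
        if self.status == FAILED and now >= self.retry_at:
            self.status = PENDING
        return self.status == PENDING


t = SchedTask(retry_delay=900)
t.fail(now=1000)
blocked = t.runnable(now=1500)   # False: still inside the retry window
allowed = t.runnable(now=2000)   # True: 900s have elapsed, pending again
```

This also shows why a task can look Pending in the UI while nothing runs it: the relabeling happens on the scheduler side regardless of whether any worker ever asks for the task again.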


What's gotten me confused is that I assumed there's automated retry. I found the parameter retry_delay, which belongs to the server, so I assumed this somehow magically also takes care of worker retries. 
What I now realize (hope it's correct) is that the retry mechanism has two sides. One is the server side which means "The server *allows* retry only after retry_delay time had passed" but this isn't enough - the worker also needs to *want to retry* by polling the server until it's allowed to run the task again. 
Is this correct?

That's correct. The retry_delay in the scheduler is simply a run allowance (a release of the lock for the task). A worker with the task in its dependency chain still needs to ask for it in order to actually trigger a run.
 
 
What's gotten me confused even more (and that's beside the point) is that in my case I did have retries, by cron every half hour, but they were limited to up to 2h from the initial run, so if a task failed and more than 2 hours had passed since its first run, there would not be a retry. That's why retries worked in most cases (by cron) but failed in others (where more than 2h had passed).

If my conclusion is correct then the right thing for me to do is keep retrying up to 24h (business logic). 

What I am missing is a max_retry parameter. Is there such a thing? I don't want my tasks to keep trying to run and failing more than, say, 3 times. If I just stop trying (from the worker) then the failed task would seem pending, and that's confusing. I want it to be Failed and not Pending. Maybe I could get that by setting retry_delay = 0, but is there a way to combine retry_delay > 0 with max_attempts?
 
The problem with max_attempts would be invalidation. If you constantly have some worker online trying to perform the task, it would never be pruned from the scheduler graph and hence never re-run, even after a code fix. We would need some mechanism to let it re-run once whatever problem was failing it has been fixed.

Ran Tavory

Mar 3, 2014, 5:25:02 AM3/3/14
to Elias Freider, Erik Bernhardsson, luigi...@googlegroups.com
Thanks Erik. 
So is there a best practice? What would you suggest doing in order to show that this task has failed, while still potentially rerunning it if there's a worker that's asking for it?
Right now the UI is misleading: it marks the task as Pending, but it's not obvious why it is pending. 

Elias Freider

Mar 3, 2014, 5:37:36 AM3/3/14
to Ran Tavory, Erik Bernhardsson, luigi...@googlegroups.com
I think it would be a relatively simple change to the scheduler logic to make the scheduler look for both pending tasks and failed tasks that have timed out in CentralPlannerScheduler.get_work(). Then we wouldn't have to relabel the task as pending to enable re-runs.

Erik: could such a change affect some other behavior that I'm not thinking about?

/Elias