Ways to force re-runs, and ignore prior dependencies?


James McMurray

Sep 28, 2017, 4:58:12 AM
to Luigi
Hi,

We commonly run into the two issues in the subject line when deploying ETL bug fixes. For example, if we find a bug in a daily update query, we'd like to be able to force it to run again for today (ignoring and overwriting the current state of its target). At the moment we can do this by manually removing the target before execution, but something like a --force flag would be great.

Then, once the fix has been checked and so on, we often want to re-run the last step of a long process for all days. That is, we want to re-run the final update query across all days, but __not__ re-run the entire import process. We can't be 100% certain of the state of all the intermediate targets for every prior day, but we know that the data in our Data Warehouse is okay for those days. At the moment we do this by using dummy parameters and inserting their targets beforehand, so that Luigi doesn't check the prior dependencies. But an --ignore-dependencies flag for these (hopefully rare) occasions would be useful.

Has anyone dealt with these cases before? And what are your thoughts?

Best,
James McMurray

adam....@tribalgroup.com

Sep 29, 2017, 4:30:18 AM
to Luigi

I'd be inclined to keep this entirely separate: have a procedure that deletes the selected output using coded logic rather than a manual process, and keep the Luigi tasks with very clean dependency/run logic. I think this will probably also make it easier to handle cases where a whole series of task outputs is invalidated by a bug, and it's the approach I intend to follow. The implication, I think, is that you need to store the dependency info for successful tasks.
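
Assuming the dependency info is stored as a mapping from each successful task's id to the ids of the tasks it required (the storage format and the function name here are hypothetical choices), the coded logic for a bug can start by finding everything downstream of the faulty task. This sketch inverts the stored edges and does a breadth-first walk; the resulting ids are the outputs a cleanup procedure would delete.

```python
from collections import deque


def tasks_to_invalidate(bad_task, upstream):
    """Return `bad_task` plus every task that transitively depends on it.

    `upstream` maps task id -> list of task ids it required when it ran.
    """
    # Invert the stored edges: for each task, which tasks consumed its output?
    dependents = {}
    for task, requirements in upstream.items():
        for req in requirements:
            dependents.setdefault(req, []).append(task)

    invalid = {bad_task}
    queue = deque([bad_task])
    while queue:
        node = queue.popleft()
        for child in dependents.get(node, []):
            if child not in invalid:  # the set also guards against cycles
                invalid.add(child)
                queue.append(child)
    return invalid
```

For a stored chain Import → Clean → Update → Report, invalidating Clean returns Clean, Update and Report while leaving Import's output untouched.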

Adam

Noah Yetter

Nov 8, 2017, 2:49:54 PM
to Luigi
I've tried to implement my own force flag on some tasks, and come to the conclusion that it's very difficult to do properly.

The issue is statefulness. When you "force" a task, what you're saying is "disregard the presence of its targets, but only ONCE". Remember that target checking is performed before a task runs, but also after it runs, in the pre-check(s) for any of its dependent tasks. When those later checks happen, you do want the presence check.

This can easily be implemented on your own with a parameter *as long as you don't use multiple workers*. If you have multiple workers, instances of your task classes are being created in different processes, so anything stateful in those classes is lost. If you only have one process, you can be stateful.
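
The lost state is ordinary process isolation rather than anything Luigi-specific. This stdlib-only sketch (it assumes a POSIX system where the "fork" start method is available; the class and function names are made up) sets a flag like has_been_forced inside a child process, and the parent's copy of the object never sees the change.

```python
import multiprocessing as mp


class ForceState:
    has_been_forced = False


def mark_forced(state):
    # Runs in the child: mutates the child's private copy of `state`
    state.has_been_forced = True


def forced_after_child_runs(state):
    """Set the flag in a child process, then report what the parent sees."""
    ctx = mp.get_context("fork")  # assumption: POSIX; avoids re-importing this module
    child = ctx.Process(target=mark_forced, args=(state,))
    child.start()
    child.join()
    return state.has_been_forced
```

Calling forced_after_child_runs(ForceState()) returns False, whereas calling mark_forced directly in the same process does set the flag.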

In order to implement --force in a way that's compatible with multiple workers, you would need to externalize the bit of state that records "have I skipped the output check yet?". This more-or-less requires a database, so implementing it inside Luigi itself would create a highly non-trivial new dependency. But if you have a database you can use, implementing it yourself is possible.
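
As a sketch of externalizing that bit of state, the functions below keep a "pending force" row per task in a SQLite table (the table and function names are hypothetical). Whichever process deletes the row first gets True and every later check gets False, which matches "skip the output check once". SQLite is only a stand-in here: it works for workers on one machine sharing a file, while workers on several hosts would need a shared database server.

```python
import sqlite3
from contextlib import closing


def _connect(db_path):
    # isolation_level=None puts sqlite3 in autocommit mode: each statement commits at once
    conn = sqlite3.connect(db_path, isolation_level=None)
    conn.execute("CREATE TABLE IF NOT EXISTS pending_force (task_id TEXT PRIMARY KEY)")
    return conn


def request_force(db_path, task_id):
    """Record that the next output check for `task_id` should be skipped."""
    with closing(_connect(db_path)) as conn:
        conn.execute("INSERT OR IGNORE INTO pending_force (task_id) VALUES (?)", (task_id,))


def consume_force(db_path, task_id):
    """Return True exactly once per request, no matter which process asks."""
    with closing(_connect(db_path)) as conn:
        # A single DELETE is atomic, so only one caller can remove the row
        cur = conn.execute("DELETE FROM pending_force WHERE task_id = ?", (task_id,))
        return cur.rowcount == 1
```

The --force side of the workflow would call request_force before luigi.build, and the task's output() would call consume_force(db_path, self.task_id), removing the target only when it returns True. Because task_id is derived from the task's significant parameters, every worker process computes the same key.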


Here's an outline of the one-worker-only solution:
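import luigi

class SomeTask(luigi.Task):
    # other parameters...
    # BoolParameter is the current name for BooleanParameter. significant=False
    # keeps the flag out of task_id, so forced and unforced runs are the same task.
    force = luigi.BoolParameter(default=False, significant=False)

    # Per-process state: only reliable with a single worker.
    has_been_forced = False

    def output(self):
        target = SomeTarget(...)  # placeholder for your real target
        if self.force and not self.has_been_forced:
            # Remove the stale target on the first check only; later checks
            # (e.g. by dependent tasks) must see the freshly written output.
            if target.exists():
                target.remove()
            self.has_been_forced = True
        return target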

Another approach is to build a parallel network of tasks whose job it is to erase the output of your real tasks, in the spirit of "make clean". This is not really as easy as it sounds, and obviously creates a maintenance headache.