Successful task run with Unfulfilled dependency at run time

Niko Strongioglou

Mar 16, 2022, 11:41:57 AM
to Luigi
I have the following setup:

import luigi

class RootTask(luigi.WrapperTask):

    def requires(self):
        dependencies = [TaskA(), TaskB()]
        yield dependencies
        yield CreatePartials(dependencies=dependencies)

class TaskA(luigi.Task):
    def run(self):
        # write task_a.json
        pass

    def output(self):
        return luigi.LocalTarget('task_a.json')

class TaskB(luigi.Task):
    def run(self):
        # write task_b.json
        pass

    def output(self):
        return luigi.LocalTarget('task_b.json')

class CreatePartials(luigi.Task):
    dependencies = luigi.TaskParameter()

    def run(self):
        # write root_task_output.json
        pass

    def requires(self):
        for dep in self.dependencies:
            yield dep

    def output(self):
        return luigi.LocalTarget('root_task_output.json')

Although the pipeline executes successfully, I get an `Unfulfilled dependency at run time` exception on the `CreatePartials` task in every run. There is one peculiarity with this processing flow: `TaskA` and `TaskB` have very different completion times, so `TaskA` can finish 8 hours later than `TaskB`. Because the resulting data is correct, I am confident that the code works: it seems that `CreatePartials` is retried once, finds both dependencies completed, and proceeds with its `run` method. Nevertheless, these `Unfulfilled dependency` exceptions bother me for two reasons:
1. They create log/alert noise.
2. I am not sure whether they indicate a code issue that I am not yet aware of, since the results are correct. Such an issue could manifest in some edge case I cannot currently see.
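
To make the question concrete, here is a minimal sketch of what I assume is happening: before calling `run`, the worker re-checks `complete()` on each direct dependency and raises if any is still incomplete. This is my reading of the error message, not something I have confirmed for my setup. `StubTask` and the two helper functions are hypothetical stand-ins (no Luigi import), so the snippet only illustrates the suspected check.

```python
# Standalone sketch of a pre-run dependency check. StubTask is a hypothetical
# stand-in for a luigi.Task; only task_id, complete() and deps() are mimicked.

class StubTask:
    """Hypothetical task exposing the members the check relies on."""

    def __init__(self, task_id, done, deps=()):
        self.task_id = task_id
        self._done = done
        self._deps = list(deps)

    def complete(self):
        return self._done

    def deps(self):
        return self._deps


def unfulfilled_dependencies(task):
    """Return the ids of direct dependencies that are not complete yet."""
    return [dep.task_id for dep in task.deps() if not dep.complete()]


def run_with_check(task):
    """Raise if any direct dependency is incomplete, otherwise 'run' the task."""
    missing = unfulfilled_dependencies(task)
    if missing:
        noun = "dependency" if len(missing) == 1 else "dependencies"
        raise RuntimeError(
            "Unfulfilled %s at run time: %s" % (noun, ", ".join(missing))
        )
    return "ran %s" % task.task_id


task_a = StubTask("TaskA", done=False)  # the slow task, still running
task_b = StubTask("TaskB", done=True)   # the fast task, already finished
partials = StubTask("CreatePartials", done=False, deps=[task_a, task_b])

# First attempt: TaskA is not complete, so the check would fail.
first_attempt = unfulfilled_dependencies(partials)

# Later, once TaskA has finished, a retry passes the same check.
task_a._done = True
second_attempt = unfulfilled_dependencies(partials)
```

If this is roughly right, it would match what I observe: the first attempt fails while `TaskA` is still running, and the retry succeeds once both outputs exist.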

So why am I getting these `Unfulfilled dependency` exceptions?

Thanks a lot.