Create dynamically sequential tasks in Luigi


Vasco Patarata

Feb 12, 2021, 2:55:20 PM
to Luigi
Hello, everyone,

I'm very new to Luigi and to programming in general, and I'm looking for help with the following issue:

import luigi

class FileToStaging(ImportToTable):
    filename = luigi.Parameter(default='')
    # import a file from some folder into a staging database
    def requires(self):
        return luigi.LocalTarget(self.filename)
    # truncate the staging table
    # load the file into staging

class StgToOfficial(RunQuery):
    filename = luigi.Parameter()
    # run a process in the database to load data from staging into the final table
    def requires(self):
        return FileToStaging(self.filename)
    # run query class

class LoadFileGroups(luigi.WrapperTask):
    def requires(self):
        list_of_files = get_list_of_files_currently_in_folder()  # the folder can contain an arbitrary number of files
        for file in list_of_files:
            yield StgToOfficial(filename=file)

I'm new to Luigi and trying to build an ETL process with the framework.

Imagine I have a process similar to the pseudocode above. The process must check a folder and get the list of files inside it. Then, one file at a time, it must import the file into the staging database and run a process that loads the staged data into the final table.

The problem is that, with the solution above, all the files are loaded into the staging table (each followed by its load into the final table) in parallel, which must not happen. How can I force Luigi to execute the tasks sequentially, so that the next file is imported only after the previous one has finished loading into the final table? (See the simplified draft below.)

[Image: luigi_sequential_tasks.png (simplified draft of the intended sequential flow)]

I know that I should use the requires method to enforce the sequence, but how can I do it dynamically for an unknown number of files?

Thank you very much in advance for the help.


Vasco Patarata

Feb 12, 2021, 3:04:26 PM
to Luigi
Here is an extra piece of information that might help explain the challenge: all those files are imported into the same staging table and loaded into the same final table, which is why they must not run in parallel.

Peter Weissbrod

Feb 12, 2021, 4:34:28 PM
to Luigi
Here is a snippet that isn't precisely tailored to your needs, but you might still find it useful.

Consider a "sql" directory like this:
  ~ :>ll somedir/sql
-rw-rw-r-- 1 pweissbrod authenticatedusers   513 Jan 27 09:15 stage01_test.sqltemplate
-rw-rw-r-- 1 pweissbrod authenticatedusers  1787 Jan 28 13:57 stage02_test.sqltemplate
-rw-rw-r-- 1 pweissbrod authenticatedusers  1188 Jan 28 13:57 stage03_test.sqltemplate
-rw-rw-r-- 1 pweissbrod authenticatedusers 13364 Jan 29 07:16 stage04_test.sqltemplate
-rw-rw-r-- 1 pweissbrod authenticatedusers  1344 Jan 28 13:57 stage05_test.sqltemplate
-rw-rw-r-- 1 pweissbrod authenticatedusers  1983 Jan 28 17:03 stage06_test.sqltemplate
-rw-rw-r-- 1 pweissbrod authenticatedusers  1224 Jan 28 16:05 stage07_test.sqltemplate

Consider a luigi task with a dynamic requires method like this:
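import os
import re

import luigi

# config() and db are helpers defined elsewhere in the poster's own project (not shown)

class BuildTableTask(luigi.Task):
    table = luigi.Parameter(description='the name of the table this task (re)builds')

    def requires(self):
        # every stage prefix (e.g. 'stage03') that has a template for the current environment
        tables = [f.split('_')[0] for f in os.listdir('sql') if re.match(f'stage[0-9]+[a-z]*_{config().environment}', f)]
        # the immediate predecessor: the highest stage name that sorts below this one
        prereq = next(iter([t for t in sorted(tables, reverse=True) if t < self.table]), None)
        # a Task instance is always truthy, so guard on prereq first; the first stage requires nothing
        yield (prereq and BuildTableTask(table=prereq)) or []

    def run(self):
        with open(f'sql/{self.table}_{config().environment}.sqltemplate') as sqltemplate:
            sql = sqltemplate.read().format(**config().to_dict())
        db.run(f'create table {config().srcdbname}.{self.table} stored as orc as {sql}')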

The task tree is built by inspecting the files in that directory:
 └─--[BuildTableTask-{'table': 'stage07'} (COMPLETE)]
    └─--[BuildTableTask-{'table': 'stage06'} (COMPLETE)]
       └─--[BuildTableTask-{'table': 'stage05'} (COMPLETE)]
          └─--[BuildTableTask-{'table': 'stage04'} (COMPLETE)]
             └─--[BuildTableTask-{'table': 'stage03'} (COMPLETE)]
                └─--[BuildTableTask-{'table': 'stage02'} (COMPLETE)]
                   └─--[BuildTableTask-{'table': 'stage01'} (COMPLETE)]

Lars Albertsson

Feb 12, 2021, 5:39:41 PM
to Peter Weissbrod, Luigi
If there is no real data dependency between the different instances of FileToStaging and StgToOfficial, and you just want to run them serially, I would suggest not creating separate tasks for them. Just make a single task and, in its run() method, list the files and process them in a loop.

If the tasks are truly independent, why not process them in parallel?

BTW, requires() should not return a LocalTarget. It needs to return a Task. If you want to depend on a file or other target, wrap it in an ExternalTask.

Regards,


--
You received this message because you are subscribed to the Google Groups "Luigi" group.
To unsubscribe from this group and stop receiving emails from it, send an email to luigi-user+...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/luigi-user/94364e6a-f29e-46a4-8431-d78af78903d5n%40googlegroups.com.

Vasco Patarata

Feb 12, 2021, 6:34:57 PM
to Luigi
Thank you very much for your help, Peter. It seems a very promising approach; I will try it later and give you feedback. :)

Vasco Patarata

Feb 12, 2021, 6:35:18 PM
to Luigi
Dear Lars,

Thank you very much for your insights.

There is a data dependency, since all files are loaded into the same staging table and then into the same final table.
  • Imagine that I have a sales table with a day-product key.
  • Consider that I have two sales files to process: sales_2021_02_12 and sales_2021_02_11.
  • The files may contain sales from various previous days, and even corrections to previous values.
  • I load each file into a staging table and then upsert it into the final table.
  • I have to respect the order, since the latest file might have a different value for the same day-product key than the previous one.
  • If I do it in parallel, it is complete chaos: data from two different files can be in the staging table at the same time, and the upsert into the final table then produces two different rows for the same day.
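The ordering argument in the bullets above can be checked with a toy model in plain Python. Dictionaries stand in for the tables, and the file contents are invented for illustration; the point is only that an upsert keyed on (day, product) keeps whichever value is applied last.

```python
# Each "file" maps a (day, product) key to a sales value; the newer file corrects 2021-02-11 / A.
sales_2021_02_11 = {("2021-02-11", "A"): 10, ("2021-02-11", "B"): 5}
sales_2021_02_12 = {("2021-02-11", "A"): 12, ("2021-02-12", "A"): 7}


def upsert(final_table, staged_rows):
    """Insert new keys and overwrite existing ones, as a MERGE/UPSERT would."""
    for key, value in staged_rows.items():
        final_table[key] = value
    return final_table


def load_in_order(files):
    final_table = {}
    for staged_rows in files:  # one file at a time through the staging step
        upsert(final_table, staged_rows)
    return final_table


correct = load_in_order([sales_2021_02_11, sales_2021_02_12])
wrong = load_in_order([sales_2021_02_12, sales_2021_02_11])
print(correct[("2021-02-11", "A")])  # 12: the correction from the newer file wins
print(wrong[("2021-02-11", "A")])    # 10: the stale value overwrote the correction
```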
As for your last point, my actual code does not have the Target in the requires method. It was a mistake I made when writing this simplified version for the question. But thanks for the remark; it will be useful while I am still getting into the framework. :)

Best regards,
Vasco

talkt...@gmail.com

Feb 13, 2021, 8:12:57 AM
to Lars Albertsson, Luigi
Hey Lars, that's a good point regarding targets. The thing is, I'm using a customized version of Luigi that uses the task history table in that SQLite database as targets.

A while back I did a deep-dive comparison of Luigi versus Airflow. While Luigi still came out the pick of the litter for several reasons, Airflow had some really nice features, like a central task history database.

I've ported this into Luigi and proposed it as an RFC for the Luigi team to review. My proposal sort of got pocket-vetoed by the community, but I continue to use it for my own purposes, in the ballpark of 80 unique workflows so far.


From: Lars Albertsson <la...@scling.com>
Sent: Friday, February 12, 2021 5:39 PM
To: Peter Weissbrod
Cc: Luigi
Subject: Re: Create dynamically sequential tasks in Luigi

Vasco Patarata

Feb 13, 2021, 2:48:21 PM
to Luigi
Dear Peter,

How do you initialize the task? What do you send as table parameter in the first run?

Vasco Patarata

Feb 13, 2021, 3:02:53 PM
to Luigi
And is this pattern still usable if you have to require another task in BuildTableTask()? Imagine that you need to run another procedure before running BuildTableTask for each file in the directory.

Something like this tree:
└─--[BuildTableTask-{'table': 'stage07'} (COMPLETE)]
   └─--[AnotherDependentTask-{'table': 'stage07'} (COMPLETE)]
      └─--[BuildTableTask-{'table': 'stage06'} (COMPLETE)]
         └─--[AnotherDependentTask-{'table': 'stage06'} (COMPLETE)]
            └─--[BuildTableTask-{'table': 'stage05'} (COMPLETE)]
               └─--[AnotherDependentTask-{'table': 'stage05'} (COMPLETE)]
                  └─--[BuildTableTask-{'table': 'stage04'} (COMPLETE)]
                     └─--[AnotherDependentTask-{'table': 'stage04'} (COMPLETE)]
                        ... and so on

Thank you very much for your help. :)

Lars Albertsson

Feb 13, 2021, 6:41:08 PM
to Vasco Patarata, Luigi
Ah, I see. I don't understand your staging area, but it seems that you have a recursive data dependency pattern with the sales dataset. It is a common pattern, and easy to express.

If you generate a dataset per day, your tasks should have a DateParameter. Your task then requires the same dataset for the previous day. Example:

from datetime import timedelta

from luigi import DateParameter, LocalTarget, Task

class Transactions(Task):
    date = DateParameter()
    ...

class Balances(Task):
    date = DateParameter()

    def requires(self):
        return {"transactions": Transactions(date=self.date), "yesterday": Balances(date=self.date - timedelta(days=1))}

    def output(self):
        return LocalTarget(f"/shared_mount/balances/balance_{self.date:%Y_%m_%d}.avro")


All data dependencies should be expressed via requires() in your DAG. If you have non-data constraints, e.g. limiting the load on a database, you should find other ways. Luigi provides a feature called 'resources' to solve the load limiting. If you have a unique resource that you must protect, resources might be useful. 

talkt...@gmail.com

Feb 14, 2021, 9:49:05 AM
to Vasco Patarata, Luigi
Look at the requires method. Do you see how it returns an empty array for the first task? You can replace that empty array with precursor tasks.

From: Vasco Patarata
Sent: Saturday, February 13, 2021 3:02 PM
To: Luigi
Subject: Re: Create dynamically sequential tasks in Luigi

talkt...@gmail.com

Feb 14, 2021, 9:50:17 AM
to Vasco Patarata, Luigi
There are a few ways you can initialize the task. You could pass the name of the final stage, if you know what it is. Or you can figure it out dynamically in the requires method of a calling task. There may be other ways too.

From: Vasco Patarata
Sent: Saturday, February 13, 2021 2:48 PM
To: Luigi
Subject: Re: Create dynamically sequential tasks in Luigi

Vasco Patarata

Feb 16, 2021, 9:50:08 AM
to Luigi
Hi, Peter,

I struggled to implement the dependency of the other dependent task in the requires method with the "or" clause. However, I was able to do it in the run() method, keeping the recursive pattern of the task in the requires method. It seems to work like a charm. :)

The recursive pattern was really helpful and opens some doors to explore other interesting approaches.  Thank you very much for your help, it was enlightening.

Best wishes,
Vasco

pete

Feb 20, 2021, 9:08:33 AM
to Vasco Patarata, Luigi
Vasco
Sorry for the late reply. Glad I could help. This is something I've been trying to tell folks time and time again, but it seems hard to summarize. The best code in Luigi is WHAT'S NOT WRITTEN, which gives you the freedom to take it wherever you want to go with it. As someone who's worked with alternatives like Airflow, I consider this a subtle but important differentiator, but it usually falls on deaf ears. Anyway, best of luck with your project.




Lars Albertsson

Feb 20, 2021, 12:32:28 PM
to pete, Vasco Patarata, Luigi
+1 to the simplicity being Luigi's major strength. Less is more. There are other features that would be nice, e.g. lineage, monitoring, etc, but they are better implemented as separate components. Unix philosophy FTW.

One of the strengths and also weaknesses of Airflow is that it is more opinionated, which is limiting, but it also pushes users into good patterns. I think we could compensate by collecting common recipes and patterns. The recursive dependency discussed in this thread is one example, and there are many of these: demultiplexing, dynamic requirements, time shuffling, etc. Experienced users know many of these, but I keep teaching them over and over, so it would be valuable for the community if those of us that have some patterns to contribute could collect them, e.g. in an examples directory. 

I know, I could stop mailing and start writing a PR... We have a luigi_extensions.py file that has been lying around for long, waiting for me to turn its contents into PRs. Ping me if you want raw but working code to add tests for and contribute to Luigi.



pete

Feb 20, 2021, 1:06:55 PM
to Lars Albertsson, Vasco Patarata, Luigi
Lars
This is a topic I'm perennially interested in. At this point I'm sitting on nearly 100 actual unique workflows, all with common extensions as well. We're always open to strengthening our conventions.

Did you ever come across issue #2933?
It's related to your initiative, if not precisely the same thing.

Lars Albertsson

Feb 21, 2021, 7:52:22 AM
to pete, Vasco Patarata, Luigi
I hadn't noticed that issue. It makes sense to me, with some reservations. :-)  I don't want to derail this thread, so I'll add my comments on the issue.

Vasco Patarata

Feb 22, 2021, 5:50:33 AM
to Luigi

Yep, I'm starting to get a feel for it.

The problem is that, for those who come from a non-programming background and are used to more "friendly" ETL tools like SSIS, it is kind of hard to get started, especially because there is not much information available on patterns, good practices, getting-started guides, etc.

The documentation is very technical, and you really need to dive into the code to understand what is happening under the hood of Luigi.

But the flexibility you get with it along with the base logic, which is very clever, seems really amazing once you get more into it.

Vasco Patarata

Feb 22, 2021, 5:58:04 AM
to Luigi
Dear Lars,

That collection of frequent patterns, data pipeline examples, Luigi good practices, etc. seems to be precisely the piece that is missing to make Luigi more accessible to a broader audience. As a Luigi beginner, I think it would be immensely valuable for everyone trying to use it for the first time.

Lars Albertsson

Feb 24, 2021, 3:03:59 AM
to Vasco Patarata, Luigi
I agree. At the moment and for the foreseeable future, I lack the time to set up a documentation catalogue with examples. I run a small data engineering startup without VC money, so I need to spend my time very wisely. 

If someone would like to set up an empty skeleton box with a first example and make it look decent in the documentation, I have a handful of patterns that I can contribute. I cannot make promises on spending the time to do it soon, but I have things to contribute and would like to.

Vasco Patarata

Feb 24, 2021, 2:38:57 PM
to Luigi
Peter and Lars,

I don't want to abuse your help, but I have one more question that is somewhat related to this thread. I hope you can help me.

How can you connect a set of WrapperTasks so that they run sequentially?

Imagine I have something like this:

import luigi

class WrapperTask1(luigi.WrapperTask):
    def requires(self):
        yield [RunTaskA(ref_date=date) for date in some_list_of_dates]
        # each of these tasks can have other required tasks

class WrapperTask2(luigi.WrapperTask):
    def requires(self):
        yield RunTaskB()
        # each of these tasks can have other required tasks

class WrapperTask3(luigi.WrapperTask):
    def requires(self):
        yield [RunTaskC(product=sku) for sku in some_sku_list]
        # each of these tasks can have other required tasks


Imagine I need the WrapperTask1 to finish before I run WrapperTask2, and the WrapperTask2 to finish before I run WrapperTask3. 

Since the requires method is already populated by the tasks that the WrapperTask calls, how do you specify a dependency relationship between the WrapperTasks? I cannot simply add the upstream WrapperTask to the requires list; otherwise, the tasks in the requires method start to run in parallel with the upstream WrapperTask.

I tried something like this:

class WrapperTask3(luigi.WrapperTask):
    def requires(self):
        yield WrapperTask2()

    def run(self):
        yield [RunTaskC(product=sku) for sku in some_sku_list]
        # each of these tasks can have other required tasks

However, this does not work: a WrapperTask counts as complete as soon as everything in its requires method is complete, so once WrapperTask2 is complete, WrapperTask3 never executes its run method.
Then I tried something like this to work around the issue, although it does not seem right:

class WrapperTask3(luigi.WrapperTask):
    is_complete = False

    def requires(self):
        yield WrapperTask2()

    def run(self):
        yield [RunTaskC(product=sku) for sku in some_sku_list]
        # each of these tasks can have other required tasks
        self.is_complete = True

    def complete(self):
        return self.is_complete

Although this kind of works, it throws an error in the final validation of the WrapperTask's completion. What is the best pattern for this?

Thank you very much for your precious help.

I wish you the best of luck with your startup, Lars! :)

Regards,
Vasco

pete

Feb 25, 2021, 5:17:14 PM
to Vasco Patarata, Luigi
It's been a long day and I lack the mental energy to fully take your problem in, but I get the impression you need parametric prerequisites.
Tasks can be luigi parameters. Say you have a luigi parameter named 'prerequisites' in TaskX. And in the TaskX.requires() method you yield self.prerequisites in addition to some tasks which TaskX always requires.
Then you have the capability to define multiple instances of TaskX with varying prerequisites. Is that pattern resonating with you?

Vasco Patarata

Feb 28, 2021, 5:08:11 PM
to Luigi
Dear Peter,

Thank you very much for your kind help, even in a long day.

Your remark about tasks as parameters is interesting and might help with some other questions. However, I do not see how it can help with this particular problem. Here is a schematic view of what I want to achieve.

[Image: sequential_wrapper_tasks.PNG]

I want each WrapperTask to run sequentially, assuring that the bottom tasks only start to execute when the required WrapperTask is complete. What I was saying is that if I put the WrapperTask1 and the bottom tasks in the requires method of WrapperTask2, they start to run in parallel. 

This is what happens, which is not the intended behaviour:
[Image: sequential_wrapper_tasks_2.PNG]

This is the intended behaviour:
[Image: sequential_wrapper_tasks_3.PNG]

Do I have to explicitly add WrapperTask1 as a requirement of the bottom tasks under WrapperTask2 to guarantee that they only start to run when WrapperTask1 ends? Is there no more elegant way of doing this?

Kind regards,
Vasco

Lars Albertsson

Mar 1, 2021, 6:41:18 PM
to Vasco Patarata, Luigi
Yes, you have to specify WrapperTask1 as a dependency for all tasks under WrapperTask2. If WrapperTask1 for some reason needs to run first, there is a dependency, and you need to inform Luigi.

Your dependency tree is a bit unusual. Are you sure there is a real data dependency from every phase 1 task to every phase 2 task?  It is usually advisable to express the actual data dependencies at a fine-grained level.

If you share a bit more concrete details, we might be able to help.