luigi ETL + MS SQL tutorial

Itay Feldman

Nov 11, 2020, 11:39:23 PM
to Luigi

Could somebody point me to an end-to-end tutorial using luigi and MS SQL (or another RDBMS)?

I want to:

1.  download the data dictionary from the DB
2.  for each datum in the data dictionary
         a.  get the latest time series data from a 3rd source
         b.  backfill the DB with that time series data

It seems straightforward, and luigi seems like the right tool. I am just struggling to find a tutorial that covers how to use an RDBMS with luigi.

Peter Weissbrod

Nov 13, 2020, 9:01:08 AM
to Luigi
I have code that happens to interact with SQL Server via pymssql, but it is insert code and isn't substantially helpful for your case. What I would suggest:
It seems all you really need are two tasks (I'll call them LoadDataDictionary and Backfill, but you can call them whatever you like):

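(sketch with the imports added; downloadfromdb, loadfromtarget and backfill are placeholders for your own code):

import json
import luigi

JSONFILE = 'mydata.json'

class LoadDataDictionary(luigi.Task):
    def output(self):
        return luigi.LocalTarget(JSONFILE)

    def run(self):
        mydict = downloadfromdb()
        # Write through the target so the file only appears once run() succeeds
        with self.output().open('w') as f:
            json.dump(mydict, f)

class Backfill(luigi.Task):  # a Task, not a Target
    def requires(self):
        return LoadDataDictionary()

    def run(self):
        # self.input() is the output() of LoadDataDictionary
        with self.input().open('r') as f:
            mydict = json.load(f)
        for source in mydict.keys():
            data = loadfromtarget(source)
            backfill(data)
    # No output(): luigi treats this task as never complete, so it reruns every time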
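For the DB side of the placeholders, here is a minimal sketch that uses Python's built-in sqlite3 as a stand-in for SQL Server. The tables data_dictionary and timeseries, their columns, and the helper latest_timestamp are hypothetical, and these versions of downloadfromdb and backfill take an explicit connection (and a series id plus points) rather than the single arguments used above. With pymssql you would run the same SQL through conn.cursor() and use %s placeholders instead of ?.

```python
import sqlite3

# Hypothetical schema standing in for the real SQL Server tables:
#   data_dictionary(series_id TEXT PRIMARY KEY, source TEXT)
#   timeseries(series_id TEXT, ts TEXT, value REAL, PRIMARY KEY (series_id, ts))


def downloadfromdb(conn):
    """Return {series_id: source} read from the data dictionary table."""
    rows = conn.execute("SELECT series_id, source FROM data_dictionary").fetchall()
    return {series_id: source for series_id, source in rows}


def latest_timestamp(conn, series_id):
    """Newest timestamp already stored for a series, or None if it has no rows."""
    row = conn.execute(
        "SELECT MAX(ts) FROM timeseries WHERE series_id = ?", (series_id,)
    ).fetchone()
    return row[0]


def backfill(conn, series_id, points):
    """Insert (ts, value) points newer than what the DB holds; return how many.

    Timestamps are ISO-8601 strings, so string comparison matches time order.
    """
    latest = latest_timestamp(conn, series_id)
    new_rows = [
        (series_id, ts, value)
        for ts, value in points
        if latest is None or ts > latest
    ]
    with conn:  # commits on success, rolls back if an insert fails
        conn.executemany(
            "INSERT INTO timeseries (series_id, ts, value) VALUES (?, ?, ?)",
            new_rows,
        )
    return len(new_rows)
```

Filtering against the newest stored timestamp keeps a rerun of the same backfill from inserting duplicate rows.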

But it's likely more robust and flexible to run each time series backfill as its own task: luigi can then run them in parallel (e.g. with --workers), and a failed backfill can be retried on its own without redoing the others. If so, you could change Backfill to work on a single source:

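class Backfill(luigi.Task):
    backfillsource = luigi.Parameter(description='the location of a 3rd source')

    def output(self):
        # One option: a local marker file (assumes backfillsource is filename-safe); a marker row in the DB works too
        return luigi.LocalTarget('markers/backfill_%s.done' % self.backfillsource)

    def run(self):
        data = loadfromtarget(self.backfillsource)
        backfill(data)
        # Write the marker last so a failed backfill is retried on the next run
        with self.output().open('w') as f:
            f.write('done')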

You could replace LoadDataDictionary with a wrapper task:

class Main(luigi.WrapperTask):
    def requires(self):
        # luigi may call requires() more than once, so this query can run repeatedly
        mydict = downloadfromdb()
        for key in mydict.keys():
            yield Backfill(backfillsource=key)

Or you could keep LoadDataDictionary as a luigi Task and introduce a new WrapperTask which properly requires LoadDataDictionary first, reads the output of that task and generates Backfill tasks accordingly.

I know much of this is general/high-level but I hope it's enough to give you traction.