I have code that interacts with SQL Server via pymssql, but it's insert code and isn't substantially helpful for your case. What I can suggest:
It seems all you really need are two tasks; I'll call them LoadDataDictionary and Backfill, but you can name them whatever you like (pseudocode):
    class LoadDataDictionary(luigi.Task):
        def run(self):
            mydict = downloadfromdb()
            with open(JSONFILE, 'w') as f:  # note: json.dump needs write mode
                json.dump(mydict, f)

        def output(self):
            return luigi.LocalTarget(JSONFILE)
    class Backfill(luigi.Task):  # a Task, not a Target
        def requires(self):
            yield LoadDataDictionary()

        def run(self):
            with open(JSONFILE) as f:
                mydict = json.load(f)
            for target in mydict.keys():
                data = loadfromtarget(target)
                backfill(data)
But it's likely more robust and flexible to perform each time series backfill as an individual task, because you can better control parallelism and allow failure/retry on individual backfills. If that's true, you could change Backfill to work on a single source:
    class Backfill(luigi.Task):  # a Task, not a Target
        backfillsource = luigi.Parameter(description='the location of a 3rd source')

        def run(self):
            data = loadfromtarget(self.backfillsource)
            backfill(data)

        def output(self):
            # Fill in a luigi Target to represent the output; it could be
            # a local file marker or something in the db.
            ...
You could replace LoadDataDictionary with a wrapper task:
    class Main(luigi.WrapperTask):
        def requires(self):
            mydict = downloadfromdb()
            for key in mydict.keys():
                yield Backfill(backfillsource=key)
Or you could keep LoadDataDictionary as a luigi Task and introduce a new WrapperTask which properly requires LoadDataDictionary first, reads the output of that task and generates Backfill tasks accordingly.
I know much of this is general/high-level but I hope it's enough to give you traction.