Would something like this help?
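# demo.py (the module name used in the command below)
import luigi


class Sum(luigi.Task):
    n = luigi.parameter.IntParameter()

    def requires(self):
        # One Square task per integer in 0..n.
        return [Square(n) for n in range(self.n + 1)]

    def run(self):
        # Luigi caches task instances by parameters, so self.requires()
        # returns the same Square objects that already ran.
        with open(self.output().path, "w") as f:
            print(sum([x.square for x in self.requires()]), file=f)

    def output(self):
        return luigi.LocalTarget("sum")


class Square(luigi.Task):
    i = luigi.parameter.IntParameter()
    ran = False  # in-memory completion flag; no output file is written

    def complete(self):
        return self.ran

    def run(self):
        # Keep the result on the instance instead of writing it to disk.
        self.square = self.i * self.i
        self.ran = True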
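In your case, an analog of the Square class could read a file and store the dataframe in an instance variable, like self.square here. The task that requires a collection of those Square-like tasks could then read the instance variables and combine the results. Note that this relies on every task running in the same process, as with the single worker in the demo; I would not expect it to work with multiple worker processes, because an instance variable set in one process is not visible in another.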
Demo:
% PYTHONPATH=$PWD luigi --module demo Sum --n 3 --local-scheduler
DEBUG: Checking if Sum(n=3) is complete
DEBUG: Checking if Square(i=0) is complete
DEBUG: Checking if Square(i=1) is complete
DEBUG: Checking if Square(i=2) is complete
DEBUG: Checking if Square(i=3) is complete
INFO: Informed scheduler that task Sum_3_ab99dd1074 has status PENDING
INFO: Informed scheduler that task Square_3_5d0c77e0b0 has status PENDING
INFO: Informed scheduler that task Square_2_a9facda7c5 has status PENDING
INFO: Informed scheduler that task Square_1_64a8d99e05 has status PENDING
INFO: Informed scheduler that task Square_0_d010c9c495 has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 5
INFO: [pid 6533] Worker Worker(salt=149535173, workers=1, host=x, username=me, pid=6533) running Square(i=2)
INFO: [pid 6533] Worker Worker(salt=149535173, workers=1, host=x, username=me, pid=6533) done Square(i=2)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task Square_2_a9facda7c5 has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 4
INFO: [pid 6533] Worker Worker(salt=149535173, workers=1, host=x, username=me, pid=6533) running Square(i=3)
INFO: [pid 6533] Worker Worker(salt=149535173, workers=1, host=x, username=me, pid=6533) done Square(i=3)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task Square_3_5d0c77e0b0 has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 3
INFO: [pid 6533] Worker Worker(salt=149535173, workers=1, host=x, username=me, pid=6533) running Square(i=0)
INFO: [pid 6533] Worker Worker(salt=149535173, workers=1, host=x, username=me, pid=6533) done Square(i=0)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task Square_0_d010c9c495 has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 6533] Worker Worker(salt=149535173, workers=1, host=x, username=me, pid=6533) running Square(i=1)
INFO: [pid 6533] Worker Worker(salt=149535173, workers=1, host=x, username=me, pid=6533) done Square(i=1)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task Square_1_64a8d99e05 has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 6533] Worker Worker(salt=149535173, workers=1, host=x, username=me, pid=6533) running Sum(n=3)
INFO: [pid 6533] Worker Worker(salt=149535173, workers=1, host=x, username=me, pid=6533) done Sum(n=3)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task Sum_3_ab99dd1074 has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=149535173, workers=1, host=x, username=me, pid=6533) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 5 tasks of which:
* 5 ran successfully:
    - 4 Square(i=0...3)
    - 1 Sum(n=3)
This progress looks :) because there were no failed tasks or missing dependencies
===== Luigi Execution Summary =====
% cat sum
14