Hello everybody,
I'm running a simple example I built myself to better understand how Luigi works, and there is something I don't understand. I'm trying to simulate checking that an input file exists before running the transform and aggregation tasks. Here is my source code:
```python
import luigi


class CheckFileOk(luigi.Task):
    def run(self):
        pass

    def output(self):
        return luigi.LocalTarget("row_imput_file" + self.param)


class TransformTask(luigi.Task):
    param = luigi.Parameter(default=42)

    def require(self):
        return [CheckFileOk(self.param)]

    def run(self):
        with self.output().open('w') as out_file:
            out_file.write("transform")

    def output(self):
        return luigi.LocalTarget("row_imput_file_" + self.param + "_transformed")


class AggTask(luigi.Task):
    param = luigi.Parameter(default=42)

    def require(self):
        return [Task1(i) for i in range(0, 5)]

    def run(self):
        with self.output().open('w') as out_file:
            out_file.write("aggregate")

    def output(self):
        return luigi.LocalTarget("AggTask")


if __name__ == '__main__':
    luigi.run()
```
I have not provided the row_input_file files, and I execute my code with the following command line:
```
python luigi_test.py AggTask --local-scheduler
```
I get the following output:
```
DEBUG: Checking if AggTask(param=42) is complete
INFO: Scheduled AggTask(param=42) (PENDING)
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 15677] Worker Worker(salt=837989438, workers=1, host=quickstart.cloudera, username=cloudera, pid=15677) running AggTask(param=42)
INFO: [pid 15677] Worker Worker(salt=837989438, workers=1, host=quickstart.cloudera, username=cloudera, pid=15677) done AggTask(param=42)
DEBUG: 1 running tasks, waiting for next task to finish
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: Worker Worker(salt=837989438, workers=1, host=quickstart.cloudera, username=cloudera, pid=15677) was stopped. Shutting down Keep-Alive thread
Exception in thread QueueFeederThread (most likely raised during interpreter shutdown):
Traceback (most recent call last):
File "/usr/lib64/python2.6/threading.py", line 532, in __bootstrap_inner
File "/usr/lib64/python2.6/threading.py", line 484, in run
File "/usr/lib64/python2.6/multiprocessing/queues.py", line 233, in _feed
<type 'exceptions.TypeError'>: 'NoneType' object is not callable
```
As a result, the "AggTask" file is created.
This is definitely not what I expected. I thought that if row_input_file_0, row_input_file_1, row_input_file_2, row_input_file_3 and row_input_file_4 were not provided, AggTask would not be executed.
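The expectation here rests on how Luigi decides whether a task is done: by default `Task.complete()` returns True only if every target returned by `output()` exists, and for a `LocalTarget` that is a filesystem existence check. The plain-Python sketch below (no Luigi involved) mimics that check for the five inputs; the `missing_inputs` helper and the `row_input_file_<i>` naming are hypothetical, chosen only for illustration.

```python
import os


def missing_inputs(n_files=5, pattern="row_input_file_{}"):
    """Return the expected input paths that do not exist yet (hypothetical helper).

    Mirrors the default rule Luigi applies to a task whose output() is a
    LocalTarget: the task only counts as complete if that file exists.
    """
    paths = [pattern.format(i) for i in range(n_files)]
    return [p for p in paths if not os.path.exists(p)]


# Example usage: report which inputs would keep AggTask from running.
missing = missing_inputs()
if missing:
    print("AggTask should not run yet; missing:", ", ".join(missing))
```

In Luigi itself this check happens implicitly for each required task, which is why the dependencies have to be declared through `requires()` for it to take effect.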
In addition, there is a Python error from the QueueFeederThread that I don't understand.
Could somebody explain what I did wrong?