How does Luigi's require method work?


Jerome B

Feb 3, 2015, 5:09:17 AM
to luigi...@googlegroups.com
Hello everybody,

I'm running a simple example I built myself to better understand the concepts of Luigi, and there is something I do not understand. I am trying to simulate checking for the presence of a file before executing the transform and aggregation tasks. Here is my source code:

import luigi


class CheckFileOk(luigi.Task):

    def run(self):
        pass

    def output(self):
        return luigi.LocalTarget("row_imput_file" + self.param)


class TransformTask(luigi.Task):
    param = luigi.Parameter(default=42)

    def require(self):
        return [CheckFileOk(self.param)]

    def run(self):
        with self.output().open('w') as out_file:
            out_file.write("transform")

    def output(self):
        return luigi.LocalTarget("row_imput_file_" + self.param + "_transformed")


class AggTask(luigi.Task):
    param = luigi.Parameter(default=42)

    def require(self):
        return [Task1(i) for i in range(0, 5)]

    def run(self):
        with self.output().open('w') as out_file:
            out_file.write("aggregate")

    def output(self):
        return luigi.LocalTarget("AggTask")


if __name__ == '__main__':
    luigi.run()

I have not provided row_input_file, and when I execute my code with the following command line:
python luigi_test.py AggTask  --local-scheduler


I get the following output:
DEBUG: Checking if AggTask(param=42) is complete
INFO: Scheduled AggTask(param=42) (PENDING)
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 15677] Worker Worker(salt=837989438, workers=1, host=quickstart.cloudera, username=cloudera, pid=15677) running   AggTask(param=42)
INFO: [pid 15677] Worker Worker(salt=837989438, workers=1, host=quickstart.cloudera, username=cloudera, pid=15677) done      AggTask(param=42)
DEBUG: 1 running tasks, waiting for next task to finish
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: Worker Worker(salt=837989438, workers=1, host=quickstart.cloudera, username=cloudera, pid=15677) was stopped. Shutting down Keep-Alive thread
Exception in thread QueueFeederThread (most likely raised during interpreter shutdown):
Traceback (most recent call last):
  File "/usr/lib64/python2.6/threading.py", line 532, in __bootstrap_inner
  File "/usr/lib64/python2.6/threading.py", line 484, in run
  File "/usr/lib64/python2.6/multiprocessing/queues.py", line 233, in _feed
<type 'exceptions.TypeError'>: 'NoneType' object is not callable



The result is that the "AggTask" file is generated.

This is definitely not what I expected. I thought that if row_inpute_file_0, row_inpute_file_1, row_inpute_file_2, row_inpute_file_3 and row_inpute_file_4 were not provided, AggTask would not be executed.

In addition, there is a Python error in QueueFeederThread that I do not understand.

So, could somebody explain to me what I did wrong?
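The expectation in this message matches Luigi's basic rule: a task runs only after everything returned by its `requires()` is complete, and by default a task counts as complete once its output exists. The toy model below sketches that rule in plain Python, without Luigi. `ToyTask`, `RawFile`, `Agg` and `toy_build` are hypothetical names invented for illustration; the real Luigi scheduler (workers, a central scheduler, retries) does considerably more.

```python
import os


class ToyTask:
    """Hypothetical stand-in for luigi.Task, for illustration only."""

    def requires(self):
        return []  # default: no dependencies, like luigi.Task

    def output_path(self):
        raise NotImplementedError

    def complete(self):
        # Default notion of "complete": the output file already exists.
        return os.path.exists(self.output_path())

    def run(self):
        pass  # does nothing, like CheckFileOk.run in the thread


class RawFile(ToyTask):
    """An input file that is supposed to be provided from outside."""

    def __init__(self, n):
        self.n = n

    def output_path(self):
        return "row_imput_file_%d" % self.n


class Agg(ToyTask):
    def requires(self):
        return [RawFile(i) for i in range(5)]

    def output_path(self):
        return "AggTask"

    def run(self):
        with open(self.output_path(), "w") as out_file:
            out_file.write("aggregate")


def toy_build(task):
    """Run `task` only if all of its requirements are complete after building them."""
    if task.complete():
        return True
    deps = task.requires()
    for dep in deps:
        toy_build(dep)
    if not all(dep.complete() for dep in deps):
        return False  # a dependency is still missing, so `task` never runs
    task.run()
    return task.complete()
```

In an empty directory, `toy_build(Agg())` returns `False` and no `AggTask` file is written; once the five `row_imput_file_N` files exist, it runs `Agg.run()` and returns `True`.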

Ádám Divák

Feb 3, 2015, 9:18:08 AM
to luigi...@googlegroups.com
Hi Jerome,

I'm not a Luigi expert, but as far as I can see the main problem is that the methods should be called requires instead of require (notice the final letter S). Because of this typo, Luigi treats AggTask as if it did not depend on any other tasks, which is why AggTask runs and creates its output file. Additionally, in AggTask the requirement refers to Task1, but I think you meant TransformTask instead.
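Why the typo fails silently can be sketched in plain Python: the base class already defines a default `requires()` that returns no dependencies, and a subclass method named `require` is just an extra method that nothing calls. The classes and helpers below (`ToyTask`, `Misspelled`, `Spelled`, `dependencies`, `looks_misspelled`) are hypothetical stand-ins, not Luigi's API; the last helper is one possible way to flag the mistake.

```python
class ToyTask:
    """Hypothetical stand-in for luigi.Task."""

    def requires(self):
        # Default used whenever a subclass does not override `requires`.
        return []


class Misspelled(ToyTask):
    def require(self):  # typo: nothing ever calls this method
        return ["upstream"]


class Spelled(ToyTask):
    def requires(self):
        return ["upstream"]


def dependencies(task):
    # A Luigi-like scheduler asks for `requires`, by that exact name.
    return list(task.requires())


def overrides_requires(cls, base=ToyTask):
    # True if some class that comes before `base` in the MRO defines `requires`.
    mro = cls.__mro__
    return any("requires" in vars(klass) for klass in mro[: mro.index(base)])


def looks_misspelled(cls, base=ToyTask):
    # Flag classes that define `require` but still inherit the default `requires`.
    return hasattr(cls, "require") and not overrides_requires(cls, base)
```

Here `dependencies(Misspelled())` is `[]` while `dependencies(Spelled())` is `["upstream"]`. The same check could presumably be pointed at real task classes with `base=luigi.Task`, but that has not been tested against Luigi itself.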

Cheers,
Adam

Jerome B

Feb 3, 2015, 9:52:27 AM
to luigi...@googlegroups.com
Hi Adam,

Thanks for your help; it was silly of me to make such a mistake.

So here is my new code, which works very well:
import luigi


class CheckFileOk(luigi.Task):
    param = luigi.Parameter(default=42)

    def run(self):
        pass

    def output(self):
        return luigi.LocalTarget("row_imput_file_" + str(self.param))


class TransformTask(luigi.Task):
    param = luigi.Parameter(default=42)

    # "requires" (with an s) is the method name Luigi looks for.
    def requires(self):
        return [CheckFileOk(self.param)]

    def run(self):
        with self.output().open('w') as out_file:
            out_file.write("transform")

    def output(self):
        return luigi.LocalTarget("row_imput_file_" + str(self.param) + "_transformed")


class AggTask(luigi.Task):

    # Depends on the five TransformTask instances with param 0 to 4.
    def requires(self):
        return [TransformTask(i) for i in range(0, 5)]

    def run(self):
        with self.output().open('w') as out_file:
            out_file.write("aggregate")

    def output(self):
        return luigi.LocalTarget("AggTask")


if __name__ == '__main__':
    luigi.run()

Thanks again,

Jérôme

Alexander Krasnukhin

Feb 3, 2015, 10:08:43 AM
to Jerome B, luigi...@googlegroups.com
You are not alone. This has hit me several times as well.


--
Regards,
Alexander

Ádám Divák

Feb 3, 2015, 10:14:25 AM
to luigi...@googlegroups.com
No problem, glad to hear it works.

Adam