Airflow operators (similar to Luigi Tasks) have a handy syntax for declaring dependencies:
op1 >> op2
op1.set_downstream(op2)
op2 << op1
op2.set_upstream(op1)
I tried to add something similar to a BaseTask in Luigi, where the bitshift operator would be used to generate the requires method:
import luigi
from luigi.util import requires

class BaseTask(luigi.Task):
    def __lshift__(self, other):
        # Intended meaning of self << other: self requires other
        self = requires(task_to_require=other)(task_that_requires=self)
        return self
But when I try to load the module I get a type error:
TypeError: unsupported operand type(s) for <<: 'Register' and 'Register'
Where 'Register' is the metaclass for Luigi Tasks. Does anyone know why I can't add the bitshift operator to a BaseTask like this? Thanks!
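The same error can be reproduced without Luigi, which suggests it comes from how Python looks up operators rather than from anything Luigi-specific. The sketch below is only an illustration: the Register metaclass here is an empty stand-in (not Luigi's real one), the TaskA/TaskB classes are placeholders, and it uses Python 3 metaclass syntax.

```python
class Register(type):
    """Empty stand-in metaclass; NOT Luigi's real Register."""


class BaseTask(metaclass=Register):
    def __lshift__(self, other):
        # Defined on the class, so it only applies to *instances* of BaseTask.
        return (type(self).__name__, "<<", type(other).__name__)


class TaskA(BaseTask):
    pass


class TaskB(BaseTask):
    pass


# Instance << instance: Python finds __lshift__ on type(TaskA()), i.e. TaskA.
instance_result = TaskA() << TaskB()

# Class << class: Python looks on type(TaskA), i.e. the metaclass Register,
# which defines no __lshift__, so the operator is unsupported.
try:
    TaskA << TaskB
    class_error = None
except TypeError as exc:
    class_error = str(exc)

print(instance_result)
print(class_error)
```

So the method on BaseTask is never consulted when both operands are Task classes.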
# 'the airflow equivalent'
# TaskA >> TaskB
# TaskA.set_downstream(TaskB)
requires(task_to_require=TaskA)(task_that_requires=TaskB)
# 'the airflow equivalent'
# TaskA << TaskB
# TaskA.set_upstream(TaskB)
requires(task_to_require=TaskB)(task_that_requires=TaskA)
My current understanding:
I think the first code snippet above fails because the operator is applied to the Task classes and not to Task objects. Python looks up << on the type of its operands, and the type of a Task class is the 'Register' metaclass, which does not define __lshift__, so the method on BaseTask is never found. The best workaround I can think of is to wrap each Task class in a small helper object:
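from luigi.util import requires

class W(object):
    """Wraps a Task class so << and >> can declare dependencies."""
    def __init__(self, task):
        self.task = task
    def __lshift__(self, other):
        # W(a) << W(b): a requires b (unwrap other to get the Task class)
        return requires(task_to_require=other.task)(task_that_requires=self.task)
    def __rshift__(self, other):
        # W(a) >> W(b): b requires a
        return requires(task_to_require=self.task)(task_that_requires=other.task)

W(TaskA) >> W(TaskB)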
Is there a cleaner/sneakier way to do this? Thanks!
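One direction that might avoid the wrapper is to define the operators on a metaclass, since that is where Python looks when the operands are classes. The sketch below shows only the mechanism, without Luigi: ShiftMeta, Task and the deps list are hypothetical names, dependencies are just recorded in a list instead of calling requires, and I have not tested combining this with Luigi's own Register metaclass. It uses Python 3 metaclass syntax.

```python
class ShiftMeta(type):
    """Hypothetical metaclass: makes << and >> work on classes themselves."""

    def __init__(cls, name, bases, namespace):
        super().__init__(name, bases, namespace)
        cls.deps = []  # every class gets its own dependency list

    def __rshift__(cls, other):
        # cls >> other: other depends on cls
        other.deps.append(cls)
        return other  # returning other lets A >> B >> C chain

    def __lshift__(cls, other):
        # cls << other: cls depends on other
        cls.deps.append(other)
        return other


class Task(metaclass=ShiftMeta):
    pass


class TaskA(Task):
    pass


class TaskB(Task):
    pass


class TaskC(Task):
    pass


# Evaluated left to right: (TaskA >> TaskB) >> TaskC
TaskA >> TaskB >> TaskC

print([t.__name__ for t in TaskB.deps])
print([t.__name__ for t in TaskC.deps])
```

With Luigi the metaclass would presumably have to subclass Register, and the operator bodies would call requires instead of appending to a list; whether that plays well with how requires returns the task class is something I would need to check.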