Feedback wanted: Proposed solution for loosely coupled tasks


Samuel Lampa

Aug 5, 2014, 1:13:02 PM
to luigi...@googlegroups.com
Hi all!

If somebody has the patience to read my latest blog post, where we outline some solutions we just figured out to some longstanding problems we've been facing with luigi, we would highly appreciate feedback!


The problems we think we have found a solution to are:

- The problem that luigi defines dependencies between tasks rather than targets (while we need the latter)
- The problem that task dependencies are typically stored "hard-wired" inside luigi tasks
  (there are ways around this such as sub-classing, but that has remaining problems)
- The problem that (when using subclassing) a parameter introduced somewhere in a workflow needs to "pollute" all downstream tasks with a duplicate of that parameter, just so the parameter can be passed all the way from the last task (the one that is executed) up to the one that actually uses it, with the result that tasks are not independent of their workflows.

Additionally, we realized a nice way to encapsulate whole sub-workflows inside luigi tasks, which is neat for managing complex workflows and should also be very useful for writing system test suites and the like.

The main thing I'm not totally happy about is the syntax we use for declaring an upstream target to use (the nested dict structure, seen in the code example at the bottom). Suggestions for nicer ways to get this done are highly appreciated!

Let me know if something needs clarification, and I can fix it in the post, and/or explain more here.

Best Regards
// Samuel

Erik Bernhardsson

Aug 5, 2014, 3:09:44 PM
to Samuel Lampa, luigi...@googlegroups.com
Replied in the other thread too. I might be misunderstanding what you are trying to do, so it would be great to look at some code!

Some comments below:

On Tue, Aug 5, 2014 at 1:13 PM, Samuel Lampa <samuel...@gmail.com> wrote:
Hi all!

If somebody has the patience to read my latest blog post, where we outline some solutions we just figured out to some longstanding problems we've been facing with luigi, we would highly appreciate feedback!


The problems we think we have found a solution to are:

- The problem that luigi defines dependencies between tasks rather than targets (while we need the latter)

I'm not sure what the difference is. I think a Target should be uniquely defined by a Task and vice versa. There shouldn't be two Tasks outputting to the same Target, and any Target should have only one Task creating it.
 
- The problem that task dependencies are typically stored "hard-wired" inside luigi tasks 
  (there are ways around this such as sub-classing, but that has remaining problems)

How do you mean hard-wired?
 
- The problem that (when using subclassing) a parameter introduced somewhere in a workflow needs to "pollute" all downstream tasks with a duplicate of that parameter, just so the parameter can be passed all the way from the last task (the one that is executed) up to the one that actually uses it, with the result that tasks are not independent of their workflows.

Can you elaborate on what you mean by "pollute"? Can you set a default value on that parameter to keep the previous behavior for existing downstream tasks?

I see the parameters as function arguments. Say you have a function that calculates the logarithm of a number, log(x). Now, let's say you add an argument to specify the base of the logarithm. Then, to maintain backwards compatibility, you would probably set the default value to e: def log(x, base=math.e)
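Spelled out as code, the analogy is just a default argument:

import math

def log(x, base=math.e):
    # Existing callers that only pass x still get the natural logarithm;
    # new callers can opt in to another base.
    return math.log(x, base)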

If downstream dependencies depend on the logarithm then they are dependent on the definition of the logarithm, meaning that if you change what log does, then all downstream dependencies have to change. That's the point of a dependency graph, so I'm not sure what you mean by saying "tasks are not independent from their workflows" – isn't that exactly what you want?

I think I might have to look at some code example to understand what you mean.
 

Additionally, we realized a nice way to encapsulate whole sub-workflows inside luigi tasks, which is neat for managing complex workflows and should also be very useful for writing system test suites and the like.

The main thing I'm not totally happy about is the syntax we use for declaring an upstream target to use (the nested dict structure, seen in the code example at the bottom). Suggestions for nicer ways to get this done are highly appreciated!

Let me know if something needs clarification, and I can fix it in the post, and/or explain more here.

Best Regards
// Samuel




--
Erik Bernhardsson
Engineering Manager, Spotify, New York

Samuel Lampa

Aug 5, 2014, 5:48:40 PM
to luigi...@googlegroups.com, samuel...@gmail.com
(Comments below)

On Tuesday, August 5, 2014 9:09:44 PM UTC+2, erikbern wrote:
Replied in the other thread too. I might be misunderstanding what you are trying to do, so it would be great to look at some code!

Some comments below:

On Tue, Aug 5, 2014 at 1:13 PM, Samuel Lampa <samuel...@gmail.com> wrote:
Hi all!

If somebody has the patience to read my latest blog post, where we outline some solutions we just figured out to some longstanding problems we've been facing with luigi, we would highly appreciate feedback!


The problems we think we have found a solution to are:

- The problem that luigi defines dependencies between tasks rather than targets (while we need the latter)

I'm not sure what the difference is. I think a Target should be uniquely defined by a Task and vice versa. There shouldn't be two Tasks outputting to the same Target, and any Target should have only one Task creating it.

True, not two tasks outputting the same target, but we regularly have commands which require multiple inputs and/or generate multiple outputs, typically some external command that we run through the HPC system.

To give a specific example, we have a task for sampling from a dataset into a "test" and a "train" partition (thus multiple output targets), where both of these outputs then take different routes downstream in the workflow (they undergo conversion to a sparse dataset, then the train dataset is used for training and the test dataset for evaluation, and so on).

An example of multiple input targets is our assess component, which requires both the generated model file and the test dataset to use for the evaluation.
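To make that concrete, here is a minimal sketch of what multiple output and input targets can look like in plain luigi (task and file names are made up for illustration), using dicts to name the targets:

import luigi

class SampleTrainTest(luigi.Task):
    """Samples a dataset into a train and a test partition (two output targets)."""
    dataset_name = luigi.Parameter()

    def output(self):
        return {'train': luigi.LocalTarget(self.dataset_name + '.train'),
                'test': luigi.LocalTarget(self.dataset_name + '.test')}

    def run(self):
        ...  # write both partitions

class AssessModel(luigi.Task):
    """Needs both a trained model and the test partition (two input targets)."""
    dataset_name = luigi.Parameter()

    def requires(self):
        return {'model': TrainModel(dataset_name=self.dataset_name),  # hypothetical upstream task
                'split': SampleTrainTest(dataset_name=self.dataset_name)}

    def run(self):
        model_path = self.input()['model'].path
        test_path = self.input()['split']['test'].path
        ...  # evaluate the model on the test set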
 
 
- The problem that task dependencies are typically stored "hard-wired" inside luigi tasks 
  (there are ways around this such as sub-classing, but that has remaining problems)

How do you mean hard-wired?

I mean that they are part of a task class' definition. Sure, you can override the requires() function in a subclass, as per the recommendations we got earlier. That goes a long way to solve the problem, but it still has the other problems with parameter duplication etc.

- The problem that (when using subclassing) a parameter introduced somewhere in a workflow needs to "pollute" all downstream tasks with a duplicate of that parameter, just so the parameter can be passed all the way from the last task (the one that is executed) up to the one that actually uses it, with the result that tasks are not independent of their workflows.

Can you elaborate on what you mean by "pollute"? Can you set a default value on that parameter to keep the previous behavior for existing downstream tasks?

What I mean is that a parameter that occurs somewhere in the dependency graph needs to exist in all downstream tasks as well, even though it might be totally irrelevant to those tasks.
 
I see the parameters as function arguments. Say you have a function that calculates the logarithm of a number, log(x). Now, let's say you add an argument to specify the base of the logarithm. Then, to maintain backwards compatibility, you would probably set the default value to e: def log(x, base=math.e)

If downstream dependencies depend on the logarithm then they are dependent on the definition of the logarithm, meaning that if you change what log does, then all downstream dependencies have to change. That's the point of a dependency graph, so I'm not sure what you mean by saying "tasks are not independent from their workflows" – isn't that exactly what you want?

I think I might have to look at some code example to understand what you mean.

I totally understand that luigi tries to implement the functional call/return semantics, where the function caller is supposed to provide the outer-most function with all information needed to calculate its value "from scratch". 

I also see that this is a very good fit for e.g. mathematics and other use cases where one typically has one input and one output per function, and where the whole idea is to build up a kind of "semantics" of what different values mean, so that values can be calculated from their mere definitions.

The only problem is that when trying to use luigi as a general-purpose workflow system, incorporating existing software, this model seems to reach its limits.

To give a concrete example of how we sometimes need to completely swap out the parts that produce a certain dataset / target, let's take this one:

We start our work with a number of datasets in a data format called "smiles" (basically a string representation of chemical molecules, one row per molecule). We have some 1800 different such smiles datasets, containing hundreds of lines / molecules each, and each dataset having a special meaning (such as that all molecules in that dataset bind to a certain protein in the body). This data is originally extracted from an SQL database, using an SQL query roughly one A4 page long. We thus have a luigi task for running this SQL query.

Sometimes though, we want to run a totally different smiles dataset, that we already have extracted in some other way, and just run it through the workflow. 

Then it clearly doesn't make sense to use the SQL extractor task; instead we might just use a task that subclasses luigi.ExternalTask, and read in the data from the file we already have.

Now one might think: why not just add a switch in the "extractor task", to either read directly from a file or run the SQL query?

But then the next day, we might want to construct a completely new way to extract smiles datasets, which doesn't use SQL databases at all, but instead takes them from a MongoDB, for example.

We naturally want to store this new extractor in a new task (because it has a different purpose, might have different parameters, etc.). But now the problem is that we cannot easily switch between what are now three different ways of getting smiles datasets, because our workflow is already littered with duplicated parameters specific to the SQL extractor code.

Sure, we can just go ahead and add more parameters to all tasks that might at some point be downstream of the new MongoDB extractor task, but that takes time, makes the code less understandable, and makes it harder to reason about which parameters really belong to which tasks.
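To show what that pollution looks like in code, here is a stripped-down, hypothetical sketch (invented task and parameter names) of an SQL-specific parameter that a downstream task has to carry around only so it can be forwarded:

import luigi

class ExtractSmilesFromSQL(luigi.Task):
    sql_query = luigi.Parameter()  # only this task actually uses the query

    def output(self):
        return luigi.LocalTarget('dataset.smiles')

    def run(self):
        ...

class FilterSmiles(luigi.Task):
    # This task does not care about SQL at all, but since it is the one invoked
    # from the command line, it has to duplicate the parameter just to be able
    # to pass it on to its upstream dependency.
    sql_query = luigi.Parameter()

    def requires(self):
        return ExtractSmilesFromSQL(sql_query=self.sql_query)

    def output(self):
        return luigi.LocalTarget(self.input().path + '.filtered')

    def run(self):
        ...

Swapping ExtractSmilesFromSQL for a MongoDB or flat-file extractor then means touching every task in the chain that carries sql_query.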

Thus, for us, the functional call/return paradigm just doesn't seem like the right fit. And I think this is why there are also a number of different paradigms, such as data flow, that take a different approach to these things.

That is, computations in data flow can have multiple inputs and outputs, and one of an array of different execution modes (push, pull, execute when all inputs are available, when only one input is available, etc.). Matt Carkci's contrast of the pull and push execution modes in data flow systems, at 2:50 in this video, highlights some of these differences:
https://www.youtube.com/watch?v=iFlT93wakVo#t=170
(... where the pull-model in fact is quite similar to the functional call/return semantics)

So, for us, in fact, the data flow paradigm seems like a better fit. There, you can just feed a "black box" process with its required input (a smiles dataset) - regardless of how you have generated it - and the whole workflow will just chew on and process the data and return the result.

So, in summary, one could question whether we are sane in trying to rework luigi towards a more data flow inspired system, instead of just starting from a data flow inspired system in the first place.

I have in fact been thinking hard about that already, but the reason we still want to stick with luigi for the time being is firstly, of course, that we have already invested quite some time in it, but also that I/we REALLY do like a lot of its features:

- The central scheduler
- The automatically generated command line interface
- The web UI
- The logger
- And not least its light-weight nature, which lets us remake it to become a bit more like a data flow system.

Those are awesome features, and when we saw that we could find a way to get the parts of data flow that we needed, we felt it would most probably be worth doing these few workarounds in order to continue using luigi, rather than spending time searching for, evaluating and learning some data flow inspired workflow engine out there that we don't even know has all the other features we need.

Hope this clarifies our ambitions and where we're coming from a little! 

So, at last, thanks for a very powerful system that has helped us tons, and even allowed us to stretch its dependency management a bit towards another paradigm! :)

Cheers
// Samuel

Erik Bernhardsson

Aug 15, 2014, 1:16:10 PM
to Samuel Lampa, luigi...@googlegroups.com
Thanks for the long explanation, and sorry for the delay.

You actually convinced me that it's a good use case. I haven't thought much about the use case you describe – maybe because we haven't encountered it ourselves.

It seems like the issue is whenever you have general purpose workflows as opposed to general purpose tasks. E.g. you might have a SQLImporter Task that you can plug in everywhere, and that's fine, but say you want to plug an XML2TSV -> SQLImporter chain into multiple places of your workflow. Or even more complicated.

My intuitive solution would be to generalize the input of those workflows, make them accept tasks as parameters, propagate those up the chain, and return them as requirements. For instance:

import luigi

class XML2TSV(luigi.Task):
    dependency = luigi.Parameter()
    def requires(self): return self.dependency
    def run(self): ...

class SQLImporter(luigi.Task):
    dependency = luigi.Parameter()
    def requires(self): return XML2TSV(self.dependency)
    def run(self): ...

That way, you can use it in multiple places of your workflow.
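For instance (with hypothetical upstream tasks, and wiring things up programmatically rather than from the command line, since the parameter value here is a Task instance):

class DumpFromSystemA(luigi.Task):
    def output(self): return luigi.LocalTarget('system_a.xml')
    def run(self): ...

class DumpFromSystemB(luigi.Task):
    def output(self): return luigi.LocalTarget('system_b.xml')
    def run(self): ...

# The same XML2TSV -> SQLImporter chain, reused with two different sources:
luigi.build([SQLImporter(dependency=DumpFromSystemA()),
             SQLImporter(dependency=DumpFromSystemB())],
            local_scheduler=True)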

But let me think about it more



Samuel Lampa

Aug 18, 2014, 6:50:30 AM
to luigi...@googlegroups.com, samuel...@gmail.com
Hi, and thanks for taking the time to think and answer about this!

On Friday, August 15, 2014 7:16:10 PM UTC+2, erikbern wrote:
Thanks for the long explanation, and sorry for the delay.

You actually convinced me that it's a good use case. I haven't thought much about the use case you describe – maybe because we haven't encountered it ourselves.

It seems like the issue is whenever you have general purpose workflows as opposed to general purpose tasks. E.g. you might have a SQLImporter Task that you can plug in everywhere, and that's fine, but say you want to plug an XML2TSV -> SQLImporter chain into multiple places of your workflow. Or even more complicated.


Yeah, but AFAIS the problem already arises when you want to plug in that SQLImporter everywhere. Indeed, you can solve it somewhat with the solution proposed below, with a "dependency" parameter (although it creates other problems, more about that later). I just think it appears as soon as you want a single pluggable component.


My intuitive solution would be to generalize the input of those workflows, make them accept tasks as parameters, propagate those up the chain, and return them as requirements. For instance:

import luigi

class XML2TSV(luigi.Task):
    dependency = luigi.Parameter()
    def requires(self): return self.dependency
    def run(self): ...

class SQLImporter(luigi.Task):
    dependency = luigi.Parameter()
    def requires(self): return XML2TSV(self.dependency)
    def run(self): ...

That way, you can use it in multiple places of your workflow.


Yes, and this is along the lines of what we tried before as well. It goes some way towards solving this, and might work well enough for simpler workflows.

The problem is that if this combination is supposed to be used in the middle of a dependency chain that is ten tasks long, then all five tasks between these tasks and the last task (which is the one that will be called when executing the workflow) need to *also* have the same dependency parameter, so that it can be passed on all the way from the calling task to where it is used.

Thus, in a way, the location of these two tasks becomes somewhat "fixed" in that place of the workflow, because if you want to move them into another workflow that doesn't have that dependency parameter in all its tasks, you first have to add that dependency parameter to every task between where these tasks are used and the last task.

And what we are trying to achieve, I guess, is kind of the "extreme end", where every single task can be reconnected to any other task (provided it can handle the input/output formats of those other tasks, but that is up to the workflow designer to keep track of). Thus, we needed a solution that doesn't require any changes to any other tasks (such as adding/removing dependency parameters) when we re-connect tasks.

And the only solution we have seen so far is the one proposed in the linked blog post [1], where we instantiate all tasks from "outside" (meaning "not inside any of the tasks in the workflow"), and then send the respective dependencies as parameters into the respective dependent tasks.
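As a rough sketch of that idea (reusing the invented task names from my earlier mail, but now with FilterSmiles taking its upstream dependency as a parameter instead of hard-wiring it, and with a guessed-at get_input() helper; the real code in the blog post uses a nested dict structure):

import luigi

class FilterSmiles(luigi.Task):
    # The upstream dependency arrives as a parameter: a dict naming each upstream task.
    upstream = luigi.Parameter()

    def requires(self):
        return self.upstream

    def get_input(self, name):
        return self.upstream[name].output()

    def output(self):
        return luigi.LocalTarget(self.get_input('raw_smiles').path + '.filtered')

    def run(self):
        ...

class MyWorkflow(luigi.WrapperTask):
    """A 'workflow container task' holding the whole workflow definition."""
    sql_query = luigi.Parameter()

    def requires(self):
        # All tasks are instantiated here, outside of the tasks themselves,
        # and each task gets its upstream dependencies handed to it as a parameter.
        raw = ExtractSmilesFromSQL(sql_query=self.sql_query)  # as sketched in my earlier mail
        return FilterSmiles(upstream={'raw_smiles': raw})

Swapping the SQL extractor for a file- or MongoDB-based one then only touches MyWorkflow.requires(); FilterSmiles itself never changes.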

Anyways, I think you are on the way to start to see our problem! :)


Best

Erik Bernhardsson

Aug 18, 2014, 3:25:01 PM
to Samuel Lampa, luigi...@googlegroups.com
I see. How do you deal with the output of the intermediate modular tasks though? There must be something that identifies what they depend on in the output path right?



Michael Placentra

Aug 18, 2014, 5:48:19 PM
to Erik Bernhardsson, Samuel Lampa, luigi...@googlegroups.com
Hi Samuel,

I am in favor of the strategy you mentioned where one could put "switching" logic in the extractor task. If the SQL/MongoDB/other data extraction parameters are not relevant to the functional dependencies of the tasks they're polluting at the bottom of the chain anyway, is it possible in your case to move that information into config files instead of using parameters? I think it's a simpler solution, and we've solved the problem that way in the past. We then introduced a "middleman" task (a luigi.WrapperTask) which represents the output data that's required, as opposed to the action of getting that data, which is similar to the flow-based idea. The "switching" logic happens in the middleman task, and, in your case, it can return either MyFlatFileTask() or MyMongoDBTask() in its requires().
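A minimal sketch of that middleman pattern (invented parameters, and the MyFlatFileTask / MyMongoDBTask names from above, plus a hypothetical "source" switch that could just as well come from a config file instead of a parameter):

import luigi

class SmilesDataset(luigi.WrapperTask):
    """Middleman task: represents 'the smiles data is available',
    independent of how it was obtained."""
    dataset_name = luigi.Parameter()
    source = luigi.Parameter(default='sql')  # or read this from a config file

    def requires(self):
        # The "switching" logic lives here, so downstream tasks only ever
        # depend on SmilesDataset and never need extractor-specific parameters.
        if self.source == 'file':
            return MyFlatFileTask(dataset_name=self.dataset_name)
        if self.source == 'mongodb':
            return MyMongoDBTask(dataset_name=self.dataset_name)
        return MySQLExtractTask(dataset_name=self.dataset_name)  # hypothetical SQL extractor

Downstream tasks then just require SmilesDataset(dataset_name=...) and stay ignorant of where the data comes from.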

As for the case of having too many parameters that do fit logically down the chain because they are relevant to the functional dependencies (i.e. they are important for determining uniqueness), we found another solution we like which I plan on posting in the other thread (https://groups.google.com/d/msg/luigi-user/7zIS_qseNOM/TWcYtRe5SVoJ).

Best,
Mike

Mike Placentra | RUN www.runads.com

Martin Czygan

Aug 18, 2014, 6:39:51 PM
to luigi...@googlegroups.com
I think for a small number of cases, the "switching logic" middleman
task can work. We have a few of them, and they are actually quite
nice, since they abstract away a lot of details, which the consumer of
the middleman task does not need to care about (the consumer might set
some parameter, but that's it).

On another note: I remember I looked at a framework some time ago that seemed to be more loosely coupled [1]. In one sentence, the framework works like this: you write a task (in Java) that has *typed* inputs and outputs, and then you write a small script that uses these tasks - like a puzzle, you can only combine tasks that are *compatible* (e.g. a task that expects XML is only pluggable into a task that outputs XML, etc.). This custom scripting language then borrows syntax from bash, namely the pipe, to combine these smaller tasks into workflows; here's an example:
https://github.com/culturegraph/metafacture-core/wiki/Flux-user-guide#writing-flux-files

file|
open-file|
as-lines|
....
write("stdout");

I think the idea and the implementation of the framework are nice - and probably along the lines of what you mean by loosely coupled tasks. My guess is that luigi (explicit dependency graph) and frameworks like the above (typed I/O, pluggable pieces) are a bit orthogonal in their approach to workflow implementation.

Finally, I guess ... I'll think a bit more about it, too ...

----

[1] .. but probably unusable for your requirements, Samuel, since it's written for a niche domain.

Samuel Lampa

Aug 19, 2014, 2:49:47 AM
to luigi...@googlegroups.com, samuel...@gmail.com
On Monday, August 18, 2014 9:25:01 PM UTC+2, erikbern wrote:
I see. How do you deal with the output of the intermediate modular tasks though? There must be something that identifies what they depend on in the output path right?


That's a very relevant question I think.

We've been very happy though with a very simple approach where each task always just appends a custom extension to the outputs of any upstream tasks / chain of tasks.

Thus, they all implement their output() function along these lines [1]:

class SomeTask(luigi.Task):
    ...
    def output(self):
        return luigi.LocalTarget(self.input().path + ".some_new_extension")

Or, in fact, we typically also add the value of task parameters that distinguish different outputs from each other, so a more complete example would be:

class SomeTask(luigi.Task):
    ...
    def output(self):
        return luigi.LocalTarget(self.input().path + ".some_new_extension_someval" + str(self.some_parameter))

This is combined with a first task of the luigi.ExternalTask class, which implements a unique file name based on some of the parameters used in the workflow (such as "dataset_name" or "protein_name", or whatever makes that specific dataset unique).

Thus, the filenames of files produced after a long chain of tasks will reflect the tasks that produced them, and might look something like:

dataset_123.filtered_somevalue_max_50.testdataset_samplingmethod_random.sparse_dataset

and so on, and so on.
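As a tiny sketch of how such a name gets built up (invented task names), a raw dataset plus one filtering step would give paths like "dataset_123" and "dataset_123.filtered_max_50":

import luigi

class RawDataset(luigi.ExternalTask):
    dataset_name = luigi.Parameter()

    def output(self):
        # Unique file name based on workflow parameters, e.g. "dataset_123"
        return luigi.LocalTarget(self.dataset_name)

class FilterDataset(luigi.Task):
    dataset_name = luigi.Parameter()
    max_value = luigi.IntParameter(default=50)

    def requires(self):
        return RawDataset(dataset_name=self.dataset_name)

    def output(self):
        # Appends this task's own "signature" to the upstream path,
        # e.g. "dataset_123.filtered_max_50"
        return luigi.LocalTarget(self.input().path + '.filtered_max_' + str(self.max_value))

    def run(self):
        ...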

This has some implications of course, such as that if we re-wire tasks anywhere in the workflow, then all the downstream tasks (those that depend on the task that was added/removed) need to be re-run (technically because they will be looking for a slightly different file name pattern in the targets they depend on).

This might look like a problem at first, but it turns out to be exactly what we want anyway, since re-wiring a workflow anywhere in the chain will mean that all downstream results will be different. So all in all, we're very happy with that pattern.

BR
// Samuel


[1] Although, to be more specific, in our solution that allows multiple inputs and outputs, we call the special "get_input()" method to retrieve incoming targets:

class SomeTask(luigi.Task):
    ...
    def output(self):
        return luigi.LocalTarget(self.get_input("some_upstream_target").path + ".some_new_extension")
 
... but that's a little beside the point here, I guess.

Samuel Lampa

Aug 19, 2014, 4:07:18 AM
to luigi...@googlegroups.com
On Tuesday, August 19, 2014 12:39:51 AM UTC+2, Martin Czygan wrote:
I think for a small number of cases, the "switching logic" middleman
task can work. We have a few of them, and they are actually quite
nice, since they abstract away a lot of details, which the consumer of
the middleman task does not need to care about (the consumer might set
some parameter, but that's it).

I tend to agree here. This approach is good for some use cases, although apparently not for all.

The problem is that, AFAIS, in this case you have to somehow specify all the possible variations of upstream tasks, whereas what we want and envision is that we should easily be able to stitch in any task with the correct input/output, at any time, without modification of any of the tasks, except the "workflow container tasks" that we create to contain the whole workflow definitions, as exemplified in my blog post [1].

 
On another note: I remember I looked at a framework some time ago that seemed to be more loosely coupled [1]. In one sentence, the framework works like this: you write a task (in Java) that has *typed* inputs and outputs, and then you write a small script that uses these tasks - like a puzzle, you can only combine tasks that are *compatible* (e.g. a task that expects XML is only pluggable into a task that outputs XML, etc.). This custom scripting language then borrows syntax from bash, namely the pipe, to combine these smaller tasks into workflows; here's an example:
https://github.com/culturegraph/metafacture-core/wiki/Flux-user-guide#writing-flux-files
 

    file|
    open-file|
    as-lines|
    ....
    write("stdout");

I think the idea and the implementation of the framework are nice - and probably along the lines of what you mean by loosely coupled tasks. My guess is that luigi (explicit dependency graph) and frameworks like the above (typed I/O, pluggable pieces) are a bit orthogonal in their approach to workflow implementation.


I think you are right about that, and I think it is really interesting to study the differences between these approaches, as they also seem to parallel two very prominent approaches in programming: the functional call/return semantics, and the data flow / flow-based programming one.

I find it interesting that you mention the typed inputs and outputs. That reminds me again of what I think makes the flow-based programming (FBP) approach differ from the typical functional call/return one:

Rather than defining relations between functions directly (by means of function calls from inside other functions), which strikes me as quite similar to how luigi works, the FBP approach makes the connections and ports (inputs / outputs) of functions first-class objects, so that the wiring of those connections into the ports of the functions always happens outside of the function "boxes" (or processes, in FBP lingo).

As far as I can see, this makes the "functions" in FBP more loosely coupled, since you never make a direct function call to another "function box" / FBP process from inside an FBP process. Instead, the only things the FBP process knows about are its input and output ports. So, it receives from the input ports, processes the data, and returns the results on the output ports. That's all the FBP process is involved in. No connectivity business at all.

Instead all the wiring happens outside, in a declarative fashion, and can in fact potentially be re-wired at any time, even at run time.

In fact, this has a lot of benefits that one might not see at first, such as that it is very easy to temporarily plug in special-purpose processes, such as logging and monitoring ones, that just forward anything arriving on their input ports to their output ports, while at the same time logging or monitoring some information about the data (the so-called "information packets", again in FBP lingo) passing through.

After thinking about it for a while, IMO, you start to see that there are quite dramatic new possibilities opening up with this approach. This is part of what made us try to see if we could get luigi to operate a bit more in this fashion. 

Whether this is the right thing to do might be debatable, but at least the solution we implemented (and mentioned in the blog post) has already helped us tremendously, lessening the complexity of our workflows by magnitudes.

Still, I'm not arguing that the FBP approach is always better than the "functional call/return" approach. I'm sure it is very attractive for a lot of use cases, especially where you don't do a lot of re-wiring of the dependency graph. The FBP approach just seems to fit our use cases better.

Cheers
// Samuel

 

Erik Bernhardsson

Aug 19, 2014, 1:57:08 PM
to Samuel Lampa, luigi...@googlegroups.com
Random thoughts:

On the functional call/return semantics vs flow-based: it does make sense to think about flow in some cases. I think however that one of the key things about the function-based approach is that you never have to care about what the underlying function depends on – the function is the entire interface. If it wasn't for that, Luigi Tasks would just implement a 'run' method, and you would have to stitch them together yourself, building the DAG and thinking about the execution order.

I think what you want to have here is to say: A depends on "something", X, that I will define later, B depends on A, C depends on B, etc. So C's dependencies aren't really fully specified until later. And more importantly, B doesn't have to care about X.

There are some hacky ways to do this in Luigi, like creating classes dynamically, but none of them is good.

One crazy thing would be to allow partially specified dependencies – B could say that it depends on A, but not specify X, and just relay it. C would then specify what X is. I haven't really thought through the implications of this, but it seems a little abstract.

Some of this would also be easier to do on an object level, not a class level. If you could manipulate task classes like objects, it would be much easier to build things up dynamically. I think this is theoretically appealing, but it would probably rule out a lot of the magic stuff we do to tie parameters together, so I think it would introduce a lot more boilerplate.




