Dealing with multiple inputs and outputs


Samuel Lampa

unread,
Jun 18, 2014, 3:24:40 AM6/18/14
to luigi...@googlegroups.com
Hi,

I was wondering how you guys are dealing with multiple inputs and outputs from tasks - that is, tasks that depend on more than one upstream task (multiple inputs) and/or on more than one output (multiple outputs) from an upstream task?

We have this situation all the time when using luigi to wrap command line apps into workflows.

The way we have been doing it is by simply returning Python dicts from the requires() and output() functions in tasks, so that the multiple inputs/outputs get named, something like this sketchy example (pay special attention to the output() function of TaskA1 and TaskA2, and the requires(), output() and run() functions of TaskB):

class TaskA1(luigi.Task):
    def requires(self):
        ....
    def output(self):
        return { 'output1' : luigi.LocalTarget( self.input().path + ".output1" ),
                 'output2' : luigi.LocalTarget( self.input().path + ".output2" ) }
    def run(self):
        ....

class TaskA2(luigi.Task):
    def requires(self):
        ....
    def output(self):
        return { 'output1' : luigi.LocalTarget( self.input().path + ".output1" ),
                 'output2' : luigi.LocalTarget( self.input().path + ".output2" ) }
    def run(self):
        ....

class TaskB(luigi.Task):
    def requires(self):
        return { 'input1' : TaskA1(),
                 'input2' : TaskA2() }

    def output(self):
        return { 'output1' : luigi.LocalTarget( self.input()['input1']['output1'].path + ".output1" ),
                 'output2' : luigi.LocalTarget( self.input()['input1']['output1'].path + ".output2" ) }

    def run(self):
        with self.input()['input1']['output1'].open("r") as infile1, \
             self.input()['input1']['output2'].open("r") as infile2, \
             self.input()['input2']['output1'].open("r") as infile3, \
             self.input()['input2']['output2'].open("r") as infile4, \
             self.output()['output1'].open("w") as outfile1, \
             self.output()['output2'].open("w") as outfile2:
            for line in infile1:
                line = exec_command("some-command -reffile=" + infile1.name + " -dataset=" + infile2.name +
                                    " <.....> " +
                                    "-outfile=" + outfile1.name + " -newreffile=" + outfile2.name + " ...")
                outfile1.write(line)



The above works, obviously, but it starts to feel rather clunky and a bit error prone - especially when subclassing tasks in a workflow file with the aim of overriding the requires() function to create custom workflows. Then one has to remember exactly what the dict keys were named, for all the inputs and outputs of each task, which one typically does not.

Also, the error messages are not the most helpful when trying to access a dict key that does not exist, etc.

... so, I was wondering if anybody has found a better way to deal with this situation?

Best Regards
/// Samuel

Martin Czygan

unread,
Jun 18, 2014, 5:18:37 AM6/18/14
to Samuel Lampa, luigi...@googlegroups.com
Hi,

We also use dictionaries for multiple inputs. For multiple outputs, we mostly use another level of indirection. For example, we have a "split" task that splits a file into parts. The result of that task is a single file that contains the paths to the parts, one per line. It has worked well for us so far.
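A minimal sketch of what such a split task could look like (the task name, parameters and file layout here are made up for illustration):

import luigi

class SplitTask(luigi.Task):
    """Splits one big file into parts; the task's single output is a
    manifest file that lists the path of every part, one per line."""
    infile = luigi.Parameter()                       # hypothetical parameter
    lines_per_part = luigi.IntParameter(default=100000)

    def output(self):
        return luigi.LocalTarget(self.infile + '.parts.manifest')

    def run(self):
        part_paths, buf, part_no = [], [], 0
        with open(self.infile) as src:
            for i, line in enumerate(src, 1):
                buf.append(line)
                if i % self.lines_per_part == 0:
                    part_paths.append(self._write_part(part_no, buf))
                    buf, part_no = [], part_no + 1
        if buf:
            part_paths.append(self._write_part(part_no, buf))
        # The single declared output is just the manifest of part paths
        with self.output().open('w') as manifest:
            for path in part_paths:
                manifest.write(path + '\n')

    def _write_part(self, part_no, lines):
        path = '%s.part-%05d' % (self.infile, part_no)
        with open(path, 'w') as out:
            out.writelines(lines)
        return path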


Cheers,
Martin

----

P.S.

As for snippets like

def output(self):
    return { 'output1' : luigi.LocalTarget( self.input().path + ".output1" ),
             'output2' : luigi.LocalTarget( self.input().path + ".output2" ) }

Dealing with filenames was a bit of a pain point for us early on. For that, we wrote a superclass, so we do not need to think about filenames again:
https://github.com/miku/gluish/blob/master/gluish/task.py#L30
(although on PyPI, the package "gluish" is still kind of experimental).

It determines the output path based on the significant parameters plus some prefix (BASE and TAG; these are idiosyncratic to our project and default to /tmp and 'default').


luigi.Task (vanilla task)
    BaseTask (adds a `self.path` method and expects BASE and TAG to be set)
        GenericTask (BASE e.g. /tmp)
            WikipediaTask (TAG e.g. 'wikipedia')
                WikipediaDump
                    """ This is the real task, and it will have an output like
                        /tmp/wikipedia/WikipediaDump/date-2014-01-01.xml
                    """
                    date = luigi.DateParameter(default=datetime.date(2014, 1, 1))
                    ...
            FreebaseTask (TAG e.g. 'freebase')
                FreebaseDump (output goes to /tmp/freebase/FreebaseDump/...)
                    ...
            ...
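A rough sketch of what such a path-deriving superclass could look like (this is not the actual gluish code; BASE, TAG and the parameter handling are simplified for illustration):

import datetime
import os
import luigi

class BaseTask(luigi.Task):
    """Derives the output path from BASE/TAG/ClassName plus the significant parameters."""
    BASE = '/tmp'        # overridden per project
    TAG = 'default'      # overridden per data source, e.g. 'wikipedia'

    def path(self, ext='tsv'):
        # One "name-value" token per significant parameter, e.g. "date-2014-01-01"
        tokens = ['%s-%s' % (name, getattr(self, name))
                  for name, param in self.get_params() if param.significant]
        filename = '-'.join(tokens) or 'output'
        return os.path.join(self.BASE, self.TAG, self.__class__.__name__,
                            '%s.%s' % (filename, ext))

class WikipediaTask(BaseTask):
    TAG = 'wikipedia'

class WikipediaDump(WikipediaTask):
    date = luigi.DateParameter(default=datetime.date(2014, 1, 1))

    def output(self):
        # -> /tmp/wikipedia/WikipediaDump/date-2014-01-01.xml
        return luigi.LocalTarget(self.path(ext='xml'))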


Here's another example: https://gist.github.com/miku/e72628ee54fce9f06a34

Samuel Lampa

unread,
Jun 18, 2014, 5:35:00 AM6/18/14
to luigi...@googlegroups.com, samuel...@gmail.com
Hi Martin and thanks for the feedback!

The filenames-in-a-file approach is interesting - will think about that for our use case. I also really like the approach taken with automatic file name creation. Basically it does what we are already doing anyway, although manually (using parameters to name the outputs). Doing that in an automated way sounds like a really good idea that will probably simplify things a lot for us as well, so, again, thanks a lot for sharing your experiences!

Cheers
// Samuel  

Samuel Lampa

unread,
Aug 5, 2014, 1:02:49 PM8/5/14
to luigi...@googlegroups.com, samuel...@gmail.com
Hi!

We figured out a solution to this problem (as well as some other problems). See:

// Samuel

til...@gmail.com

unread,
Dec 11, 2016, 11:13:31 AM12/11/16
to Luigi
Hey guys,

I'm also facing a similar issue and would like to share how I've been dealing with multiple output targets.

I have an example of a Task that generates multiple files as outputs, all of them known only at `run()` time. Because of that I can't define what the Targets are in the `output()` method.

In order to solve this, I've followed an approach similar to Martin's:

1. Define a plain text file as this Task's output Target.
2. While inside the `run()` method, append the paths of the *real* output targets to this file as they're discovered.
3. Override the `complete()` method so that it reads the `output()` file and asserts that every path listed there exists.


I suppose that downstream tasks could also read this file and dynamically generate their requirements.

This might seem a bit hacky, but it kind of solved my problem. :)
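A minimal sketch of those three steps (the task name, output path and the _generate_files() helper are made up for illustration):

import os
import luigi

class ProduceManyFiles(luigi.Task):
    """The real outputs are only known at run() time, so track them in a manifest."""

    def output(self):
        # 1. A plain text file is the task's declared target
        return luigi.LocalTarget('output/manifest.txt')

    def run(self):
        # 2. Append the path of each *real* output file as it is discovered
        with self.output().open('w') as manifest:
            for path in self._generate_files():       # hypothetical helper
                manifest.write(path + '\n')

    def complete(self):
        # 3. Complete only if the manifest exists and every path listed in it exists
        if not self.output().exists():
            return False
        with self.output().open('r') as manifest:
            return all(os.path.exists(line.strip()) for line in manifest)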

Cheers,
Tiago.

Dave Buchfuhrer

unread,
Dec 11, 2016, 11:45:24 AM12/11/16
to til...@gmail.com, Luigi
Luigi is designed to work best when each job has one output, so anything you do that requires multiple outputs per job will feel a bit hacky. You can make your job a bit less hacky if all of the outputs go into the same directory. Then your output can be that directory. You don't need to create a file to list the directory contents or override complete in this case.
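For example, a hedged sketch with a made-up output directory - the task's single target is simply the directory the job writes into:

import os
import luigi

class ProduceDirectory(luigi.Task):
    """All output files land in one directory, and that directory is the single target."""

    def output(self):
        return luigi.LocalTarget('output/parts/')     # hypothetical path

    def run(self):
        out_dir = self.output().path
        os.makedirs(out_dir)
        # ... run the command(s) that write their files into out_dir ...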

Downstream jobs still need to figure out which part of the output is their input, but you actually have a bigger problem there. If you don't know what the output will be, how do you know which downstream tasks to schedule? If there are a fixed number of downstream tasks, you should probably be renaming your outputs so they're predictable and match what the downstream tasks expect. If you have a dynamic set of downstream tasks because the output is that unpredictable, you'll have to either generate dynamic dependencies to create all the right tasks or kick off more scheduling to generate the right tasks.
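For the dynamic case, Luigi's dynamic dependencies (yielding tasks from run()) are one way to do it - roughly like this sketch, where the task names and manifest layout are made up:

import luigi

class ProcessOnePart(luigi.Task):
    path = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget(self.path + '.processed')

    def run(self):
        with open(self.path) as src, self.output().open('w') as dst:
            dst.write(src.read().upper())

class ProcessAllParts(luigi.Task):
    manifest = luigi.Parameter()      # file listing one part path per line

    def output(self):
        return luigi.LocalTarget(self.manifest + '.done')

    def run(self):
        with open(self.manifest) as m:
            part_paths = [line.strip() for line in m if line.strip()]
        # Dynamic dependencies: the requirements are only known at run() time,
        # so yield them here and Luigi will schedule them before resuming
        yield [ProcessOnePart(path=p) for p in part_paths]
        with self.output().open('w') as done:
            done.write('processed %d parts\n' % len(part_paths))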


Tiago Guimarães

unread,
Dec 11, 2016, 12:02:11 PM12/11/16
to Dave Buchfuhrer, Luigi
Hey Dave,

thanks for sharing!

Using a directory as a target sounds like a really clever workaround.

I'm going for it when (re)designing my tasks!




Samuel Lampa

unread,
Dec 12, 2016, 1:18:56 AM12/12/16
to Luigi
Interesting to see this topic being brought up again, and all the creative solutions people are finding for it.

Just for the record, for anyone stumbling into this thread, we've been happy with sciluigi as a solution to this, ever since we created it:
https://github.com/pharmbio/sciluigi
(Recent paper: http://dx.doi.org/10.1186/s13321-016-0179-6 (open access))

Cheers
// Samuel

the...@gmail.com

unread,
Dec 22, 2016, 8:01:47 PM12/22/16
to Luigi, til...@gmail.com
On Sunday, December 11, 2016 at 8:45:24 AM UTC-8, Dave Buchfuhrer wrote:
> You can make your job a bit less hacky if all of the outputs go into the same directory. Then your output can be that directory.

Directory targets are a really interesting idea. I didn't know Luigi supported directories as targets like that. Could you point me to an example by chance?

Arash Rouhani Kalleh

unread,
Dec 22, 2016, 9:15:45 PM12/22/16
to James Hibbard, Luigi, til...@gmail.com
Ah. Technically the difference is just doing `MyTarget('/path/to/directory/')` instead of `MyTarget('/path/to/file.txt')`. Hmm, I don't know of any example off the top of my head. :)

Lante Dellarovere

unread,
Feb 1, 2023, 6:10:48 AM2/1/23
to Luigi

Hi Dave,

I was thinking about the same "directory output" solution, but I haven't figured out how to make the task atomic. If the workflow crashes before all the work is done (and so before all the output files have been saved into the shared output directory), then on a re-run the program will consider the task completed, since the directory target exists, even though it is "incomplete". Am I missing something? Is there a proper, better way to use a directory as an output target?