How is self.input set when requires() iterates over a dependent task?


timsm...@hotmail.com

Feb 12, 2014, 4:51:07 PM
to luigi...@googlegroups.com
Sorry for the shotgun of questions, but I am so stoked to be up and running already that I just want to go nuts and build a ton of workflows...

It appears that the self.input is set by the previous output from a requires(), but how can I interpret the loop that reads self.input? I was expecting to have an outer loop that iterates over each input that was set by each invocation of Streams(), but that does not appear to be the case.

So is run() called for each invocation of Streams() which in turn sets the input member variable to the value from the output() of Streams()?

from collections import defaultdict

import luigi


class AggregateArtists(luigi.Task):
    date_interval = luigi.DateIntervalParameter()

    def output(self):
        return luigi.LocalTarget("data/artist_streams_{}.tsv".format(
            self.date_interval))

    def requires(self):
        return [Streams(date) for date in self.date_interval]

    def run(self):
        artist_count = defaultdict(int)

        ## Where does the self.input member variable get set, from each invocation of Streams()?
        ## If so, does this mean that the run method gets called each time through the loop in the requires()?
        ## If not, can you give me a quick walkthrough of the flow of control for this task? Thank you.

        for input in self.input():
            with input.open('r') as in_file:
                for line in in_file:
                    timestamp, artist, track = line.strip().split()
                    artist_count[artist] += 1

        with self.output().open('w') as out_file:
            for artist, count in artist_count.items():
                out_file.write('{}\t{}\n'.format(artist, count))

Steven Willis

Feb 12, 2014, 5:24:23 PM
to luigi...@googlegroups.com
The way I look at it, the output() method has two purposes:

1) It tells the task where to write its own output (duh), so you can just call self.output() in the run() method and write to it.
2) It tells dependent Tasks where to read their input from, so they can just call self.input() in their run() method and read from it.

The input() method returns the output Targets from all the Tasks this task depends on. You have to be a little careful with this, though. If your requires() returns a single Task, and that Task's output() method returns a single Target, then you'll get a single Target from input(). If your requires() returns a list of Tasks (each of which returns a single Target from output()), then you'll get a list of Targets that you'll need to iterate over. This is the case with AggregateArtists.

This kind of setup works well when each of your requirements is the same type of Task and they all return the same type of Target. If your task has different types of dependencies that return different types of data, you may want to return a dict from your requires(). For example:

class PredictNextBigThingTask(luigi.Task):
  date = luigi.DateParameter()

  def requires(self):
    return {'artists': ArtistsTask(self.date),
            'charts': ChartTask(self.date),
            'arbitron': ArbitronRatingsTask(self.date)}

  def run(self):
    foo = process_artists(self.input()['artists'].open('r'))
    bar = process_charts(self.input()['charts'].open('r'))
    baz = process_arbitron(self.input()['arbitron'].open('r'))
    superstar = cool_algorithm(foo, bar, baz)
    # output is a method, and the target must be opened for writing
    with self.output().open('w') as out_file:
      out_file.write(superstar)
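
To make the "shape" rule concrete, here is a minimal sketch in plain Python of how the value returned by input() mirrors whatever requires() returns. It is a simplified model and not Luigi's actual implementation: FakeTask and outputs_of are hypothetical names invented for this illustration, and a string path stands in for a real Target.

```python
class FakeTask:
    """Stand-in for a luigi.Task; `name` is a hypothetical field used only here."""

    def __init__(self, name):
        self.name = name

    def output(self):
        # A real task would return a Target; a string path is enough for the sketch.
        return "data/{}.tsv".format(self.name)


def outputs_of(struct):
    """Map a requires()-style structure to the same structure of outputs."""
    if isinstance(struct, FakeTask):
        return struct.output()
    if isinstance(struct, dict):
        return {key: outputs_of(value) for key, value in struct.items()}
    if isinstance(struct, (list, tuple)):
        return [outputs_of(item) for item in struct]
    raise TypeError("unsupported requires() structure: {!r}".format(struct))


# One task in, one output out.
single = outputs_of(FakeTask("streams"))
# A list of tasks in, a list of outputs out (the AggregateArtists case).
as_list = outputs_of([FakeTask("day1"), FakeTask("day2")])
# A dict of tasks in, a dict of outputs out (the PredictNextBigThingTask case).
as_dict = outputs_of({"artists": FakeTask("artists"), "charts": FakeTask("charts")})
```

So a task that returns a list from requires() loops over self.input(), while one that returns a dict indexes into it by key.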




Erik Bernhardsson

Feb 13, 2014, 6:09:50 AM
to Steven Willis, luigi...@googlegroups.com
Yeah, input() just calls requires() and transforms all the Task objects into their output Targets, so nothing magical.

run() is called once for each Streams instance, but not within the same scope as AggregateArtists.run(). The scheduler traverses the dependency graph, then runs everything in some order.
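
As a rough illustration of that flow of control, the toy below walks a dependency graph and calls run() once per task instance, dependencies first. It is only a sketch under simplifying assumptions: ToyTask, build, and run_log are hypothetical names, there are no workers or central scheduler, and the real scheduler does not promise this particular order among the Streams tasks.

```python
run_log = []


class ToyTask:
    """Minimal stand-in for a task with dependencies (not a luigi.Task)."""

    def __init__(self, name, deps=()):
        self.name = name
        self.deps = list(deps)
        self.done = False

    def requires(self):
        return self.deps

    def run(self):
        run_log.append(self.name)
        self.done = True


def build(task):
    """Run every unfinished dependency first, then the task itself."""
    for dep in task.requires():
        build(dep)
    if not task.done:
        task.run()


streams = [ToyTask("Streams(day{})".format(i)) for i in (1, 2, 3)]
aggregate = ToyTask("AggregateArtists", deps=streams)
build(aggregate)
```

Each Streams task has its own run() call, and AggregateArtists.run() happens exactly once, after all of them, which is why a single loop over self.input() sees every input.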
--
Erik Bernhardsson
Engineering Manager, Spotify, New York

Steven Willis

Feb 13, 2014, 9:21:00 AM
to luigi...@googlegroups.com
I think I should add a third purpose: to let complete() know what needs to exist for the task to be complete.
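
A small sketch of that third purpose, assuming the default behaviour where a task counts as complete once its output exists. FileBackedTask is a hypothetical plain-Python class written for this example, not a Luigi class, and it uses a bare file path where Luigi would use a Target.

```python
import os
import tempfile


class FileBackedTask:
    """Toy task whose output() is a plain file path (hypothetical, not luigi)."""

    def __init__(self, path):
        self.path = path

    def output(self):
        return self.path

    def complete(self):
        # Complete means: the thing output() points at already exists.
        return os.path.exists(self.output())

    def run(self):
        with open(self.output(), "w") as out_file:
            out_file.write("done\n")


workdir = tempfile.mkdtemp()
task = FileBackedTask(os.path.join(workdir, "result.tsv"))
before = task.complete()  # nothing has been written yet
task.run()
after = task.complete()   # the output file now exists
```

This is also why a task whose output is already there gets skipped on the next run.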

