It appears that what self.input() returns comes from the output() of whatever requires() returns, but how should I interpret the loop that reads self.input()? I was expecting an outer loop that iterates over each input set by each invocation of Streams(), but that does not appear to be the case.
So is run() called once for each invocation of Streams(), which in turn sets the input member variable to the value returned by the output() of that Streams() task?
from collections import defaultdict

import luigi


class AggregateArtists(luigi.Task):
    date_interval = luigi.DateIntervalParameter()

    def output(self):
        return luigi.LocalTarget("data/artist_streams_{}.tsv".format(
            self.date_interval))

    def requires(self):
        return [Streams(date) for date in self.date_interval]

    def run(self):
        artist_count = defaultdict(int)
        ## Where does the self.input member variable get set, from each invocation of Streams()?
        ## If so, does this mean that the run method gets called each time through the loop in the requires()?
        ## If not, can you give me a quick walkthrough of the flow of control for this task? Thank you.
        for input in self.input():
            with input.open('r') as in_file:
                for line in in_file:
                    timestamp, artist, track = line.strip().split()
                    artist_count[artist] += 1
        with self.output().open('w') as out_file:
            # items() works on both Python 2 and 3; iteritems() is Python 2 only
            for artist, count in artist_count.items():
                out_file.write('{}\t{}\n'.format(artist, count))
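
The flow of control asked about above can be sketched with a small stand-in that does not import Luigi at all. It is only a simplified model, not Luigi's actual scheduler: MemoryTarget, ToyTask and build() are hypothetical names made up for illustration, and the sample lines are invented. The point it tries to show is that input() is not a member variable that gets set; it is recomputed from requires() by collecting each dependency's output(), and run() is called once per task after its dependencies have been run.

```python
# Toy model of the requires() / input() / run() flow. Not Luigi code.

class MemoryTarget:
    """Hypothetical in-memory stand-in for luigi.LocalTarget."""
    store = {}  # shared mapping: target name -> list of lines

    def __init__(self, name):
        self.name = name

    def exists(self):
        return self.name in MemoryTarget.store

    def write(self, lines):
        MemoryTarget.store[self.name] = list(lines)

    def read(self):
        return MemoryTarget.store[self.name]


class ToyTask:
    """Hypothetical base class mimicking the shape of a Luigi task."""

    def requires(self):
        return []

    def input(self):
        # Nothing is stored: the inputs are just the outputs of requires().
        return [dep.output() for dep in self.requires()]

    def complete(self):
        # A task counts as done once its output exists.
        return self.output().exists()


class Streams(ToyTask):
    def __init__(self, date):
        self.date = date

    def output(self):
        return MemoryTarget("streams_{}".format(self.date))

    def run(self):
        # Invented sample data: "timestamp artist track" per line.
        self.output().write([
            "{} artist_a track_1".format(self.date),
            "{} artist_b track_2".format(self.date),
        ])


class AggregateArtists(ToyTask):
    run_calls = 0  # counts how often run() is invoked

    def __init__(self, dates):
        self.dates = dates

    def output(self):
        return MemoryTarget("artist_streams")

    def requires(self):
        return [Streams(date) for date in self.dates]

    def run(self):
        AggregateArtists.run_calls += 1
        counts = {}
        for target in self.input():  # one target per required Streams task
            for line in target.read():
                timestamp, artist, track = line.split()
                counts[artist] = counts.get(artist, 0) + 1
        self.output().write(
            "{}\t{}".format(artist, count)
            for artist, count in sorted(counts.items()))


def build(task):
    """Hypothetical scheduler: run dependencies first, then the task once."""
    if task.complete():
        return
    for dep in task.requires():
        build(dep)
    task.run()


build(AggregateArtists(["2013-01-01", "2013-01-02", "2013-01-03"]))
print(AggregateArtists.run_calls)
print(MemoryTarget.store["artist_streams"])
```

In this model the three Streams tasks each run once, and AggregateArtists.run() then runs a single time and loops over all three targets itself, which is why the original run() has no outer loop per Streams() invocation.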
I think I should add a third purpose: to let complete() know what needs to exist for the task to be complete.

Steven Willis <swi...@compete.com> wrote: