How to concatenate results from multiple tasks

Stephen Sun

Feb 6, 2022, 10:04:26 PM
to Luigi
Hi guys,

I have a list of files that I want to read in parallel, concatenate into one large dataframe, and save as a .pkl file.

Currently I do this inside a single task with multiprocessing.Pool(processes=multiprocessing.cpu_count()).map(), but I just realized that Luigi parallelizes through its workers, so my approach doesn't seem workable. On the other hand, if I read each file in a separate task, I can't pass the dataframes between tasks, so I can't concatenate them into one large .pkl file.

Is there a way to achieve this? Thanks
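For reference, here is a rough sketch of the single-task version described above: read the files with a process pool, then concatenate. It assumes CSV inputs. `read_all` is a hypothetical helper. The "fork" start method is used so the sketch runs without a `__main__` guard, which means it works on POSIX only.

```python
import multiprocessing as mp

import pandas as pd


def read_all(paths, processes=None):
    """Read CSV files in parallel and concatenate them into one DataFrame.

    processes=None lets Pool choose the number of workers from the CPU count.
    """
    # "fork" lets this sketch run without an `if __name__ == "__main__":` guard (POSIX only)
    ctx = mp.get_context("fork")
    with ctx.Pool(processes=processes) as pool:
        # map() returns results in the same order as `paths`
        frames = pool.map(pd.read_csv, paths)
    return pd.concat(frames, ignore_index=True)
```

Usage would be something like `read_all(csv_paths).to_pickle("all.pkl")`, where `csv_paths` is the list of input files.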

Paul Madden

Feb 7, 2022, 10:20:23 AM
to Luigi
Would something like this help?

import luigi

class Sum(luigi.Task):
    n = luigi.parameter.IntParameter()
    def requires(self):
        return [Square(n) for n in range(self.n + 1)]
    def run(self):
        with open(self.output().path, "w") as f:
            print(sum([x.square for x in self.requires()]), file=f)
    def output(self):
        return luigi.LocalTarget("sum")

class Square(luigi.Task):
    i = luigi.parameter.IntParameter()
    ran = False
    def complete(self):
        return self.ran
    def run(self):
        self.square = self.i * self.i
        self.ran = True


In your case, an analog of the Square class could read a file and store the dataframe in an instance variable, like self.square here. The task that require()s a collection of those Square-like tasks could then access their instance variables and combine the results.

Demo:

% PYTHONPATH=$PWD luigi --module demo Sum --n 3 --local-scheduler
DEBUG: Checking if Sum(n=3) is complete
DEBUG: Checking if Square(i=0) is complete
DEBUG: Checking if Square(i=1) is complete
DEBUG: Checking if Square(i=2) is complete
DEBUG: Checking if Square(i=3) is complete
INFO: Informed scheduler that task   Sum_3_ab99dd1074   has status   PENDING
INFO: Informed scheduler that task   Square_3_5d0c77e0b0   has status   PENDING
INFO: Informed scheduler that task   Square_2_a9facda7c5   has status   PENDING
INFO: Informed scheduler that task   Square_1_64a8d99e05   has status   PENDING
INFO: Informed scheduler that task   Square_0_d010c9c495   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 5
INFO: [pid 6533] Worker Worker(salt=149535173, workers=1, host=x, username=me, pid=6533) running   Square(i=2)
INFO: [pid 6533] Worker Worker(salt=149535173, workers=1, host=x, username=me, pid=6533) done      Square(i=2)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Square_2_a9facda7c5   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 4
INFO: [pid 6533] Worker Worker(salt=149535173, workers=1, host=x, username=me, pid=6533) running   Square(i=3)
INFO: [pid 6533] Worker Worker(salt=149535173, workers=1, host=x, username=me, pid=6533) done      Square(i=3)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Square_3_5d0c77e0b0   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 3
INFO: [pid 6533] Worker Worker(salt=149535173, workers=1, host=x, username=me, pid=6533) running   Square(i=0)
INFO: [pid 6533] Worker Worker(salt=149535173, workers=1, host=x, username=me, pid=6533) done      Square(i=0)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Square_0_d010c9c495   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 6533] Worker Worker(salt=149535173, workers=1, host=x, username=me, pid=6533) running   Square(i=1)
INFO: [pid 6533] Worker Worker(salt=149535173, workers=1, host=x, username=me, pid=6533) done      Square(i=1)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Square_1_64a8d99e05   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 6533] Worker Worker(salt=149535173, workers=1, host=x, username=me, pid=6533) running   Sum(n=3)
INFO: [pid 6533] Worker Worker(salt=149535173, workers=1, host=x, username=me, pid=6533) done      Sum(n=3)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Sum_3_ab99dd1074   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=149535173, workers=1, host=x, username=me, pid=6533) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====

Scheduled 5 tasks of which:
* 5 ran successfully:
    - 4 Square(i=0...3)
    - 1 Sum(n=3)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

% cat sum
14

Stephen Sun

Feb 7, 2022, 6:40:28 PM
to Luigi
Hi Paul,

Thanks for the help. I tried your demo, but it failed with unfulfilled dependencies:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/luigi/worker.py", line 176, in run
    raise RuntimeError('Unfulfilled %s at run time: %s' % (deps, ', '.join(missing)))
RuntimeError: Unfulfilled dependencies at run time: Square_0_d010c9c495, Square_1_64a8d99e05, Square_2_a9facda7c5, Square_3_5d0c77e0b0

import luigi

class Sum(luigi.Task):
    n = luigi.parameter.IntParameter()
    def requires(self):
        return [Square(n) for n in range(self.n + 1)]
    def run(self):
        with open(self.output().path, "w") as f:
            print(sum([x.square for x in self.requires()]), file=f)
    def output(self):
        return luigi.LocalTarget("sum")

class Square(luigi.Task):
    i = luigi.parameter.IntParameter()
    ran = False
    def complete(self):
        return self.ran
    def run(self):
        self.square = self.i * self.i
        self.ran = True

luigi.build([Sum(3)],workers=3, local_scheduler=True)

I also tried the same idea with my own code (without overriding complete()), but it fails with a traceback saying Test has no attribute data:

import luigi
from datetime import datetime
import pandas as pd
import os
from pathlib import Path


class Test(luigi.Task):
    myfile=luigi.Parameter()
    def output(self):
        return luigi.LocalTarget('test/'+Path(self.myfile).stem+'.txt')
    def run(self):
        self.data=pd.read_csv(self.myfile)
        with self.output().open("w") as f:
            f.write(os.path.basename(self.myfile)+' is completed')

class RunTest(luigi.Task):

    def requires(self):
        return [Test(i) for i in allfile]

    def output(self):
        target=luigi.LocalTarget('test.pkl',format=luigi.format.Nop)
        return target
    def run(self):
        with self.output().open("w") as f:
            data=pd.concat([x.data for x in self.requires()])
            data.to_pickle(f)
luigi.build([RunTest()],workers=8, local_scheduler=True)

Stephen Sun

Feb 7, 2022, 6:53:52 PM
to Luigi
I tried instantiating the class, and the object's __dict__ has no square attribute:

{'i': 3, 'param_kwargs': {'i': 3}, 'task_id': 'Square_3_5d0c77e0b0', '_Task__hash': -956811924875508640, 'set_tracking_url': None, 'set_status_message': None, 'set_progress_percentage': None}


class Square(luigi.Task):
    i = luigi.parameter.IntParameter()
    ran = False
    def complete(self):
        return self.ran
    def run(self):
        self.square = self.i * self.i
        self.ran = True
a=Square(3)
a.__dict__

On Tuesday, 8 February 2022 at 02:20:23 UTC+11 Paul Madden wrote:

Paul Madden

Feb 7, 2022, 7:40:17 PM
to Luigi
The Square object doesn't have a square attribute until you call a.run().

Stephen Sun

Feb 7, 2022, 7:51:44 PM
to Luigi
Could you please share how you ran the demo? I start it with "luigi.build([Sum(3)],workers=3, local_scheduler=True)", and all the Square() tasks succeed, but Sum() fails with the traceback: 'Square' object has no attribute 'square'

Paul Madden

Feb 7, 2022, 7:57:13 PM
to Luigi
I think my idea was naive. I suspect the Sum object doesn't have access to the Square objects because they ran in separate processes, so from Sum's point of view their .square attributes don't exist. My example worked only because I tested with a single worker, so all the objects shared the same memory space. Your use case may not be easy to solve with Luigi, but I'll think about it and see if I come up with any other ideas. Maybe others will.
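Paul's explanation can be reproduced without Luigi. An attribute set on an object inside a child process is set on the child's copy of that object, so the parent never sees it. The sketch below uses a plain-Python stand-in for `Square` (hypothetical, not a Luigi task) and the "fork" start method, so it is POSIX-only.

```python
import multiprocessing as mp


class Square:
    """Plain-Python stand-in for the Luigi task (hypothetical, no Luigi involved)."""

    def __init__(self, i):
        self.i = i

    def run(self):
        self.square = self.i * self.i


def run_in_child(task):
    """Run task.run() in a separate process, roughly as a worker process would."""
    ctx = mp.get_context("fork")  # POSIX only; the child gets a copy of `task`
    proc = ctx.Process(target=task.run)
    proc.start()
    proc.join()
    return proc.exitcode


task = Square(3)
run_in_child(task)
print(hasattr(task, "square"))  # False: only the child's copy got the attribute
task.run()
print(task.square)              # 9: running in this process sets it here
```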

Paul Madden

Feb 7, 2022, 7:58:41 PM
to Luigi

Stephen Sun

Feb 7, 2022, 10:40:28 PM
to Luigi
Thanks Paul! Overriding __init__ doesn't work either (attribute values set in run() still can't be passed out through it). I'll post here if I find a good solution.

Stephen Sun

Feb 8, 2022, 7:26:09 PM
to Luigi
I finally solved the issue by using MockTarget. Each reading task writes its dataframe, in whatever file format you like, to a MockTarget, which is a mock file held in memory, and the combining task reads those files back from memory.

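import luigi
import pandas as pd
from luigi.mock import MockTarget

class Test(luigi.Task):
    myfile=luigi.Parameter()

    def output(self):
        # one in-memory file per task; a fixed name like 'x' would be shared by every Test
        return MockTarget(self.task_id)

    def run(self):
        data=pd.read_excel(self.myfile)
        with self.output().open('w') as f:
            data.to_csv(f, index=False)

class RunTest(luigi.Task):

    def requires(self):
        # file_list is defined elsewhere (not shown)
        return [Test(i) for i in file_list['timesheets']]

    def output(self):
        return luigi.LocalTarget('test.pkl',format=luigi.format.Nop)

    def run(self):
        # read every mock file back from memory, then concatenate
        frames=[]
        for target in self.input():
            with target.open('r') as f:
                frames.append(pd.read_csv(f))
        with self.output().open('w') as f:
            pd.concat(frames, ignore_index=True).to_pickle(f)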
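Why does MockTarget succeed where the instance-attribute approach failed? As far as I can tell from luigi's source (`luigi/mock.py`), `MockFileSystem` keeps its files in a `multiprocessing.Manager().dict()`. That dictionary lives in a separate server process that every worker can read and write. The sketch below reproduces the idea with the standard library and pandas. `extract` and `combine` are hypothetical names, and the "fork" start method makes it POSIX-only.

```python
import io
import multiprocessing as mp

import pandas as pd


def extract(store, key, csv_text):
    # runs in a separate process: parse the input and save it as CSV text
    df = pd.read_csv(io.StringIO(csv_text))
    store[key] = df.to_csv(index=False)


def combine(inputs):
    """inputs maps a key to raw CSV text; returns the concatenated DataFrame."""
    ctx = mp.get_context("fork")  # POSIX only; avoids needing a __main__ guard
    with ctx.Manager() as manager:
        store = manager.dict()  # proxy that every child process can write to
        procs = [ctx.Process(target=extract, args=(store, k, text))
                 for k, text in inputs.items()]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        # unlike plain instance attributes, the parent sees what the children wrote
        frames = [pd.read_csv(io.StringIO(store[k])) for k in sorted(inputs)]
    return pd.concat(frames, ignore_index=True)
```

Keep in mind that the in-memory files vanish when the run ends. On the next run, every MockTarget-backed task counts as incomplete again.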
