class RemoveDuplicates(Task):
    """Luigi task that runs the duplicate-cleaning step on two metadata CSVs.

    The payslip and timesheet metadata files named in ``params`` are each
    passed through ``core.cleaning(...).duplicates()``. Element 0 of every
    result is written to the ``payslips`` / ``timesheets`` target and
    element 1 to the matching ``*_dups`` target.

    NOTE(review): presumably element 0 holds the de-duplicated rows and
    element 1 the rows flagged as duplicates -- confirm against
    ``core.cleaning``.

    Keys read from ``params``:
        metaraw_payslips, metaraw_timesheets: CSV paths handed to
            ``pd.read_csv``.
        payslips, payslips_dups, timesheets, timesheets_dups: paths of the
            four output targets.
    """

    # Dictionary parameter holding every input and output path used below.
    params = DictParameter()

    def requires(self):
        """Declare ``ExtractTest`` as the only upstream dependency."""
        return ExtractTest()

    def output(self):
        """Return the four output targets, keyed by output name.

        Every target is a ``LocalTarget`` created with
        ``format=luigi.format.Nop`` at the path stored in ``params`` under
        the same key.
        """
        target_keys = (
            'payslips_dups',
            'timesheets_dups',
            'payslips',
            'timesheets',
        )
        return {
            key: LocalTarget(self.params[key], format=luigi.format.Nop)
            for key in target_keys
        }

    def run(self):
        """Load both metadata CSVs, run the duplicate step, write four CSVs."""
        payslips_raw = pd.read_csv(self.params['metaraw_payslips'])
        timesheets_raw = pd.read_csv(self.params['metaraw_timesheets'])

        payslips_split = core.cleaning(payslips_raw).duplicates()
        timesheets_split = core.cleaning(timesheets_raw).duplicates()

        # (output key, duplicates() result, element index), listed in the
        # order the files are written.
        write_plan = (
            ('payslips_dups', payslips_split, 1),
            ('payslips', payslips_split, 0),
            ('timesheets_dups', timesheets_split, 1),
            ('timesheets', timesheets_split, 0),
        )

        targets = self.output()
        for key, split, position in write_plan:
            # The targets use the Nop format, so they are opened in binary
            # mode and pandas writes straight into that handle.
            with targets[key].open('wb') as sink:
                split[position].to_csv(sink)

        # Drop the local names bound to the loaded and cleaned data before
        # returning.
        del payslips_raw, timesheets_raw, payslips_split, timesheets_split