Running luigi workflows depending on same task in different python subprocesses

428 views
Skip to first unread message

apar...@gmail.com

unread,
Apr 27, 2015, 4:33:34 PM4/27/15
to luigi...@googlegroups.com
Hi,
I have been using luigi for about three weeks.
In one of our projects, we are trying to leverage luigi to implement our process workflow.

In my workflow, there are n tasks depending on 1 common task.
These n tasks run in separate Python subprocesses. Only one of the subprocesses picks up the common task and executes it (which is good). Unfortunately, the other subprocesses that depend on this task detect that it is already scheduled on a different worker and stop, without waiting for it to complete.

I am attaching simulated code snippets here. Please let me know whether this is a bug or expected behavior.

This is file - call_feed_master.py
~

import test_feed_master
import multiprocessing as mp
import sys
import os
import logging

def run_cmd(cmd, success_msg, error_msg):
    """Run ``cmd`` through the shell and report the outcome.

    Args:
        cmd: Shell command line passed to ``os.system``.
        success_msg: Text printed when the command's status is 0.
        error_msg: Text printed when the status is non-zero or the call
            to ``os.system`` raises.

    Returns:
        0 on success, 2000 on any failure.
    """
    # A single-argument print(...) behaves the same on Python 2 and 3.  The
    # two spaces reproduce the output of the original
    # ``print 'Command to run: ', cmd`` statement.
    print('Command to run:  %s' % (cmd,))

    try:
        status = os.system(cmd)
    except Exception:
        # Best effort: a failure to launch the command is reported exactly
        # like a command that ran and failed.
        print(error_msg)
        return 2000

    if status == 0:
        print(success_msg)
        return 0

    print(error_msg)
    return 2000

def worker(feed_name, start_date, end_date, output_queue):
    """Run FeedMasterDurTask for one feed and queue the resulting status.

    Args:
        feed_name: Value passed as ``--feed-name``.
        start_date: Value passed as ``--start-date``.
        end_date: Value passed as ``--end-date``.
        output_queue: Queue that receives the status returned by ``run_cmd``.
    """
    # Explicit arguments instead of ``% locals()`` so it is obvious which
    # values end up on the command line.  The trailing space is kept so the
    # command string is identical to the original.
    run_feed_master_dur_cmd = (
        "python test_feed_master.py FeedMasterDurTask"
        " --feed-name %s --start-date %s --end-date %s "
        % (feed_name, start_date, end_date)
    )
    status = run_cmd(
        run_feed_master_dur_cmd,
        "Ran feed master dur task",
        "Unable to run feed master dur task",
    )
    output_queue.put(status)

if __name__ == '__main__':
    # Usage: python call_feed_master.py <start_date> <end_date>
    # Both dates are forwarded unchanged to test_feed_master.py.
    start_date = sys.argv[1]
    end_date = sys.argv[2]
    output_queue = mp.Queue()
    feeds_list = ['ab', 'cd', 'ef', 'gh']

    # One child process per feed; each launches its own luigi run.
    processes = [
        mp.Process(
            target=worker,
            args=(feed_name, start_date, end_date, output_queue),
        )
        for feed_name in feeds_list
    ]

    for p in processes:
        p.start()

    for p in processes:
        p.join()

    # NOTE(review): the statuses the workers put on output_queue are never
    # read here, and the script exits 0 regardless of their values.
    sys.exit(0)
~


test_feed_master.py

import luigi
from datetime import datetime, timedelta
import os
import time

class ATask(luigi.Task):
    """Simulated long-running task that the other tasks depend on."""

    processing_date = luigi.Parameter()

    def run(self):
        """Sleep for about two minutes, then create the output directory."""
        # NOTE(review): the pasted source lost its indentation; the print is
        # assumed to belong inside the loop (one line per second) -- confirm.
        for i in range(120):
            time.sleep(1)
            print("Executing ATask %s for %s" % (str(os.getpid()), self.processing_date))

        # Create-then-handle instead of check-then-create, so a directory
        # that appears between the check and the creation is not an error.
        target_dir = self.output().path
        try:
            os.makedirs(target_dir)
        except OSError:
            if not os.path.exists(target_dir):
                raise

    def output(self):
        """Return the target whose existence marks this task as done."""
        return luigi.LocalTarget(os.getcwd() + '/%s/A_TABLES/Generate' % self.processing_date)

class RecordAStatusTask(luigi.Task):
    """Task that runs after ATask for the same processing date."""

    processing_date = luigi.Parameter()

    def requires(self):
        """Depend on ATask for the same processing date."""
        return ATask(self.processing_date)

    def run(self):
        """Print a progress line and create the output directory."""
        print("Executing RecordAStatus Task %s for %s" % (str(os.getpid()), self.processing_date))

        # Create-then-handle instead of check-then-create, so a directory
        # that appears between the check and the creation is not an error.
        target_dir = self.output().path
        try:
            os.makedirs(target_dir)
        except OSError:
            if not os.path.exists(target_dir):
                raise

    def output(self):
        """Return the target whose existence marks this task as done."""
        return luigi.LocalTarget(os.getcwd() + '/%s/%s/A_LOOK_UP' % (self.processing_date, 'ASTATUS'))

class FeedMasterTask(luigi.Task):
    """Per-feed, per-date task that depends on RecordAStatusTask."""

    feed_name = luigi.Parameter()
    processing_date = luigi.Parameter()

    def requires(self):
        """Depend on RecordAStatusTask for the same processing date."""
        # The requirement does not take feed_name, so every feed processing
        # the same date requires the same RecordAStatusTask.
        return RecordAStatusTask(self.processing_date)

    def run(self):
        """Print a progress line and create the output directory."""
        print("Executing FeedMasterTask for %s and for %s" % (self.feed_name, self.processing_date))

        # Create-then-handle instead of check-then-create, so a directory
        # that appears between the check and the creation is not an error.
        target_dir = self.output().path
        try:
            os.makedirs(target_dir)
        except OSError:
            if not os.path.exists(target_dir):
                raise

    def output(self):
        """Return the target whose existence marks this task as done."""
        return luigi.LocalTarget(os.getcwd() + '/%s/%s/ALL_DONE' % (self.processing_date, self.feed_name))

class FeedMasterDurTask(luigi.Task):
    """Run FeedMasterTask for one feed over an inclusive range of dates."""

    feed_name = luigi.Parameter()
    start_date = luigi.Parameter()
    end_date = luigi.Parameter()

    def date_range(self, start_date, end_date):
        """Yield every date from start_date to end_date, both inclusive.

        Args:
            start_date: First date, as a ``%Y%m%d`` string.
            end_date: Last date, as a ``%Y%m%d`` string.

        Yields:
            Each date in the range as a ``%Y%m%d`` string.  Nothing is
            yielded when end_date is earlier than start_date.
        """
        first_day = datetime.strptime(start_date, '%Y%m%d')
        last_day = datetime.strptime(end_date, '%Y%m%d')

        for day_offset in range((last_day - first_day).days + 1):
            processing_date = first_day + timedelta(days=day_offset)
            yield processing_date.strftime('%Y%m%d')

    def requires(self):
        """Depend on one FeedMasterTask per date in the range."""
        return [
            FeedMasterTask(self.feed_name, processing_date)
            for processing_date in self.date_range(self.start_date, self.end_date)
        ]

    def run(self):
        """Print a completion line and create the output directory."""
        print("Finished feed processing from %s to %s" % (self.start_date, self.end_date))

        # Create-then-handle instead of check-then-create, so a directory
        # that appears between the check and the creation is not an error.
        target_dir = self.output().path
        try:
            os.makedirs(target_dir)
        except OSError:
            if not os.path.exists(target_dir):
                raise

    def output(self):
        """Return the target whose existence marks this task as done."""
        return luigi.LocalTarget(os.getcwd() + "/%s_%s/%s/ALL_DONE" % (self.start_date, self.end_date, self.feed_name))


class ClusterMasterDummyTask(luigi.Task):
    """Fan-out task requiring FeedMasterDurTask for each listed feed.

    NOTE(review): no output() is defined, so luigi has no target by which
    to judge this task complete -- confirm that is intended.
    """

    feeds = luigi.Parameter()  # comma-separated feed names
    start_date = luigi.Parameter()
    end_date = luigi.Parameter()

    def requires(self):
        """Depend on one FeedMasterDurTask per feed named in ``feeds``."""
        # Strip whitespace around each name and skip empty entries so that
        # "ab, cd" or a trailing comma does not produce bogus feed names.
        feed_list = [name.strip() for name in self.feeds.split(',') if name.strip()]
        return [FeedMasterDurTask(feed, self.start_date, self.end_date) for feed in feed_list]

    def run(self):
        """Print a completion line; no output is written."""
        print("Finished CLusterMaster dummy task from %s to %s" % (self.start_date, self.end_date))

if __name__ == '__main__':
    # Hand control to luigi's command-line entry point; the task name and
    # its parameters are taken from the command line.
    luigi.run()

-------------------end of test_feed_master.py

Any feedback is appreciated.

Thanks
Aparna.



Arash Rouhani

unread,
Apr 28, 2015, 1:38:18 AM4/28/15
to apar...@gmail.com, luigi...@googlegroups.com
I did not read through the code at all (it's too long, can you make a minimal testcase? :)). But it's true that luigi does not wait for other tasks to complete. Usually you just cron your job to run every hour.

Please see this issue: https://github.com/spotify/luigi/issues/502, I think it's what you're talking about.






--
You received this message because you are subscribed to the Google Groups "Luigi" group.
To unsubscribe from this group and stop receiving emails from it, send an email to luigi-user+...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Reply all
Reply to author
Forward
0 new messages