apar...@gmail.com
unread,Apr 27, 2015, 4:33:34 PM4/27/15Sign in to reply to author
Sign in to forward
You do not have permission to delete messages in this group
Either email addresses are anonymous for this group or you need the view member email addresses permission to view the original message
to luigi...@googlegroups.com
Hi,
I am like 3 weeks old using luigi.
In one of our projects, we are trying to leverage luigi to implement our process workflow.
In my workflow, there are n tasks depending on 1 common task.
These n tasks run in python subprocesses. Only One of subprocesses is picking up to execute 1 common task (which is good). Unfortunately other subprocess which depend on this task , detect this task is scheduled on different worker and get stopped, without waiting for its completion.
I am attaching the simulated code snippets here. Please do let me know, if this is bug or expected behavior.
This is file - call_feed_master.py
~
import test_feed_master
import multiprocessing as mp
import sys
import os
import logging
def run_cmd(cmd, success_msg, error_msg):
print 'Command to run: ', cmd
try:
status = os.system(cmd)
if status == 0:
print(success_msg)
return 0
else:
print(error_msg)
return 2000
except Exception:
print(error_msg)
return 2000
def worker(feed_name, start_date, end_date, output_queue):
run_feed_master_dur_cmd = "python test_feed_master.py FeedMasterDurTask --feed-name %(feed_name)s --start-date %(start_date)s --end-date %(end_date)s " %locals()
status = run_cmd(run_feed_master_dur_cmd, "Ran feed master dur task", "Unable to run feed master dur task")
output_queue.put(status)
if __name__ == '__main__':
start_date = sys.argv[1]
end_date = sys.argv[2]
output_queue = mp.Queue()
feeds_list = ['ab', 'cd', 'ef' , 'gh']
processes = [mp.Process(target= worker, args=(feed_name, start_date, end_date, output_queue)) for feed_name in feeds_list]
for p in processes:
p.start()
for p in processes:
p.join()
sys.exit(0)
~
test_feed_master.py
import luigi
from datetime import datetime, timedelta
import os
import time
class ATask(luigi.Task):
processing_date = luigi.Parameter()
def run(self):
for i in range(120):
time.sleep(1)
print("Executing ATask %s for %s" %(str(os.getpid()), self.processing_date))
if not os.path.exists(self.output().path):
os.makedirs(self.output().path)
def output(self):
return luigi.LocalTarget(os.getcwd() + '/%s/A_TABLES/Generate' %self.processing_date)
class RecordAStatusTask(luigi.Task):
processing_date = luigi.Parameter()
def requires(self):
return ATask(self.processing_date)
def run(self):
print("Executing RecordAStatus Task %s for %s" %(str(os.getpid()), self.processing_date))
if not os.path.exists(self.output().path):
os.makedirs(self.output().path)
def output(self):
return luigi.LocalTarget(os.getcwd() + '/%s/%s/A_LOOK_UP' %(self.processing_date, 'ASTATUS'))
class FeedMasterTask(luigi.Task):
feed_name = luigi.Parameter()
processing_date = luigi.Parameter()
def requires(self):
return RecordAStatusTask(self.processing_date)
def run(self):
print("Executing FeedMasterTask for %s and for %s" %(self.feed_name, self.processing_date))
if not os.path.exists(self.output().path):
os.makedirs(self.output().path)
def output(self):
return luigi.LocalTarget(os.getcwd() + '/%s/%s/ALL_DONE' %(self.processing_date, self.feed_name))
class FeedMasterDurTask(luigi.Task):
feed_name = luigi.Parameter()
start_date = luigi.Parameter()
end_date = luigi.Parameter()
def date_range(self, start_date, end_date):
start_date = datetime.strptime(start_date, '%Y%m%d')
end_date = datetime.strptime(end_date, '%Y%m%d')
for rangeIdx in range((end_date - start_date).days + 1):
processing_date = start_date + timedelta(days=rangeIdx)
yield processing_date.strftime('%Y%m%d')
def requires(self):
return [FeedMasterTask(self.feed_name, processing_date) for processing_date in self.date_range(self.start_date, self.end_date)]
def run(self):
print("Finished feed processing from %s to %s" %(self.start_date, self.end_date))
if not os.path.exists(self.output().path):
os.makedirs(self.output().path)
def output(self):
return luigi.LocalTarget(os.getcwd() + "/%s_%s/%s/ALL_DONE" %(self.start_date, self.end_date, self.feed_name))
class ClusterMasterDummyTask(luigi.Task):
feeds = luigi.Parameter()
start_date = luigi.Parameter()
end_date = luigi.Parameter()
def requires(self):
feed_list = self.feeds.strip().split(',')
return [FeedMasterDurTask(feed, self.start_date, self.end_date) for feed in feed_list]
def run(self):
print("Finished CLusterMaster dummy task from %s to %s" %(self.start_date, self.end_date))
if __name__ == '__main__':
luigi.run()
-------------------end of test_feed_master.py
Any feedback is appreciated.
Thanks
Aparna.