I am using myHour = luigi.DateHourParameter() and want to know how can i get its date time value in the code.
Basically, I need to create a directory path with '%Y/%m/%d%H' based on the dateHour parameter value but when i use strftime it complains it cannot be used on DateHourParameter object and requires a datetime object.
If there is any other way to achive the value out of DateHourParameter object, please let me now
Thanks for looking, charu
--
You received this message because you are subscribed to the Google Groups "Luigi" group.
To unsubscribe from this group and stop receiving emails from it, send an email to luigi-user+unsubscribe@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
Here is the code. I took out some portion of the code that was not relevant to the issue:
class HourlyLoad(SparkSubmitTask):
task_namespace = 'load'
hour = luigi.DateHourParameter() # format is YYYY-MM-DD-HH-MM - changed by Charu
load_type = luigi.Parameter()
reload = luigi.BoolParameter(significant=False, default=False)
wf_dir = '/home/ckapoor/playground/'
def output(self):
hour_dt = datetime.strftime(self.hour, '%Y/%m/%d/%H/%M')
print('Charus Date = ', hour_dt)
path = '{}/{}/{}/{}'.format(wf_dir, self.load_type, hour_dt)
return luigi.LocalTarget(path)
def run(self):
print(self.full_spark_command())
super(HourlyLoad, self).run()
self.output().open('w').close()
def __init__(self, *args, **kwargs):
super(HourlyLoad, self).__init__(*args, **kwargs)
if self.reload:
outputs = luigi.task.flatten(self.output())
for out in outputs:
if out.exists():
self.output().remove()
if __name__ =="__main__":
luigi.run(['--module', 'charu_loadspark','RangeHourly', '--of', 'load.HourlyLoad', '--start', "2016-09-27T00", '--stop', "2016-09-27T04", '--of-params', '{"load_type" : "rest", "reload" : 1}' ])
from datetime import datetimeimport luigifrom luigi.contrib.spark import SparkSubmitTask
luigi.run()
python test.py load.HourlyLoad --hour 2016-09-29T05 --load-type five --local
File "/Users/buck/.virtualenvs/luigi3/lib/python3.5/site-packages/luigi/worker.py", line 295, in check_completeis_complete = task.complete()File "/Users/buck/.virtualenvs/luigi3/lib/python3.5/site-packages/luigi/task.py", line 428, in completeoutputs = flatten(self.output())File "test.py", line 16, in outputpath = '{}/{}/{}/{}'.format(self.wf_dir, self.load_type, hour_dt)IndexError: tuple index out of range
To unsubscribe from this group and stop receiving emails from it, send an email to luigi-user+unsubscribe@googlegroups.com.
I don't see a start date or stop date anywhere in the code you sent. If you want access to parameters, add them to your class.
To unsubscribe from this group and stop receiving emails from it, send an email to luigi-user+unsubscribe@googlegroups.com.
python -m luigi --module TestDaily RangeHourly --of MyTask --start 2016-09-02T01 --stop 2016-09-02T04 --of-params '{"x": 123, "y": 100, "reload": 1}'
import luigi
from datetime import datetime, timedelta
class MyTask(luigi.Task):
# myDate = luigi.DateHourParameter(default='stop')
start = luigi.DateHourParameter()
stop = luigi.DateHourParameter()
x = luigi.IntParameter()
y = luigi.IntParameter()
reload = luigi.BoolParameter()
print ("In class instantiation")
def run(self):
print("Running job.. reload= %s", self.reload )
print(" **my time ***", datetime.strftime(self.start,'%Y/%m/%d/%H/%M'))
# print(" **my time ***", datetime.strftime(self.stop,'%Y/%m/%d/%H/%M')) # Error here - cannot get this value
# super(MyTask, self).run()
self.output().open('w').close()
def output(self):
print(self.start)
return luigi.LocalTarget('charu-%s.txt' % self.start)
def __init__(self, *args, **kwargs):
super(MyTask, self).__init__(*args, **kwargs)
if self.reload:
outputs = luigi.task.flatten(self.output())
for out in outputs:
if out.exists():
self.output().remove()
python -m luigi --module TestDaily MyTask --start 2016-09-02T01 --stop 2016-09-02T04 --x 123 --y 100 --reload --local