How to get DateHourParameter value?


cka...@sessionm.com

Sep 30, 2016, 11:00:26 AM
to Luigi
Hi all,

I am using myHour = luigi.DateHourParameter() and want to know how I can get its datetime value in the code.

Basically, I need to create a directory path with '%Y/%m/%d%H' based on the DateHourParameter value, but when I use strftime it complains that it cannot be used on a DateHourParameter object and requires a datetime object.

If there is any other way to get the value out of the DateHourParameter object, please let me know.

Thanks for looking, Charu

Dave Buchfuhrer

Sep 30, 2016, 11:02:40 AM
to cka...@sessionm.com, Luigi
It looks like you're using a DateHourParameter that hasn't been assigned a value. You'll have to show the relevant bits of your code if you want more help than that.




--
You received this message because you are subscribed to the Google Groups "Luigi" group.
To unsubscribe from this group and stop receiving emails from it, send an email to luigi-user+unsubscribe@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

cka...@sessionm.com

Sep 30, 2016, 11:25:04 AM
to Luigi, cka...@sessionm.com
Here is the code. I took out the portions that were not relevant to the issue:

class HourlyLoad(SparkSubmitTask):
    task_namespace = 'load'
    hour = luigi.DateHourParameter() # format is YYYY-MM-DD-HH-MM - changed by Charu
    load_type = luigi.Parameter()
    reload = luigi.BoolParameter(significant=False, default=False)
    wf_dir = '/home/ckapoor/playground/'

    def output(self):
        hour_dt = datetime.strftime(self.hour, '%Y/%m/%d/%H/%M')
        print('Charus Date = ', hour_dt)
        path = '{}/{}/{}/{}'.format(wf_dir, self.load_type, hour_dt)

        return luigi.LocalTarget(path)

    def run(self):
        print(self.full_spark_command())
        super(HourlyLoad, self).run()
        self.output().open('w').close()

    def __init__(self, *args, **kwargs):
        super(HourlyLoad, self).__init__(*args, **kwargs)
        if self.reload:
            outputs = luigi.task.flatten(self.output())
            for out in outputs:
                if out.exists():
                    self.output().remove()


if __name__ == "__main__":
    luigi.run(['--module', 'charu_loadspark', 'RangeHourly', '--of', 'load.HourlyLoad',
               '--start', "2016-09-27T00", '--stop', "2016-09-27T04",
               '--of-params', '{"load_type" : "rest", "reload" : 1}'])

Dave Buchfuhrer

Sep 30, 2016, 11:57:30 AM
to Charu Kapoor, Luigi
I ran this code and it does not trigger the problem you're having. I added 

from datetime import datetime
import luigi
from luigi.contrib.spark import SparkSubmitTask

to the top of the file and

luigi.run()

to the bottom and ran it with

python test.py load.HourlyLoad --hour 2016-09-29T05 --load-type five --local

and I get the following traceback:

  File "/Users/buck/.virtualenvs/luigi3/lib/python3.5/site-packages/luigi/worker.py", line 295, in check_complete
    is_complete = task.complete()
  File "/Users/buck/.virtualenvs/luigi3/lib/python3.5/site-packages/luigi/task.py", line 428, in complete
    outputs = flatten(self.output())
  File "test.py", line 16, in output
    path = '{}/{}/{}/{}'.format(self.wf_dir, self.load_type, hour_dt)
IndexError: tuple index out of range

which shows an error after the strftime call. Are you sure this contains the code that triggers your issue?
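
For reference, the IndexError in that traceback can be reproduced with the standard library alone. The sketch below is an illustration, not the poster's code: it assumes the same three values (with the trailing slash dropped from the directory) and shows that a format string with four placeholders fails when given three arguments, while a three-placeholder string works.

```python
from datetime import datetime

# Values mirroring the thread; the trailing slash on the directory is dropped here.
wf_dir = '/home/ckapoor/playground'
load_type = 'five'
hour = datetime(2016, 9, 29, 5)

# strftime works on a datetime, so this line is not the source of the error.
hour_dt = hour.strftime('%Y/%m/%d/%H')

# Four placeholders but only three arguments: str.format raises IndexError.
try:
    '{}/{}/{}/{}'.format(wf_dir, load_type, hour_dt)
    error = None
except IndexError as exc:
    error = exc

# Three placeholders for three arguments builds the path as intended.
path = '{}/{}/{}'.format(wf_dir, load_type, hour_dt)
```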


cka...@sessionm.com

Sep 30, 2016, 12:26:09 PM
to Luigi, cka...@sessionm.com
Thanks, I'm looking into it in case I am doing anything different. Also, do you know whether the value of the --stop date is available in the code? I can get the start date through the DateHourParameter, but I am not sure about the other.

Dave Buchfuhrer

Sep 30, 2016, 4:47:21 PM
to Charu Kapoor, Luigi

I don't see a start date or stop date anywhere in the code you sent. If you want access to parameters, add them to your class.



Charu Kapoor

Oct 1, 2016, 1:40:13 AM
to Dave Buchfuhrer, Luigi
I tried it but may be missing something. The code is below, along with the command line. Let me know if it is possible to get the value of 'stop' in the code.

python -m luigi --module TestDaily RangeHourly --of MyTask --start 2016-09-02T01 --stop 2016-09-02T04 --of-params '{"x": 123, "y": 100, "reload": 1}'


import luigi
from datetime import datetime, timedelta


class MyTask(luigi.Task):
    # myDate = luigi.DateHourParameter(default='stop')
    start = luigi.DateHourParameter()
    stop = luigi.DateHourParameter()
    x = luigi.IntParameter()
    y = luigi.IntParameter()
    reload = luigi.BoolParameter()
    print ("In class instantiation")

    def run(self):
        print("Running job.. reload= %s", self.reload )
        print(" **my time ***", datetime.strftime(self.start,'%Y/%m/%d/%H/%M'))
        # print(" **my time ***", datetime.strftime(self.stop,'%Y/%m/%d/%H/%M')) # Error here - cannot get this value

        # super(MyTask, self).run()
        self.output().open('w').close()

    def output(self):
        print(self.start)
        return luigi.LocalTarget('charu-%s.txt' % self.start)

    def __init__(self, *args, **kwargs):
        super(MyTask, self).__init__(*args, **kwargs)
        if self.reload:
            outputs = luigi.task.flatten(self.output())
            for out in outputs:
                if out.exists():
                    self.output().remove()

Dave Buchfuhrer

Oct 1, 2016, 1:57:58 AM
to Charu Kapoor, Luigi
When you run this, the RangeHourly will only fill in the first DateHourParameter, start. stop does not get a value. I'm surprised this even ran at all for you. For me, it throws a MissingParameterException because stop is never defined. If you want one instance of MyTask that has access to both start and stop, you'll want to invoke it like

python -m luigi --module TestDaily MyTask --start 2016-09-02T01 --stop 2016-09-02T04 --x 123 --y 100 --reload --local

If you want one instance for every hour, you can use your previous invocation, but you'll have one job per hour in the range, each of which knows about its own hour but not about start or stop. The way you were invoking it, start and stop are parameters of RangeHourly. If you want per-hour jobs that know both about their own hours and about the stop hour, you'll have to implement your own luigi.WrapperTask. This is probably not what you really want, though. If you think it's what you want, please reconsider, as doing so can save you a lot of trouble in the not too distant future.