Hooks for before / after task execution?


Samuel Lampa

Aug 15, 2014, 2:16:06 PM
to luigi...@googlegroups.com
Hi,

We are trying to implement a mixin for all our tasks to inherit from, which would log each task's execution time by saving timestamps before and after the task runs.

By following the events and callbacks section in the API overview (https://github.com/spotify/luigi/blob/master/doc/api_overview.rst#events-and-callbacks), we tested (naively) something like this, to start with:


import logging
import time

import luigi

logger = logging.getLogger(__name__)


class TimingMixin(object):
    start_time = None
    end_time = None
    exec_time = None

    # DEPENDENCY_DISCOVERED handlers are called with (task, dependency)
    @luigi.Task.event_handler(luigi.Event.DEPENDENCY_DISCOVERED)
    def save_start_time(self, dependency):
        self.start_time = time.time()
        logger.info("Dependency present. Starting task")

    @luigi.Task.event_handler(luigi.Event.SUCCESS)
    def save_end_time(self):
        self.end_time = time.time()
        # Raises a TypeError if save_start_time never ran (start_time is None)
        self.exec_time = time.strftime('%H:%M:%S', time.gmtime(self.end_time - self.start_time))
        logger.info("Task succeeded. Time executed (H:M:S): " + self.exec_time)

It turns out, though, that neither the "DEPENDENCY_PRESENT" nor the "DEPENDENCY_DISCOVERED" event fires every time the task starts. (The "SUCCESS" event does seem to fire every time the task completes, though.)

Is there any other way to hook into the points before and after a task is executed, so that we can calculate the execution time?

Best Regards
// Samuel

Samuel Lampa

Aug 26, 2014, 10:04:04 AM
to luigi...@googlegroups.com
Never mind!

I see now that events/hooks for start, success and execution time have been added in the latest version on GitHub:

Cheers
// Samuel

Samuel Lampa

Aug 26, 2014, 10:09:55 AM
to luigi...@googlegroups.com
Btw, any info on when these changes will be released and available on PyPI?

Cheers
// Samuel

Erik Bernhardsson

Aug 26, 2014, 10:31:28 AM
to Samuel Lampa, luigi...@googlegroups.com
I push to PyPI every once in a while – let me do it now


--
Erik Bernhardsson
Engineering Manager, Spotify, New York

Erik Bernhardsson

Aug 26, 2014, 10:33:16 AM
to Samuel Lampa, luigi...@googlegroups.com
Done :)

Samuel Lampa

Aug 26, 2014, 11:20:51 AM
to luigi...@googlegroups.com, samuel...@gmail.com
Cool, I even managed to print out the execution times now, as documented in this example gist:

Cheers
// Samuel

Samuel Lampa

Aug 26, 2014, 1:20:30 PM
to luigi...@googlegroups.com
Btw, I see now that the execution time is also shown in the web UI for running tasks, but not for finished ones. Would it be possible to add it for finished ones as well?

BR
// Samuel

Anna Yuan

Aug 31, 2015, 6:14:35 PM
to Luigi, samuel...@gmail.com
Very useful tool! By the way, is there a way to return the execution time as an output (a string or a number) instead of just printing it out? Thank you!

Samuel Lampa

Aug 31, 2015, 6:39:28 PM
to Luigi, samuel...@gmail.com
That should be no problem. You can do pretty much anything inside a function that you decorate to be an event handler.

For example, we store all the task information to a task-specific file, in our SciLuigi library:
https://github.com/samuell/sciluigi/blob/d403b1/sciluigi/audit.py#L52-L62

We actually don't use normal Luigi outputs for this particular thing (maybe we should), but I don't think it would be hard. Something like this should work:

import luigi

class MyTask(luigi.Task):
    def output(self):
        # Provided we want to separate the main output and the log
        return {'main_output': luigi.LocalTarget('main_output.txt'),
                'timelog': luigi.LocalTarget('time.log')}

    @luigi.Task.event_handler(luigi.Event.PROCESSING_TIME)
    def print_execution_time(self, processing_time):
        if isinstance(self, MyTask):  # the handler fires for every task type
            with self.output()['timelog'].open('w') as timelog_file:
                timelog_file.write('Processing time: %s' % processing_time)

I haven't tested it, but I'm quite sure it will work, minus any typos :)

Best
// Samuel

Anna

Aug 31, 2015, 6:54:38 PM
to Luigi, samuel...@gmail.com

Thanks for your quick reply!


What if I want to get the execution time's value in another function by calling your print_execution_time() function? For example, as in the code below; "return str(processing_time)" doesn't seem to be a good way to solve this. :(


@luigi.Task.event_handler(luigi.Event.PROCESSING_TIME)
def print_execution_time(self, processing_time):
    print('### PROCESSING TIME ###: ' + str(processing_time))
    return str(processing_time)


Hope you understand my question...


Best,

Anna

Samuel Lampa

Aug 31, 2015, 7:28:20 PM
to Luigi, samuel...@gmail.com
Aha, I think I maybe understand the question now!

Calling the function directly from other functions won't give you the value, as far as I know. It is meant to be called automatically by Luigi when an event of type luigi.Event.PROCESSING_TIME is triggered, and that is when Luigi passes in the processing time. The decorator (the line starting with "@") is what registers the function as a handler for that event.

What you can do instead is, for example, store the data in a task variable and then access it from another function.

So, something like this, where save_end_time() runs when the task finishes and saves the information in the exectime task variable, while other_function() can be called whenever you want:

import luigi


class MyTask(luigi.Task):

    exectime = ''

    @luigi.Task.event_handler(luigi.Event.PROCESSING_TIME)
    def save_end_time(self, task_exectime_sec):
        # Save the execution time to a task variable
        # (luigi.Task.event_handler makes this fire for every task)
        self.exectime = task_exectime_sec

    def other_function(self):
        # Here, access the value once the handler has stored it
        if self.exectime != '':
            do_something(self.exectime)  # do_something: placeholder for your own code


The catch, of course, is that you need to make sure the other function runs after the handler has fired, or poll and check whether the value has been stored in that task variable.

There are many other ways to do this though. As mentioned, in SciLuigi, we save data to a file for each task, and then we access this data from those files, from another task.

etc.

Hope this helps!
Best
// Samuel



Samuel Lampa

Aug 31, 2015, 7:31:18 PM
to Luigi, samuel...@gmail.com
Actually, I described the problem we had in this other thread that I posted a few days ago:
https://groups.google.com/forum/#!topic/luigi-user/9J_s3UaQQEQ
... and how we solved it, so maybe it is of some use for you!

Best
// Samuel

Anna

Aug 31, 2015, 7:46:18 PM
to Luigi, samuel...@gmail.com
Ya, this answers my question. Thank you!!! 

pilla...@gmail.com

May 25, 2017, 8:12:39 AM
to Luigi
Hi Samuel,

A question, please. How do I return the worker's name and ID along with the processing time?

Many thanks,
Jianliang

wplea...@groupon.com

May 26, 2017, 9:05:15 AM
to Luigi, pilla...@gmail.com

For processing time, you can use the decorator like below and then define your own function for whatever you want to do with it. This example just prints it in seconds.

@luigi.Task.event_handler(luigi.Event.PROCESSING_TIME)
def processing_time(task, processing_time):
    print('Processing time for task {0} was {1} seconds'.format(task, processing_time))

You can get the worker ID/info if you define a run() function like this. Note that this is customized for other reasons, so there may be an easier way.

import luigi.rpc
import luigi.scheduler
import luigi.worker

def run(task, scheduler='127.0.0.1', port=8082):
    """ Thread safe alternative of luigi.build """
    # port_is_open() is a custom helper (not part of Luigi), presumably
    # checking whether a central scheduler is listening on that port
    if port_is_open(port):
        sch = luigi.rpc.RemoteScheduler(url='http://{0}:{1}'.format(scheduler, str(port)))
    else:
        sch = luigi.scheduler.Scheduler()
    w = luigi.worker.Worker(scheduler=sch, no_install_shutdown_handler=True)
    # print worker info (a private attribute) & run
    print(w._worker_info)
    w.add(task)
    w.run()