Getting Luigi Tasks to Run Concurrently


banvill...@gmail.com

Feb 7, 2017, 1:52:20 PM
to Luigi

Hi,

I am new to Luigi and trying to find my way around. I may be missing something on this but I haven't found the answer yet.

Problem:

Luigi seems to be running one task at a time even though many are pending and have no dependencies.

I am running "luigid" and there is an example below with code and command line.

Questions:

* How do I get multiple tasks to run concurrently (assuming there are no dependencies, or the dependencies are satisfied)? In this case, an "unlimited" number of tasks (OK, a lot)?

* How do I limit the number of concurrent tasks?


Thanks in advance for any help.

Here is example code which can launch a number of concurrent sleeping tasks to help illustrate what is happening.

Command Line: (luigid is already running)

python concurrent_task_test.py task_launch_sleepers --sleep-time 40 --sleeper-count 4

import luigi
import time

class task_sleeping(luigi.Task):
    sleep_time = luigi.IntParameter()
    task_number = luigi.IntParameter()

    def requires(self):
        return []

    def output(self):
        return luigi.LocalTarget("done_sleeping_" + str(self.task_number) + ".txt")

    def run(self):
        time.sleep(self.sleep_time)
        with self.output().open('w') as ofh:
            ofh.write("Done sleeping.")

class task_launch_sleepers(luigi.Task):
    sleep_time = luigi.IntParameter()
    sleeper_count = luigi.IntParameter()

    def requires(self):
        sleeper_list = []
        for sidx in range(self.sleeper_count):
            sleeper_list.append(
                task_sleeping(sleep_time = self.sleep_time,
                              task_number = sidx))
        return sleeper_list

    def output(self):
        return luigi.LocalTarget("done_task_launch_sleepers.txt")

    def run(self):
        pass

if __name__ == '__main__':
    luigi.run()

banvill...@gmail.com

Feb 7, 2017, 8:58:53 PM
to Luigi, banvill...@gmail.com

I did find I could use "--workers 30" to set the number of concurrent workers to 30, as in:

python concurrent_task_test.py task_launch_sleepers --sleep-time 40 --sleeper-count 4 --workers 30

* Is there a way to launch "luigid" with a default number of workers, either unlimited or limited?
* Can certain task types have a smaller limit (e.g. the max may be 30 in general, but 5 for a specified task type)?

Thanks.

Dave Buchfuhrer

Feb 7, 2017, 10:31:27 PM
to banvill...@gmail.com, Luigi
You can set default parameter values as described in http://luigi.readthedocs.io/en/stable/configuration.html. You can limit how many of a specific type of task can run at the same time by setting resources in the task and setting a resource limit in the scheduler.


--
You received this message because you are subscribed to the Google Groups "Luigi" group.
To unsubscribe from this group and stop receiving emails from it, send an email to luigi-user+unsubscribe@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

banvill...@gmail.com

Feb 8, 2017, 2:05:13 PM
to Luigi, banvill...@gmail.com
Thanks for your response.

I had looked at the document you provided the link to. I see that it talks about behavior around workers but it didn't appear to discuss how to set the number of workers; this includes the number of workers per task type. That being said, I will try a few things to see what works, for example:

[resources]
workers=30

The above may work but I have to try it out. I am not clear yet on the specifics of limiting by Task.

When I do figure this out, I would like to contribute an example to the docs showing how to do this.

Chris Palmer

Feb 8, 2017, 2:58:13 PM
to banvill...@gmail.com, Luigi
I find there is some clarity lacking with regard to workers in Luigi. The way I've understood it is that there are Workers (with a capital W), which are the individual invocations of the luigi command line. These connect to the central scheduler, and they are what you see on the Workers tab in the scheduler web interface.

Each Worker can then have any number of workers (lower case w) which are the individual processes spun up by the Worker to run tasks. I don't believe the scheduler has any restrictions on how many Workers can connect to it at a given time, nor does it restrict how many workers each Worker has. Since the task execution typically gets done in the environment of the Worker, it's left up to them to know what system resources are available and therefore how many processes can be run concurrently.

I think you can specify the number of workers in the configuration files for your Worker, but we usually just specify it on the command line with '--workers 5'.
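
A minimal sketch of the configuration-file route, assuming the `workers` setting is read from the `[core]` section of the luigi.cfg used by the Worker (the section name is an assumption, not something confirmed in this thread; check it against the configuration docs for your Luigi version):

```
[core]
# Assumed equivalent of passing '--workers 5' on the command line
workers: 5
```

A value given on the command line should still take precedence over this default.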


On the other hand, resources are used by the central scheduler to control task concurrency across multiple Workers. You can define the resources used by a given task by adding a 'resources' attribute to your class. For example:

```
import luigi

class TaskWithResources(luigi.Task):

    start_date = luigi.DateParameter()

    resources = {
        'foobar': 1,
        'bizbaz': 2,
    }

```

That tells the scheduler that instances of the TaskWithResources class require 1 foobar resource and 2 bizbaz resources in order to run.

When a Worker asks the scheduler for a task to run, the scheduler will only give it an instance of TaskWithResources if the necessary resources are available.

You can specify the total available resources in the configuration for your scheduler. For example:

```
[resources]
foobar: 3
bizbaz: 4
```

In that case, the scheduler would give out at most 2 instances of TaskWithResources at a time across all Workers connected to it, because each instance needs 2 of the 4 bizbaz resources. With both running, one foobar resource is still free, so if there was some other task class that only required a single foobar resource, one instance of that class would also be given out by the scheduler.

It's also worth noting that if a task class defines a resource requirement that isn't expressly defined in the scheduler configuration, the scheduler assumes there is a single resource available.
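
The arithmetic behind those limits can be sketched in plain Python. This is only a simplified model of the bookkeeping described above, not Luigi's scheduler code; the helper names `max_concurrent` and `remaining` are hypothetical and exist only for this illustration.

```python
def max_concurrent(task_resources, limits):
    """Return how many instances of one task class fit within the limits.

    A resource that is missing from `limits` is treated as having a single
    unit available, mirroring the default described above. Returns None if
    the task declares no resources (so resources do not constrain it).
    """
    return min(
        (limits.get(name, 1) // needed
         for name, needed in task_resources.items() if needed > 0),
        default=None,
    )


def remaining(limits, task_resources, instances):
    """Return what is left of each configured resource while `instances` run."""
    return {
        name: available - task_resources.get(name, 0) * instances
        for name, available in limits.items()
    }


limits = {'foobar': 3, 'bizbaz': 4}            # the [resources] section above
with_resources = {'foobar': 1, 'bizbaz': 2}    # TaskWithResources.resources

running = max_concurrent(with_resources, limits)
print(running)                                  # 2, bizbaz is the bottleneck

left = remaining(limits, with_resources, running)
print(left)                                     # {'foobar': 1, 'bizbaz': 0}

# Another (hypothetical) task class needing one foobar still fits once.
print(max_concurrent({'foobar': 1}, left))      # 1

# A resource the scheduler configuration never mentions defaults to 1 unit.
print(max_concurrent({'unlisted': 1}, limits))  # 1
```

The real scheduler tracks this per running task across all connected Workers, but the limiting factor is the same: the scarcest resource relative to what each task asks for.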


Exactly what combination of resource constraints and limits on workers (lower case w) you want to use will depend on why you need those restrictions.

Hope that helps you figure out your situation.

Chris

banvill...@gmail.com

Feb 8, 2017, 4:22:35 PM
to Luigi, banvill...@gmail.com
Thanks for your response, that really helps clear some things up! I will try playing around with this to confirm it works the way I expect.
