Run luigi without multiprocessing?


Dan Garthwaite

Nov 9, 2015, 8:58:12 AM
to Luigi
Is there a way to tell luigi to run single threaded?

I'm about to rip luigi out of a CSV -> database loader I've been working on.  No fault of luigi, but the plan was to run this periodically via AWS Lambda, and Lambda apparently doesn't mount /dev/shm, which multiprocessing requires.

  -dan

Uldis Barbans

Nov 9, 2015, 9:28:56 AM
to Dan Garthwaite, Luigi
I think if you don't specify --workers then it doesn't use multiprocessing (it runs single threaded)... I haven't tested whether it still requires /dev/shm in that case; if it does, making it not require /dev/shm would be a good PR.

--
You received this message because you are subscribed to the Google Groups "Luigi" group.
To unsubscribe from this group and stop receiving emails from it, send an email to luigi-user+...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.



--
Uldis "ulzha" Barbans - Software Engineer, Data Infrastructure - Spotify AB

Dan Garthwaite

Nov 9, 2015, 9:41:42 AM
to Luigi, d...@garthwaite.org
I'm not specifying workers.  I tried parallel-scheduling = False and that didn't work, either.

I'm leveraging luigi for resume-ability and dynamic dependencies - but I only ever need one Task running at any time.

Dan Garthwaite

Nov 9, 2015, 10:15:53 AM
to Luigi
This is easy to reproduce - just umount /dev/shm.

Presumably multiprocessing is supposed to be disabled by default via the parallel_scheduling option:
https://luigi.readthedocs.org/en/stable/_modules/luigi/interface.html#core

I'm trying to step through execution to see where to add a check for parallel_scheduling.

Erik Bernhardsson

Nov 9, 2015, 10:58:58 AM
to Dan Garthwaite, Luigi
Luigi is single threaded / single process by default.

Not sure why you are seeing multiple processes – what is the error? I know multiprocessing does some stuff in the global scope, so maybe the problem is just the import.


Dan Garthwaite

Nov 9, 2015, 11:16:49 AM
to Erik Bernhardsson, Luigi
The assumption of multiprocessing.Queue() being available seems to go back to the second tag on GitHub:
https://github.com/spotify/luigi/blob/v1.0.18/luigi/worker.py

I've been playing with overriding DequeQueue's get and put methods to ignore timeout=, with no luck.

Here is the traceback:

Traceback (most recent call last):
  File "etl.py", line 231, in <module>
    start(events, None)
  File "etl.py", line 223, in start
    luigi.run(main_task_cls=Ingest, cmdline_args=args)
  File "vendor/luigi/interface.py", line 207, in run
    return _run(*args, **kwargs)['success']
  File "vendor/luigi/interface.py", line 235, in _run
    return _schedule_and_run([cp.get_task_obj()], worker_scheduler_factory)
  File "vendor/luigi/interface.py", line 185, in _schedule_and_run
    scheduler=sch, worker_processes=env_params.workers, assistant=env_params.assistant)
  File "vendor/luigi/interface.py", line 138, in create_worker
    scheduler=scheduler, worker_processes=worker_processes, assistant=assistant)
  File "vendor/luigi/worker.py", line 371, in __init__
    self._task_result_queue = multiprocessing.Queue()
  File "/usr/lib/python2.7/multiprocessing/__init__.py", line 218, in Queue
    return Queue(maxsize)
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 63, in __init__
    self._rlock = Lock()
  File "/usr/lib/python2.7/multiprocessing/synchronize.py", line 147, in __init__
    SemLock.__init__(self, SEMAPHORE, 1, 1)
  File "/usr/lib/python2.7/multiprocessing/synchronize.py", line 75, in __init__
    sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue)
OSError: [Errno 13] Permission denied
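For context, the kind of deque-backed stand-in being described could look roughly like this (an illustrative sketch, not Luigi's actual DequeQueue; the point is that put/get accept and swallow the block=/timeout= arguments the worker passes, so no /dev/shm-backed semaphore is ever created):

```python
import collections


class DequeQueue(collections.deque):
    """Single-process stand-in for multiprocessing.Queue.

    Accepts and ignores the block=/timeout= keywords that the worker
    passes, so no SemLock (and hence no /dev/shm) is needed.
    """

    def put(self, obj, block=True, timeout=None):
        # No lock needed when only one process touches the queue.
        self.append(obj)

    def get(self, block=True, timeout=None):
        # Pop in FIFO order, matching multiprocessing.Queue semantics.
        return self.popleft()
```

With a single worker the queue is only ever touched from the main process, so plain deque operations are enough.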

Dan Garthwaite

Nov 9, 2015, 11:34:19 AM
to Erik Bernhardsson, Luigi
Reproduction of the issue:

$ sudo umount /dev/shm

$ python -c "
import luigi
class HelloWorld(luigi.Task):
    def run(self):
        with self.output().open('w') as out:
            out.write('Hello world\n')

    def output(self):
        return luigi.LocalTarget('/tmp/hello_world.txt')

if __name__ == '__main__':
    luigi.run(main_task_cls=HelloWorld)

"
Traceback (most recent call last):
  File "<string>", line 11, in <module>
  File "/vagrant/lambda/ee_etl/vendor/luigi/interface.py", line 207, in run
    return _run(*args, **kwargs)['success']
  File "/vagrant/lambda/ee_etl/vendor/luigi/interface.py", line 235, in _run
    return _schedule_and_run([cp.get_task_obj()], worker_scheduler_factory)
  File "/vagrant/lambda/ee_etl/vendor/luigi/interface.py", line 185, in _schedule_and_run
    scheduler=sch, worker_processes=env_params.workers, assistant=env_params.assistant)
  File "/vagrant/lambda/ee_etl/vendor/luigi/interface.py", line 138, in create_worker
    scheduler=scheduler, worker_processes=worker_processes, assistant=assistant)
  File "/vagrant/lambda/ee_etl/vendor/luigi/worker.py", line 369, in __init__

Erik Bernhardsson

Nov 9, 2015, 11:47:11 AM
to Dan Garthwaite, Luigi
Sure, we could probably swap out multiprocessing.Queue for some other queue (as you mentioned) if the number of workers is 1.

I'm pretty sure there's no forking with only 1 worker, but I could be wrong.
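One possible shape for that swap (a sketch only: the factory name and the `worker_processes` parameter are made up for illustration, not Luigi's real code; the single-worker path falls back to a thread-safe queue that allocates no /dev/shm-backed SemLock):

```python
import multiprocessing
import queue  # the stdlib thread-safe queue; "Queue" on the Python 2.7 in the traceback


def make_result_queue(worker_processes):
    """Return a multiprocessing.Queue only when tasks actually fork into
    child processes; with one worker a plain thread-safe queue suffices
    and never touches /dev/shm."""
    if worker_processes > 1:
        return multiprocessing.Queue()
    return queue.Queue()
```

Calling `make_result_queue(1)` on a box with /dev/shm unmounted would then avoid the `OSError: [Errno 13] Permission denied` shown above.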


Dan Garthwaite

Nov 9, 2015, 5:18:10 PM
to Luigi, d...@garthwaite.org