Luigi Tasks Running Multiple Times

wesle...@adroll.com

Nov 4, 2016, 5:11:59 PM
to Luigi
Hi,

I'm having some trouble getting the Luigi scheduler to work as expected. I'm running multiple workers that work off of the same queue. Each job is a Luigi task tree with ~40 nodes. We are using Postgres for output targets. When two workers start working on the same task tree, I sometimes see the same task being run twice in serial by different workers. My understanding of Luigi is that each task will only be run if task.output().exists() is False.

The last line of each of our Luigi tasks is task.output().touch(). Since the Postgres target uses autocommit, these changes should be committed immediately, and any subsequent task should be able to see that the row exists. However, we occasionally see an IntegrityError when the task runs a second time and tries to write its output target again.
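
For concreteness, each of our tasks looks roughly like the sketch below (simplified; the helper, connection details and update_id scheme are illustrative rather than our exact code):

import luigi
from luigi.postgres import PostgresTarget


def do_the_actual_work(strategy_id, campaign_id):
    # Stand-in for the real work each task performs.
    pass


class TaskA(luigi.Task):
    strategy_id = luigi.IntParameter()
    strategy_campaign_id = luigi.IntParameter()

    def output(self):
        # exists() and touch() both key on update_id; the marker row lives in
        # our marker table (the one with the luigi_targets_pkey constraint).
        return PostgresTarget(
            host='db-host', database='mydb', user='luigi', password='secret',
            table='strategies',
            update_id='%s_%s_task_a' % (self.strategy_id, self.strategy_campaign_id),
        )

    def run(self):
        do_the_actual_work(self.strategy_id, self.strategy_campaign_id)
        # Last line of every task: write the marker row. PostgresTarget uses
        # autocommit, so the row should be visible to other workers immediately.
        self.output().touch()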

Here's some (slightly redacted) Luigi logging:

Scheduled 11 tasks of which:
* 5 present dependencies were encountered:
- 1 TaskE(id=8, subtask_id=23)
- 1 TaskF(id=8, subtask_id=41)
- 1 TaskG(id=8, subtask_id=23)
- 1 TaskH(id=8, subtask_id=41)
- 1 StrategyCampaignsInitTask(id=8)
* 4 ran successfully:
- 1 TaskD(id=8, subtask_id=41)
- 1 RepeatedTask(id=8, subtask_id=41)
- 1 TaskC(id=8, subtask_id=41)
- 1 TaskB(id=8, subtask_id=41)
* 2 were left pending, among these:
* 1 were missing external dependencies:
- 1 RootTask(id=8)
* 2 were being run by another worker:
- 1 RepeatedTask(id=8, subtask_id=41)
- 1 RootTask(id=8)

Notice "RepeatedTask(id=8, subtask_id=41)" is in both the "ran successfully" and "being run by another worker" sections.

Any ideas why this might be? Do I have a fundamental misunderstanding of how Luigi is supposed to work?

Happy to provide more info if needed.

Thanks for the help in advance!

Wesley

Arash Rouhani Kalleh

Nov 6, 2016, 9:18:28 PM
to wesle...@adroll.com, Luigi
Hi Wesley,

Interesting debugging. Can you verify that the case isn't something like this?

1. Worker X starts task MyTask()
2. Worker X gets quite far into MyTask(), even does the commit, and now MyTask().output().exists() == True
3. The MyTask() run within Worker X crashes (for example, raises IntegrityError or whatever), hence MyTask() is marked as FAILED
4. The central scheduler happily assigns MyTask() to any other Worker Y since, indeed, MyTask() is not DONE.
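
To make steps 2-4 concrete, a task shaped like this (purely hypothetical names and connection details) would show exactly that behaviour:

import luigi
from luigi.postgres import PostgresTarget


class MyTask(luigi.Task):

    def output(self):
        # Hypothetical target; exists() and touch() both key on update_id.
        return PostgresTarget('db-host', 'mydb', 'luigi', 'secret',
                              table='some_table', update_id='my_task_1')

    def run(self):
        # ... the real work happens here and is committed (autocommit) ...
        self.output().touch()        # marker row committed: output().exists() is now True
        raise RuntimeError('boom')   # any failure after this point marks the task
                                     # FAILED even though its output already exists

When the scheduler later hands MyTask() to Worker Y, it would be the second run's touch() that raises the duplicate-key error, because the marker row from the first, half-failed run is already committed.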

Cheers,
Arash



Wesley Chow

Nov 7, 2016, 2:51:20 PM
to Arash Rouhani Kalleh, Luigi
Hi Arash,

Thanks for the response.

I can confirm the task runs successfully, and yet the scheduler seems to allow it to be run again. (I bolded a few lines which I think provide evidence of this.) In fact, these are logs from a single worker, although I'm seeing the same integrity error on multiple other workers.

INFO: [pid 1] Worker Worker(salt=118275973, workers=1, host=create_strategies7, username=root, pid=1) done      TaskA(strategy_id=83, strategy_campaign_id=158)
INFO:luigi-interface:[pid 1] Worker Worker(salt=118275973, workers=1, host=create_strategies7, username=root, pid=1) done      TaskA(strategy_id=83, strategy_campaign_id=158)
DEBUG:luigi-interface:1 running tasks, waiting for next task to finish
INFO:luigi-interface:Informed scheduler that task   TaskA_158_83_d7ac324e85   has status   DONE
DEBUG:luigi-interface:Asking scheduler for work...
DEBUG:luigi-interface:Pending tasks: 5
INFO:luigi-interface:[pid 1] Worker Worker(salt=118275973, workers=1, host=create_strategies7, username=root, pid=1) running   TaskB(strategy_id=83, strategy_campaign_id=158)
INFO:luigi-interface:[pid 1] Worker Worker(salt=118275973, workers=1, host=create_strategies7, username=root, pid=1) done      TaskB(strategy_id=83, strategy_campaign_id=158)
DEBUG:luigi-interface:1 running tasks, waiting for next task to finish
INFO:luigi-interface:Informed scheduler that task   TaskB_158_83_d7ac324e85   has status   DONE
DEBUG:luigi-interface:Asking scheduler for work...
DEBUG:luigi-interface:Pending tasks: 5
INFO:luigi-interface:[pid 1] Worker Worker(salt=118275973, workers=1, host=create_strategies7, username=root, pid=1) running   TaskA(strategy_id=83, strategy_campaign_id=158)
ERROR: [pid 1] Worker Worker(salt=118275973, workers=1, host=create_strategies7, username=root, pid=1) failed    TaskA(strategy_id=83, strategy_campaign_id=158)
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/luigi/worker.py", line 182, in run
    new_deps = self._run_get_new_deps()
  File "/usr/local/lib/python2.7/dist-packages/luigi/worker.py", line 120, in _run_get_new_deps
    task_gen = self.task.run()
  File "/home/docker/code/tasks/luigi/create_strategies/facebook.py", line 28, in run
    luigi_write_output_to_target(self, str(self.strategy_campaign_id))
  File "/home/docker/code/tasks/luigi/utils.py", line 34, in luigi_write_output_to_target

  File "/usr/local/lib/python2.7/dist-packages/luigi/postgres.py", line 156, in touch
    (self.update_id, self.table))
IntegrityError: duplicate key value violates unique constraint "luigi_targets_pkey"
DETAIL:  Key (update_id)=(83_158_task_a) already exists.

-Wesley


Arash Rouhani Kalleh

Nov 7, 2016, 8:58:33 PM
to Wesley Chow, Luigi
Hmm. Other explanations that come to mind are:

  • Some bug in Luigi.
  • The other worker checks its completeness and that check returns False.
  • There's a race condition too: the worker in the above scenario could check just before the task is complete. If you did a looong sleep just before returning from your complete() check, this would happen often (see the sketch after this list), but in normal scenarios it should not be common.
  • Scheduler hiccups, like restarts or something. More realistically, there could be exceptions being thrown from the scheduler; can you do a quick scan of the logs for exceptions?
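
For the race in that third point, here's a contrived sketch (purely hypothetical, not something you'd ever ship) that makes the window obvious. With a completeness check this slow, another worker can easily be handed the task while the first check is still sleeping on a stale answer:

import time

import luigi


class RacyTask(luigi.Task):

    def output(self):
        return luigi.LocalTarget('/tmp/racy_task.marker')  # placeholder target

    def complete(self):
        done = self.output().exists()
        # Exaggerate the gap between checking completeness and acting on it.
        # If the task finishes on another worker during this sleep, the stale
        # answer returned below leads to the task being scheduled and run again.
        time.sleep(60)
        return done

    def run(self):
        with self.output().open('w') as f:
            f.write('done')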

Another thing: running the same task twice shouldn't be a problem unless it happens wastefully often. But if it does happen quite often, that indicates something is wrong and we don't really understand how Luigi is operating.

Good luck debugging!

Cheers,

Arash



Wesley Chow

Nov 8, 2016, 2:19:27 PM
to Arash Rouhani Kalleh, Luigi
Ah, that's unfortunate.

I'll try to do some debugging myself, but I'm a bit swamped :(

I'll update the group if/when I find something.

Thanks for the help!


William Grisaitis

Aug 7, 2018, 2:31:53 PM
to Luigi
Sorry to resurrect this, but I'm having the same issue: the same CopyToTable task is being assigned to two workers near-simultaneously. This then causes the error the OP describes:

ERROR: [pid ...] Worker Worker(...) failed    Task(...)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/luigi/worker.py", line 205, in run
    new_deps = self._run_get_new_deps()
  File "/opt/conda/lib/python3.6/site-packages/luigi/worker.py", line 142, in _run_get_new_deps
    task_gen = self.task.run()
  File "/opt/conda/lib/python3.6/site-packages/luigi/contrib/postgres.py", line 338, in run
    self.output().touch(connection)
  File "/opt/conda/lib/python3.6/site-packages/luigi/contrib/postgres.py", line 156, in touch
    (self.update_id, self.table))
psycopg2.IntegrityError: duplicate key value violates unique constraint "table_updates_pkey"
DETAIL:  Key (update_id)=(Task....) already exists.

Why is the scheduler assigning the same task to more than one worker? 