I'm having some trouble getting the Luigi scheduler to work as expected. I'm running multiple workers that work off of the same queue. Each task is a luigi task tree with ~40 nodes. We are using Postgres for output targets. When two workers start working on the same task tree, I sometimes see the same task being run twice in serial by different workers. My understanding of Luigi is that the each task will only be run if task.output().exists() is False.
The last line of each of our Luigi tasks is task.output().touch(). Since the Postgres Target uses autocommit, these changes should be committed immediately, and any subsequent task should be able to see that the row exists. However, we occasionally see an IntegrityError when the task runs for a second time and tries to write its output target again.
Here some (slightly redacted) luigi logging.
Scheduled 11 tasks of which:
* 5 present dependencies were encountered:
- 1 TaskE(id=8, subtask_id=23)
- 1 TaskF(id=8, subtask_id=41)
- 1 TaskG(id=8, subtask_id=23)
- 1 TaskH(id=8, subtask_id=41)
- 1 StrategyCampaignsInitTask(id=8)
* 4 ran successfully:
- 1 TaskD(id=8, subtask_id=41)
- 1 RepeatedTask(id=8, subtask_id=41)
- 1 TaskC(id=8, subtask_id=41)
- 1 TaskB(id=8, subtask_id=41)
* 2 were left pending, among these:
* 1 were missing external dependencies:
- 1 RootTask(id=8)
* 2 were being run by another worker:
- 1 RepeatedTask(id=8, subtask_id=41)
- 1 RootTask(id=8)
Notice "RepeatedTask(id=8, subtask_id=41)" is in both the "ran successfully" and "being run by another worker" sections.
Any ideas why this might be? Do I have a fundamental misunderstanding of how Luigi is supposed to work?
Happy to provide more info if needed.
Thanks for the help in advance!
Wesley
--
You received this message because you are subscribed to the Google Groups "Luigi" group.
To unsubscribe from this group and stop receiving emails from it, send an email to luigi-user+unsubscribe@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
To unsubscribe from this group and stop receiving emails from it, send an email to luigi-user+...@googlegroups.com.
Another thing. Running the same task shouldn't be a problem unless it happens wastefully often. But if it happens quite often it shows that something is wrong, and we don't really know how luigi is operating.
Good luck debugging!
Cheers,
Arash
To unsubscribe from this group and stop receiving emails from it, send an email to luigi-user+unsubscribe@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
--
You received this message because you are subscribed to the Google Groups "Luigi" group.
To unsubscribe from this group and stop receiving emails from it, send an email to luigi-user+unsubscribe@googlegroups.com.
To unsubscribe from this group and stop receiving emails from it, send an email to luigi-user+...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
--
You received this message because you are subscribed to the Google Groups "Luigi" group.
To unsubscribe from this group and stop receiving emails from it, send an email to luigi-user+...@googlegroups.com.
ERROR: [pid ...] Worker Worker(...) failed Task(...)
Traceback (most recent call last):
File "/opt/conda/lib/python3.6/site-packages/luigi/worker.py", line 205, in run
new_deps = self._run_get_new_deps()
File "/opt/conda/lib/python3.6/site-packages/luigi/worker.py", line 142, in _run_get_new_deps
task_gen = self.task.run()
File "/opt/conda/lib/python3.6/site-packages/luigi/contrib/postgres.py", line 338, in run
self.output().touch(connection)
File "/opt/conda/lib/python3.6/site-packages/luigi/contrib/postgres.py", line 156, in touch
(self.update_id, self.table))
psycopg2.IntegrityError: duplicate key value violates unique constraint "table_updates_pkey"
DETAIL: Key (update_id)=(Task....) already exists.