Scheduling tasks in order / tying pipelines together in production


dy...@growthintel.com

Jul 22, 2016, 12:28:53 PM
to Luigi
Hi all,

I'm running into a situation where I've got a few relatively complex pipelines that are normally standalone, but I'd like to be able to tie together in production. Something like this, where Pipeline is a Task representing the entry point to a DAG. 

import luigi

class MegaPipeline(luigi.WrapperTask):

    def requires(self):
        # normal behavior of the scheduler would be to run these in any
        # order, but I'd like to ensure that A has run before B
        return [PipelineA(), PipelineB()]

Is there a nice way to do this currently? As far as I can tell, my options are:

- add PipelineA() to the requires() method of one of the base tasks of PipelineB (possibly with a boolean parameter to either trigger it or not). This does not feel clean to me.

I'd prefer to just be able to tell the scheduler, "in this case, please execute my requirements in order". 

I hope I'm not missing something obvious here, any and all help much appreciated! 

Dylan 

Dave Buchfuhrer

Jul 22, 2016, 1:22:31 PM
to dy...@growthintel.com, Luigi
The scheduler determines the order of task execution using several parameters. The main three are dependencies, resources, and priority. Dependencies are how you tell the scheduler to run specific tasks in a specific order. If you always want PipelineA to run before PipelineB in production, then PipelineB should depend on PipelineA in production. If you instead just want PipelineA to run preferentially before you get around to running the tasks in PipelineB, you should give the tasks in PipelineA higher priority than those in PipelineB. If you don't care about the order but only want to ensure that tasks aren't run at the same time, resources are the correct tool.

You can combine resources and priority to ensure that the scheduler chooses PipelineA tasks before PipelineB tasks. Simply give each task in PipelineA a single unit of a resource. Give every task in PipelineB 100 units of the same resource. Give the scheduler a 100-unit limit of this resource in production, and a limit of 1 billion in staging. Also, give every task in PipelineA a higher priority than any task in PipelineB. Now both pipelines will run simultaneously in staging, but in production you'll run up to 100 PipelineA tasks at a time until there are no more runnable PipelineA tasks left, then start working on PipelineB tasks.
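
A minimal sketch of that setup (the resource name "pipeline_slot" and the task bodies are illustrative, not anything built into luigi):

import luigi

class SomePipelineATask(luigi.Task):
    priority = 100                      # A tasks are always chosen first
    resources = {'pipeline_slot': 1}    # each A task holds 1 of the 100 units

    def run(self):
        ...

class SomePipelineBTask(luigi.Task):
    priority = 10                       # lower than every A task
    resources = {'pipeline_slot': 100}  # one B task needs all 100 units

    def run(self):
        ...

# In the production scheduler's luigi.cfg (staging would set a much larger
# limit, e.g. 1000000000, so the pipelines can overlap):
#
# [resources]
# pipeline_slot=100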

Note that with the above solution, you will also start working on PipelineB tasks if all PipelineA tasks fail. If you want PipelineA to finish successfully before running PipelineB, that is what a dependency means and you should use a dependency. If this dependency only exists in production, the easiest way to configure it is with a global boolean config parameter that you read in the requires() method of all your PipelineB tasks.
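
For example, a rough sketch of that config-flag approach (the section and flag names here are made up):

import luigi

class chaining(luigi.Config):
    # set chain_pipelines=true under [chaining] in production's luigi.cfg
    chain_pipelines = luigi.BoolParameter(default=False)

class PipelineBRootTask(luigi.Task):
    def requires(self):
        if chaining().chain_pipelines:
            return PipelineA()  # production: explicit dependency on A
        return []               # staging: pipelines stay independent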


Dylan Barth

Jul 22, 2016, 2:12:29 PM
to Dave Buchfuhrer, Luigi
Thanks Dave. In our case we definitely have a dependency relationship; I was just hoping there was a way I could chain entire pipelines together without having them know about one another. Since these feel like distinct groups of tasks, it doesn't feel great to have to add information about PipelineA to PipelineB's requires() methods.

Essentially: you have more than one distinct pipeline/DAG, and you want to chain them together (create a dependency relationship) without modifying the existing code or adding global booleans. Has anyone else run into this? How are you dealing with it?

Dave Buchfuhrer

Jul 22, 2016, 2:25:42 PM
to Dylan Barth, Luigi
You could always just wait to schedule PipelineB until after PipelineA has finished running successfully. There's no reason to hold PipelineB in the scheduler before you're ready for it if you're that upset about explicit dependencies. But if it's critical that PipelineB not run until after PipelineA, the safest way to ensure that is to make the dependency explicit. If the tasks weren't related, you wouldn't be so worried about ensuring that they run in a specific order. Perhaps if you were more concrete about why you need this dependency in production but not in staging we could be more helpful.
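
For instance, something as simple as this (a sketch; luigi.build returning a truthy value on success is what I'd expect from recent luigi versions, so double-check yours):

import luigi

# run PipelineA to completion, and only then hand PipelineB to the scheduler
if luigi.build([PipelineA()], workers=4):
    luigi.build([PipelineB()], workers=4)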

vas

Jul 23, 2016, 9:18:05 AM
to Luigi, dy...@growthintel.com
Hi Dave,

We too have a similar requirement to the one Dylan mentioned. To give brief context, we have 3 pipeline jobs: pipeline A (file parser, starts at 10AM from cron), pipeline B (stats analyzer, depends on A, starts at 11AM from cron) and pipeline C (data archiver, depends on A, starts at 11AM). Due to host and user permission constraints, we need to schedule pipelines A, B and C on different hosts. In short, the 3 pipelines are scheduled from cron on 3 different hosts.

We have the 3 cases below to handle for this kind of dependency. Could you please let us know the possible ways of solving them?

#1. In case pipeline B or C is run by mistake before A has started, it should not invoke A, even though A is specified in the requires() of B and C. We could not use ExternalTask because it seems to mean duplicating worker code to find the Task status.

#2. If pipeline A is not complete, we want pipelines B and C to wait for some time and retry a few times. Since worker-max-reschedules cannot be specified at the Task level, it would be helpful if you can guide us on solving this.

#3. If pipeline A is rerun, we want to rerun its dependent pipelines B and C. It is like running dependent or downstream jobs from the base job. Is there any utility like deps.py, or a RESTful service, to find both upstream and downstream dependencies configured in different modules?

Thanks!

playsted

Jul 23, 2016, 11:30:54 AM
to Luigi, dy...@growthintel.com
Hi
> #1. In case pipeline B or C is run by mistake before A has started, it should not invoke A, even though A is specified in the requires() of B and C. We could not use ExternalTask because it seems to mean duplicating worker code to find the Task status.

Can you expand on why you can't use ExternalTask? This seems like the ideal solution for your problem.
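
For reference, the kind of thing I mean (a sketch; the target path is illustrative and should be whatever PipelineA's final output actually is):

import luigi

class PipelineADone(luigi.ExternalTask):
    # mirrors PipelineA's final output; has no run(), so B's worker
    # can wait on it but can never invoke A
    def output(self):
        return luigi.LocalTarget('/data/pipeline_a/_SUCCESS')

class PipelineBRoot(luigi.Task):
    def requires(self):
        return PipelineADone()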

> #2. If pipeline A is not complete, we want pipelines B and C to wait for some time and retry a few times. Since worker-max-reschedules cannot be specified at the Task level, it would be helpful if you can guide us on solving this.

From what I know, the luigi way of doing this is to simply have the pipeline scheduled every X minutes for a period after 10:00am. If the pipeline is already finished, the reruns are very "cheap" (they only check for the output of the last task).
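
e.g. a crontab sketch along these lines (module and task names made up):

# try PipelineB every 15 minutes from 11:00 to 13:45; once it has
# completed, the extra runs only re-check the final task's output
*/15 11-13 * * * luigi --module stats_pipeline PipelineB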

> #3. If pipeline A is rerun, we want to rerun its dependent pipelines B and C. It is like running dependent or downstream jobs from the base job. Is there any utility like deps.py, or a RESTful service, to find both upstream and downstream dependencies configured in different modules?

luigi isn't very good at reruns (as is). The solution I'm currently using is a script that you can feed a task into. It removes that task's output and the outputs of all of the task's "children", so that when next scheduled, luigi will rerun from that task and all tasks down the tree that depend on it. This is important enough for us that every target we use MUST have a remove() method. It allows us to easily automate reruns from certain stages.
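
A rough sketch of the core of such a script (assuming the target task sits somewhere under a known root task, and that every Target implements remove() as described):

import luigi
from luigi.task import flatten

def _collect_graph(root):
    # map task_id -> (task, set of direct dependency task_ids)
    graph, stack = {}, [root]
    while stack:
        task = stack.pop()
        if task.task_id in graph:
            continue
        deps = flatten(task.requires())
        graph[task.task_id] = (task, {d.task_id for d in deps})
        stack.extend(deps)
    return graph

def remove_downstream(root, target):
    # delete target's output plus the output of every task in root's
    # tree that transitively depends on target, so the next scheduled
    # run redoes that whole subtree
    graph = _collect_graph(root)
    stale, changed = {target.task_id}, True
    while changed:  # fixed point: propagate staleness up the tree
        changed = False
        for tid, (_, deps) in graph.items():
            if tid not in stale and deps & stale:
                stale.add(tid)
                changed = True
    for tid in stale:
        for out in flatten(graph[tid][0].output()):
            if out.exists():
                out.remove()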

In addition, I'm toying with the idea of requiring any task that is not idempotent to include a rollback() method or similar, essentially enabling the "transaction" to be reversed for a rerun.

Dave Buchfuhrer

Jul 23, 2016, 11:51:51 AM
to playsted, Luigi, Dylan Barth
> We could not use ExternalTask because it seems to mean duplicating worker code to find the Task status.

You can call luigi.task.externalize if you want to use the same code as your non-external task. Or you can refactor so the ExternalTask can call the same function.
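
For example, something like this (a sketch; whether externalize takes a class or an instance has varied between luigi versions, so check yours):

import luigi
from luigi.task import externalize

class PipelineBRoot(luigi.Task):
    def requires(self):
        # same code path as PipelineA, but with run() stripped, so this
        # worker will wait on A's outputs without ever executing A
        return externalize(PipelineA())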

