Easy way to turn *off* certain jobs?


Jake Biesinger

Feb 11, 2011, 1:42:49 PM2/11/11
to ruffus_...@googlegroups.com
Hi!

In developing a pipeline for ChIP-Seq analysis using Ruffus, I've run into a bit of a snag.  I'd like to be able to manually turn off some portions of the pipeline.  The setup is something like:

          MAP READS
         /         \
     MACS           GLITR
         \         /
   Visualize, GO analysis, etc

My decorator signature for these functions is something like:

@jobs_limit(config.max_throttled_jobs, 'throttled')
@collate(bowtie_to_bed, regex(r'(.+)\.treat\.mapped_reads'),
         add_inputs(r'\1.control.mapped_reads'), r'\1.treat.glitr.ranges')
def run_glitr(in_files, out_peaks):
    pass  # RUN GLITR PEAK CALLER

@jobs_limit(config.max_throttled_jobs, 'throttled')
@collate(bowtie_to_bed, regex(r'(.+)\.treat\.mapped_reads'),
         add_inputs(r'\1.control.mapped_reads'), r'\1.treat.macs.peaks')
def run_macs(in_files, out_peaks):
    pass  # RUN MACS PEAK CALLER

The problem is that it's not always appropriate to run the GLITR peak caller.  Without commenting out chunks of the pipeline, how would I turn off the GLITR peak caller and exclude its output from the downstream steps that are held in common with the MACS peak caller?  It would be nice if I could say:

@conditional(config.run_GLITR)  # either a bool or a function returning a bool
...
def run_glitr(in_files, out_peaks):
    # RUN GLITR PEAK CALLER


The other way I can conceive of achieving this would be to have some strange variable regex in the collate mapping, that is:

if config.run_GLITR:
    glitr_regex = r'(.+)\.treat\.mapped_reads'
else:
    glitr_regex = 'THIS WILL NEVER MATCH ANYTHING'
@collate(bowtie_to_bed, regex(glitr_regex),
         add_inputs(r'\1.control.mapped_reads'), r'\1.treat.glitr.ranges')
def run_glitr(in_files, out_peaks):
    pass  # RUN GLITR PEAK CALLER
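The never-match trick works because the placeholder string simply fails to match any input filename, so no jobs are generated. A standalone plain-`re` sanity check of the idea (the file names here are made up):

```python
import re

# Hypothetical input files for a treatment/control pair.
files = ["sampleA.treat.mapped_reads", "sampleA.control.mapped_reads"]

glitr_regex = r"(.+)\.treat\.mapped_reads"    # active: matches treatment files
off_regex = "THIS WILL NEVER MATCH ANYTHING"  # "off": matches nothing

# Only the treatment file survives the active regex...
print([f for f in files if re.match(glitr_regex, f)])  # -> ['sampleA.treat.mapped_reads']
# ...and nothing survives the never-match regex, so the task gets no jobs.
print([f for f in files if re.match(off_regex, f)])    # -> []
```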

Any advice on this?  Am I missing some Ruffus magic that would handle this for me?

Leo Goodstadt

Feb 12, 2011, 8:11:40 PM2/12/11
to ruffus_...@googlegroups.com
The way to turn it off would be to remove its downstream dependencies, I suppose. Would that make sense?
I could add an extra keyword @inactive or @active, but only if it would be of general use, and I can't see that at the moment. 
What about a pipeline_remove_task function?
Leo

--
Dr. Leo Goodstadt
University of Oxford
United Kingdom

Jacob Biesinger

Feb 12, 2011, 10:59:47 PM2/12/11
to ruffus_...@googlegroups.com
On Sat, Feb 12, 2011 at 5:11 PM, Leo Goodstadt <bunb...@gmail.com> wrote:
> The way to turn it off would be to remove its downstream dependencies, I suppose. Would that make sense?
> I could add an extra keyword @inactive or @active, but only if it would be of general use, and I can't see that at the moment.

Since the downstream dependencies are shared between active and inactive steps, removing the downstream parts isn't an option.
          MAP READS
         /         \
     MACS           GLITR   # Turn off GLITR, but keep MACS going along with all downstream targets
         \         /
   Visualize, GO analysis, etc

 I'd like the pipeline step to be *conditionally active*, that is, without modification to the pipeline code, I'd like to turn on/off certain parts. The regex("NOT A MATCH") seems to be the best option, but only works for @collate functions.  

> What about a pipeline_remove_task function?

This would be okay, though I think that having the condition close to the function would make it more readable: 

@conditional_run(config.run_GLITR)  # either a bool or a function returning a bool
@split(...)
def do_stuff(...)
 

Leo Goodstadt

Feb 13, 2011, 6:04:48 AM2/13/11
to ruffus_...@googlegroups.com
The @active or @inactive keyword was intended exactly in the sense of @conditional_run.
The major point about adding decorators isn't the changes required but making sure that
the number of decorators doesn't spiral out of control: keywords are a precious resource!

How about

1) Let me ask around to see if anybody objects to having such a facility, has other opinions on it,
or is perhaps desperate for such a feature.


2) We try to come up with a really good name. Something more immediately obvious than 
@conditional_run (This is my subjective judgement: I am open to persuasion.)
Suggestions include
  @active_if 
  @include_task_if
  @run_if

3) If we implement this, let us not export this keyword automatically. So you would
need to write either

from ruffus import conditional_run

or 

import ruffus
rf = ruffus
@rf.conditional_run(config.some_boolean_func())
@split(...)
def do_stuff(...)

This greatly reduces the cost of the changes.
What do you think?

Leo 

Jacob Biesinger

Feb 13, 2011, 12:45:40 PM2/13/11
to ruffus_...@googlegroups.com
On Sun, Feb 13, 2011 at 3:04 AM, Leo Goodstadt <bunb...@gmail.com> wrote:
> The @active or @inactive keyword was intended exactly in the sense of @conditional_run.
> The major point about adding decorators isn't the changes required but making sure that
> the number of decorators doesn't spiral out of control: keywords are a precious resource!

Yes, but having two keywords would force me to actually change the pipeline code itself, which to me is more costly than updating a config file for a particular dataset.
 
> How about
>
> 1) Let me ask around to see if anybody objects to having such a facility, has other opinions on it,
> or is perhaps desperate for such a feature.

Of course!

> 2) We try to come up with a really good name. Something more immediately obvious than
> @conditional_run (This is my subjective judgement: I am open to persuasion.)
> Suggestions include
>   @active_if

+1 for active_if -- much better than my conditional_run
 
>   @include_task_if
>   @run_if

These both sound like they modify which targets you're selecting rather than what upstream functions will be run IMHO.

 

> 3) If we implement this, let us not export this keyword automatically. So you would
> need to write either
>
> from ruffus import conditional_run

This would be fine -- I never do a 'from ruffus import *' anyway, as I like to know where those functions and decorators are coming from. Of course, my import statement is kinda ugly as a result:

from ruffus import (transform, follows, collate, files, split,
                    add_inputs, regex, suffix, mkdir, jobs_limit)
 
> import ruffus
> rf = ruffus

Which could also be 'import ruffus as rf'
 
> This greatly reduces the cost of the changes.
> What do you think?

Sounds great!

Leo Goodstadt

Feb 13, 2011, 1:00:08 PM2/13/11
to ruffus_...@googlegroups.com

>>   @active_if 
>
>
>  + 1 for active_if -- much better than my conditional_run

Agreed

>
>  
>>
>>
>> 3) If we implement this, let us not export this keyword automatically.
>

> this would be fine-- I never do a 'from ruffus import *' anyway as I like to know where those functions and decorators are coming from.

Agreed. That is what I do as well. But other people do
from ruffus import *
which is, I guess, my fault for the examples in the docs.

So I was talking nonsense. What I actually meant was having an *additional* nested name space within ruffus for less mainstream or more experimental decorators or syntax.
So it would have to be
from ruffus import XXX.active_if

(Is this the right python syntax?)

Suggestions for XXX are also welcome.

Names turn out to be the second most painful part of changing ruffus. Writing docs is a whole other level of torture...

I am on the road so this will have to await my return next week.

Leo

Bernie Pope

Feb 13, 2011, 6:26:42 PM2/13/11
to ruffus_...@googlegroups.com
On 13/02/2011, at 10:04 PM, Leo Goodstadt wrote:

> 2) We try to come up with a really good name. Something more immediately obvious than
> @conditional_run (This is my subjective judgement: I am open to persuasion.)
> Suggestions include
> @active_if
> @include_task_if
> @run_if

How about:

@when

(which was inspired by the monadic function in Haskell of the same name).

Cheers,
Bernie.

Leo Goodstadt

Feb 14, 2011, 8:46:47 AM2/14/11
to ruffus_...@googlegroups.com
@when would be among my first choices, but aren't such simple names a liability in terms of clashes with variable names?
I guess Haskell should also be familiar to most python programmers... if only!
Leo

Jacob Biesinger

Feb 14, 2011, 2:54:10 PM2/14/11
to ruffus_...@googlegroups.com
On Mon, Feb 14, 2011 at 5:46 AM, Leo Goodstadt <bunb...@gmail.com> wrote:
> @when would be among my first choices, but aren't such simple names a liability in terms of clashes with variable names?

I agree generally, though it's not an issue in my scripts.   

Jacob Biesinger

Feb 14, 2011, 3:04:21 PM2/14/11
to ruffus_...@googlegroups.com

> So I was talking nonsense. What I actually meant was having an *additional* nested name space within ruffus for less mainstream or more experimental decorators or syntax.
>
> So it would have to be
> from ruffus import XXX.active_if
>
> (Is this the right python syntax?)

I think the right syntax would be:

from ruffus.XXX import active_if 

> Suggestions for XXX are also welcome.

ruffus.extras
ruffus.extra_decorators  # will there only be decorators in there?
ruffus.experimental  # implies may not work properly
ruffus.future  # implies these will become mainstream

Leo Goodstadt

Feb 15, 2011, 1:43:37 AM2/15/11
to ruffus_...@googlegroups.com
Thanks. This is unbelievably helpful!
Leo

Bernard Pope

Feb 15, 2011, 10:43:22 PM2/15/11
to ruffus_...@googlegroups.com
If there is a problem then qualified names will always fix it.

Cheers,
Bernie.

Leo Goodstadt

Feb 15, 2011, 10:56:28 PM2/15/11
to ruffus_...@googlegroups.com
All problems in python can be solved by another level in the namespace?
[apologies to David Wheeler]
:)
Leo

Jake Biesinger

Feb 24, 2011, 8:01:15 PM2/24/11
to ruffus_...@googlegroups.com
Just wanted to say I think this is a *necessary* feature for the future of ruffus.  My pipeline keeps growing, and I'm currently kludging my way through conditional steps.


Leo Goodstadt

Feb 25, 2011, 8:08:21 AM2/25/11
to ruffus_...@googlegroups.com, Jake Biesinger
Jake Biesinger <jake.bi...@gmail.com> wrote:
> Just wanted to say I think this is a *necessary* feature for the future of ruffus.

I appreciate this and have started to work on this.

The first obvious issue is what happens to downstream tasks if you take out a step via @active_if.
Do they just behave as if:
1) the inactive task did not exist,
2) the inactive task was up to date and did not produce any output, or
3) they were inactive themselves until they join up with an active branch?

(3) seems much too clever. There is enough cleverness already in ruffus :(
(2) is what I prefer for implementation reasons (see following) but I am not sure if it is a good enough solution.

I think this means that all task functions of an entire branch *may* have to be tagged with @active_if.

The implementation issues are this:

I would prefer it that Ruffus did not have to go in and tear down already created tasks and dependencies (remove it from the dependency tree of tasks). This is especially true as other tasks may have an inactive task in add_inputs. I don't want to have to trace down dangling inactive tasks...

The easiest solution, I think (not sure if it will work without coding it
up), is to treat an inactive task as an up-to-date task with no outputs.

This does mean that if you have the following scenario

        -> task2 -> task3 -> task4 ->
task1                                  task8
        -> task5 -> task6 -> task7 ->

and task3 is inactive, then task2 will still run, as will task1 through task8.
task4 will have no inputs unless it has extra dependencies which are
not inactive.
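The "up to date with no outputs" semantics could be sketched in plain Python. This is only an illustration of the proposed behaviour, not Ruffus internals; every name here (active_if, task_is_active, task_outputs, the task functions) is made up for the sketch:

```python
# Hypothetical sketch of the proposed @active_if semantics: a deactivated
# task behaves as if it ran and produced nothing, so downstream tasks
# simply see no inputs from it.

def active_if(*conditions):
    """Attach activity conditions (bools or zero-arg callables) to a task."""
    def decorate(task_func):
        task_func._active_conditions = conditions
        return task_func
    return decorate

def task_is_active(task_func):
    # A task with no conditions is active; otherwise all conditions must hold.
    conditions = getattr(task_func, "_active_conditions", ())
    return all(c() if callable(c) else c for c in conditions)

def task_outputs(task_func, outputs_if_run):
    # An inactive task is "up to date" and contributes no outputs.
    return list(outputs_if_run) if task_is_active(task_func) else []

run_task3 = False  # e.g. read from a config file

@active_if(run_task3)
def task3(in_files, out_files):
    pass

@active_if(True)
def task2(in_files, out_files):
    pass

print(task_outputs(task3, ["task3.output"]))  # -> []
print(task_outputs(task2, ["task2.output"]))  # -> ['task2.output']
```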

I will try to code something for you to try out if you think this makes
sense...

For the moment, I will not add a new colour scheme for inactive tasks to pipeline_printout until we get this sorted.

Leo


 

Giovanni Marco Dall'Olio

Feb 25, 2011, 8:25:31 AM2/25/11
to ruffus_...@googlegroups.com, Leo Goodstadt, Jake Biesinger
I wonder whether it is correct to have 'if' statements in a pipeline.
In principle, a real pipeline (the one with the tubes and water in it) doesn't have "if" conditions. The water cannot choose whether or not to enter a pipe depending on some condition, and the pipeline doesn't change its structure depending on whether oil or water is running through it.
With Makefiles, you usually tend to avoid conditions and loops because they make the code difficult to understand.
What I am saying is that you should keep your pipeline as simple as possible and leave the if conditions inside the functions it calls. So, if you think it is not appropriate to call GLITR every time, just write the condition inside the function that calls it; this will make your pipeline easier to understand. In principle, each run of the pipeline, with different data, should follow the same steps.
--
Giovanni Dall'Olio, phd student
Department of Biologia Evolutiva at CEXS-UPF (Barcelona, Spain)

My blog on bioinformatics: http://bioinfoblog.it

Leo Goodstadt

Feb 25, 2011, 11:37:33 AM2/25/11
to dallo...@gmail.com, ruffus_...@googlegroups.com, Jake Biesinger
Dear Giovanni,

I see where you are coming from. However, many computational pipelines can
become irreducibly complex. As usual in programming, the complexity never really
goes away; we are just pushing it from one place to another.

One alternative might be to have two (water and oil?) pipelines with shared pipeline stages in a separate module common to both. There is some but probably not enough support for this approach in Ruffus either. (It would help if we had an easier way to pass options from the main programme to the Ruffus decorators, for example.)

The less palatable alternative would be to have two scripts with substantially shared code with copying and pasting in between. This would be a maintenance nightmare.

Given all this, I am prepared to at least see if @active_if helps.
Like all ruffus decorators, its use is optional, and most of the extra burden is on the implementer rather than the user (well the documentation also becomes more complicated...)

Leo

Ryan dale

Feb 25, 2011, 11:54:01 AM2/25/11
to ruffus_...@googlegroups.com
One of the reasons I like ruffus is because it can do things relatively easily that you can't do in Makefiles!  Personally I would like the option that Leo's talking about, but you're right that it may not be for everyone.

As a concrete example, I've attached what I'd consider to be typical pipeline tasks.  I've connected them in different ways (each path colored differently) that would be nice to enable/disable based on a config file as has been discussed.

This actually shows two different kinds of steps.  The PCR-bias filter step is optional (e.g. via the @active_if decorator). So that's an on/off state.  But the peak calling is an either/or choice.

How about an @optional_after and @alternative_after, in addition to @active_if,  to better specify what should happen downstream?

Mock-up code below.  Basically the PCR-bias filtering step is marked as optional, while the peak calling steps are marked as alternatives.  The @transform decorators accept alternatives() calls to indicate which tasks they should look upstream for to get their inputs from.

Just tossing out ideas, who knows if what I've written below would actually work.  But definitely +1 from me for adding some sort of pipeline-changing functionality, in whatever form it ends up taking.


def mapping(infile,outfile):
    # make bam file
    pass

@optional_after(mapping)
@active_if(config.pcr)
@transform( mapping, "*.bam", "*.bias-removed.bam")
def pcr_bias_filter(infile,outfile):
    # make filtered bam file
    pass

@transform( alternatives(pcr_bias_filter, mapping), suffix("*.bam"), "*.bed")
@alternative_after(pcr_bias_filter, mapping)
@active_if(config.macs)
def run_macs(infile,outfile):
    # make bed file
    pass

@transform( alternatives(pcr_bias_filter, mapping), suffix("*.bam"), "*.bed")
@alternative_after(pcr_bias_filter, mapping)
@active_if(config.spp)
def run_spp(infile,outfile):
    # make bed file
    pass

@transform( alternatives(run_macs, run_spp), suffix("*.bed"), ".annotation")
def annotation(infile,outfile):
    pass





-ryan
-- 
Ryan Dale, PhD
Bioinformatics Scientist, NIH/NIDDK
Contractor, Kelly Government Solutions
hypothetical.png

Leo Goodstadt

Feb 25, 2011, 2:11:36 PM2/25/11
to ruffus_...@googlegroups.com, Ryan dale
Dear Ryan,

I am glad more people are piling into this.
We need more diverse experience.

First thing:
Ruffus needs a serious reorganisation to make it easier for newcomers. 

So I am extremely, extremely loath to make things more complicated rather than
simpler.

Given that, manipulating the pipeline configuration dynamically seems to be a really
definite request and I am geared up to do something about it, if we can arrive at a
good design together.

I think that the extra decorators you suggest would just make everything too complicated.
I don't mean only the actual names, but the entire scheme.

On the other hand, your hypothetical pipeline really cannot be put together with lots
of @active_if (or the equivalent), simply because there are short-circuiting alternative paths
(i.e. when the remove-PCR-bias step is inactive).
Before your example, I had thought that we could just put @active_if around all the
alternative paths, much like switching taps on and off at choke points in a water pipeline.


How can one dynamically / conditionally choose how tasks are connected to each other?

One current way is:

use_task_1 = True
use_task_2 = False

@transform( 
    ((task1 if use_task_1 else None),
     (task2 if use_task_2 else None)), suffix(".bam"), ".alt.bam")
def real_bammer(infile,outfile):
    pass

If use_task_1 is false, instead of adding inputs from task1, we add
a job with a None input. This doesn't match ".bam" and will be ignored.

If you think that adding (eventually ignored) jobs with an input of None is
a kludge, we can add an IGNORE_TASK constant instead of None,
as a special flag which will not create any jobs at all:

@transform( 
    ((task1 if use_task_1 else IGNORE_TASK),
     (task2 if use_task_2 else IGNORE_TASK)), suffix(".bam"), ".alt.bam")
def real_bammer(infile,outfile):
    pass

This syntax is more python-y rather than ruffus-y in approach.

The alternative would be to have some sort of conditional wrapper around each task,
sort of along the lines you suggest, and analogous to @active_if to indicate whether
the task should be a real dependence.

Here is some possible syntax:

The new keyword would be
    get_input_if
(This is a really awful name: suggestions welcome.)


@transform( 
    (get_input_if(task1, use_task_1),
     get_input_if(task2, use_task_2)), suffix(".bam"), ".alt.bam")
def real_bammer(infile,outfile):
    pass

Since this doesn't look appreciably simpler, I am more persuaded by option 1 which 
introduces less new syntax.

Would this be good enough for you?

What do you think?

Leo

Giovanni Marco Dall'Olio

Feb 25, 2011, 3:00:02 PM2/25/11
to ruffus_...@googlegroups.com, Ryan dale
In order to avoid conditions, you just have to put the decisions inside a function. For example:

[jobs_that_need_macs, other_jobs] = determine_which_jobs_need_macs()
make_macs(jobs_that_need_macs)
make_other_jobs(other_jobs)
resume_results()

For example, on your pipeline:

# mapping
all_bams = mapping()

# applying PCR filters
[PCR_biased_bams, other_bams] = determine_which_bams_are_biased(all_bams)
filtered_bams = biasfilter(PCR_biased_bams)
all_bams_including_filtered = merge(other_bams, filtered_bams)

# determine call method. Notice how this can now be moved a step after bias filter
[bams_to_be_analyzed_with_MACS, bams_to_be_analyzed_with_spp] = determine_calling_method(all_bams_including_filtered)
macs_beds = call_peaks_with_MACS(bams_to_be_analyzed_with_MACS)
spp_beds = call_peaks_with_spp(bams_to_be_analyzed_with_spp)
all_beds = merge(macs_beds, spp_beds)

# final point
entry_point_downstream_annotation(all_beds)


This is not very different from what you have posted, but here the decisions are made inside a function. To me, this looks easier to read; moreover, it's easier to parallelize.

Hope it will be useful for you.
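Giovanni's partition-then-dispatch idea can be made concrete with a small runnable sketch. All the names and the selection rule here are illustrative, not taken from any real pipeline:

```python
# Runnable sketch of deciding up front which inputs go down which branch,
# then running each branch unconditionally (no 'if' inside the pipeline).

def partition(items, predicate):
    """Split items into (matching, rest) according to predicate."""
    matching, rest = [], []
    for item in items:
        (matching if predicate(item) else rest).append(item)
    return matching, rest

# Hypothetical rule: treatment samples are the ones MACS should see.
def needs_macs(bam_name):
    return ".treat." in bam_name

all_bams = ["a.treat.bam", "a.control.bam", "b.treat.bam"]
macs_jobs, other_jobs = partition(all_bams, needs_macs)

print(macs_jobs)   # -> ['a.treat.bam', 'b.treat.bam']
print(other_jobs)  # -> ['a.control.bam']
```

The decision logic lives in one place (`needs_macs`), and each downstream step can consume its list without further branching.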

Ryan dale

Feb 25, 2011, 3:02:21 PM2/25/11
to Leo Goodstadt, ruffus_...@googlegroups.com

> So I am extremely, extremely loath to make things more complicated rather than
> simpler.

I hear you :)

I like the "if use_task_1 else None" idea with @transform.  Assuming the addition of @active_if, I think the code below would run the example pipeline in my previous message (I could be wrong, though):


apply_filter = True
use_macs = True
use_spp = False

def mapper(infile,outfile):
    pass
   
@transform(mapper if apply_filter else None, suffix('.bam'), '.filtered.bam')
def pcr_filter(infile,outfile):
    pass

@active_if(use_macs)
@transform(pcr_filter if apply_filter else mapper,
suffix('.bam'), '.macs.peaks.bed')
def run_macs(infile,outfile):
    pass

@active_if(use_spp)
@transform(pcr_filter if apply_filter else mapper,
suffix('.bam'), '.spp.peaks.bed')
def run_spp(infile,outfile):
    pass

@transform(run_spp if use_spp else run_macs)   
def downstream(infile, outfile):
    pass


One limitation to this is that the downstream() task forces a binary choice between peak callers. 

One way around this could be to provide a function that returns an upstream task to use as the "parent" to downstream().

def parent_tasks_of_downstream():
    if use_spp:
        return run_spp
    if use_macs:
        return run_macs
    if use_pics:
        return run_pics

But this function would awkwardly have to be declared right before downstream() so as to have the run_spp, run_macs, and run_pics tasks already defined.

Perhaps another option would be to allow string task names, like @follows does.  Then the trick is to decide what should be considered a task name and what should be a filename.

-ryan

Jacob Biesinger

Feb 25, 2011, 8:30:44 PM2/25/11
to ruffus_...@googlegroups.com, Ryan dale, Leo Goodstadt
Good inputs everyone!

I'm coming from an SGE_ARRAY oriented task flow, where I'd be forking off the full join of many parameter values (2 values for param1, 3 values for param2 = 6 jobs to run).  My workflows normally expose all the options and let the user decide which combinations to run (bowtie and mosaik, macs and glitr = 4 final datasets, 1 for each mapper and one for each peak caller).  I also let the "structure" of the pipeline change according to the steps requested by the user.  In my pipeline now, I achieve conditional runs by changing the regex.  Leo, your #2 option has the exact same effect, but an @active_if would be so much prettier than my  'NO MATCH' regex.  For example, I currently use:

clip_suffix = '.fastq' if cfg.getboolean('filtering', 'clip_adapter') else 'NO MATCH'
@transform('*.fastq', suffix(clip_suffix), '.noAdapter')
def clip_adapter(in_fastq, out_fastq):
    pass

to make clip_adapter run conditionally. This would be rewritten as:

@active_if(cfg.getboolean('filtering', 'clip_adapter'))
@transform('*.fastq', suffix('.fastq'), '.noAdapter')
def clip_adapter(in_fastq, out_fastq):
    pass

The issue with my current implementation (in addition to being much more unsightly IMHO) is that it can't be applied to jobs which don't filter their input (tasks with only @follows or @files).

I think having conditionals in the pipeline and changing the structure of the pipeline on the fly is just what the doctor ordered-- I can create a pipeline for a user on the fly and Ruffus already gives me most of the flexibility for doing this.

Here's an idea of what my pipeline is structured like:

# filtering. Each of these steps is optional

prev_filter = '*.fastq'  # name of the last filtering step-- stays 'fastq' if no filters, otherwise takes on filter function name

clip_suffix = '' if cfg.getboolean('filtering', 'clip_adapter') else 'NO MATCH'
@transform(prev_filter, suffix(clip_suffix), '.noAdapter')
def clip_adapter(in_fastq, out_fastq):
    pass
if cfg.getboolean('filtering', 'clip_adapter'):
    prev_filter = clip_adapter

filter_art_suffix = '' if cfg.getboolean('filtering', 'filter_artifacts') else 'NO MATCH'
@transform(prev_filter, suffix(filter_art_suffix), '.noArtifacts')
def filter_artifacts(in_fastq, out_fastq):
    pass
if cfg.getboolean('filtering', 'filter_artifacts'):
    prev_filter = filter_artifacts

filter_qual_suffix = '' if cfg.getboolean('filtering', 'filter_quality') else 'NO MATCH'
@transform(prev_filter, suffix(filter_qual_suffix), '.min_qual')
def filter_min_quality(in_fastq, out_fastq):
    pass
if cfg.getboolean('filtering', 'filter_quality'):
    prev_filter = filter_min_quality


# mapping
# Use the final output (prev_filter) as input to each of these functions

pash_suffix = '' if cfg.getboolean('mapping', 'run_pash') else 'NO MATCH'
@transform(prev_filter, suffix(pash_suffix), '.pash_reads')
def run_pash(in_fastq, out_pash):
    pass

bowtie_suffix = '' if cfg.getboolean('mapping', 'run_bowtie') else 'NO MATCH'
@transform(prev_filter, suffix(bowtie_suffix), '.bowtie_reads')
def run_bowtie(in_fastq, out_bowtie):
    pass

mosaik_suffix = '' if cfg.getboolean('mapping', 'run_mosaik') else 'NO MATCH'
@transform(prev_filter, suffix(mosaik_suffix), '.mosaik_reads_dat')
def run_mosaik(in_fastq, out_dat):
    pass

# each of the mapper's output can optionally be downsampled a user-specified number of times
uniquefy_downsample_regex = r'(.+)\.treat(.*)\.mapped_reads' if cfg.getboolean('peaks', 'downsample_reads') else 'NO MATCH'
@split([run_bowtie, run_pash, run_mosaik], regex(uniquefy_downsample_regex),
       add_inputs(r'\1.control\2.mapped_reads'),
       [r'\1.treat\2.matched_size_*.mapped_reads', r'\1.control\2.matched_size_*.mapped_reads'])
def uniquefy_downsample_reads(in_files, out_files):
    pass

# feed the output from active mappers into the peak callers
all_mappers_output = [bowtie_to_bed, pash_to_bed, mosaik_to_bed, uniquefy_downsample_reads]

# peak calling
macs_regex = r'(.+)\.treat(.*)\.mapped_reads' if cfg.getboolean('peaks', 'run_macs') else 'NO MATCH'
@collate(all_mappers_output, regex(macs_regex), 
         add_inputs(r'\1.control\2.mapped_reads'), r'\1.treat\2.macs.peaks')
def run_macs(in_files, out_peaks):
    pass

arem_regex = r'(.+)\.treat(.*)\.mapped_reads' if cfg.getboolean('peaks', 'run_arem') else 'NO MATCH'
@collate(all_mappers_output, regex(arem_regex), 
         add_inputs(r'\1.control\2.mapped_reads'), r'\1.treat\2.arem.peaks')
def run_arem(in_files, out_peaks):
    pass


glitr_regex = r'(.+)\.treat\.mapped_reads' if cfg.getboolean('peaks', 'run_glitr') else 'NO MATCH'
@collate(bed_to_glitr, regex(glitr_regex), 
         add_inputs(r'\1.control.mapped_reads'), r'\1.treat.glitr.peaks')
def run_glitr(in_files, out_peaks):
    pass


# Now finally the active peak callers are each channeled into downstream steps
# push all peaks from all peak callers and all raw reads from all mappers up to UCSC
@transform(all_peak_caller_functions + [run_bowtie, run_pash, run_mosaik], suffix(''), '.bigbed')
def deploy_tracks(in_bed, out_bigbed):
    pass

# find nearby genes
@collate(all_peak_caller_functions, regex(r'(.+)\.treat\.(.+)\.peaks'), 
         add_inputs(refseq_genes_to_bed), r'\1.treat.\2.peaks.nearby_genes')
def find_nearby_genes(in_files, out_genes):
    pass

# etc


> I like the "if use_task_1 else None" idea with @transform.  Assuming the addition of @active_if, I think the code below would run the example pipeline in my previous message (I could be wrong though):

You could do this or you could build up a list of functions to be passed as input to the next step.
 

> Perhaps another option would be to allow string task names, like @follows does.  Then the trick is to decide what should be considered a task name and what should be a filename.

I think this functionality is already present in ruffus, though I haven't tried it out.  Check out the output_from indicator object from http://www.ruffus.org.uk/decorators/indicator_objects.html

> @transform( 
>     ((task1 if use_task_1 else IGNORE_TASK),
>      (task2 if use_task_2 else IGNORE_TASK)), suffix(".bam"), ".alt.bam")
> def real_bammer(infile,outfile):
>     pass
>
> This syntax is more python-y rather than ruffus-y in approach.
>
> The alternative would be to have some sort of conditional wrapper around each task,
> sort of along the lines you suggest, and analogous to @active_if to indicate whether
> the task should be a real dependence.
>
> Here is some possible syntax:
>
> The new keyword would be
>     get_input_if
> (This is a really awful name: suggestions welcome.)

I'm personally okay with using python to figure out what the inputs are (I can build up a list as I go along defining functions, and keep track of the last-used function in a chain as a separate variable).
> I wonder whether it is correct to have 'if' statements in a pipeline.
> In principle, a real pipeline (the one with the tubes and water in it) doesn't have "if" conditions. The water cannot choose whether or not to enter a pipe depending on some condition, and the pipeline doesn't change its structure depending on whether oil or water is running through it.
> With Makefiles, you usually tend to avoid conditions and loops because they make the code difficult to understand.

Makefiles are hard to understand anyway!

> What I am saying is that you should keep your pipeline as simple as possible and leave the if conditions inside the functions it calls. So, if you think it is not appropriate to call GLITR every time, just write the condition inside the function that calls it; this will make your pipeline easier to understand. In principle, each run of the pipeline, with different data, should follow the same steps.
I'm not sure how I could implement this in practice.  GLITR is part of a chain and transforms data.  You mean within the run_glitr function I should check to see if I should actually run GLITR?  If I don't, then any steps that depend on GLITR will fail since their input file will be missing (or empty if I just touch it within run_glitr).

> 2) If the inactive task was up to date and did not produce any output

Exactly what my regex munging does.  +1 for this :)

> I think this means that all task functions of an entire branch *may* have to be tagged with @active_if.

Why would this be necessary?

> This does mean that if you have the following scenario
>
>         -> task2 -> task3 -> task4 ->
> task1                                  task8
>         -> task5 -> task6 -> task7 ->
>
> and task3 is inactive, then task2 will still run, as will task1 through task8.
> task4 will have no inputs unless it has extra dependencies which are
> not inactive.

Perfect.

Anyway, just my thoughts + experience.  I'm relatively new to ruffus but I'm enjoying it quite a bit. I recoded my previous Galaxy workflow in ruffus in a day or two, then started having some real fun with variable # of outputs, conditional steps, and pipeline restructuring.  Now I have something of an uber-pipeline that works on all my data-- chipseq, CLIP-seq, and soon RNA-seq, all with just a few tweaks to a config file.

Thanks for the package!

Jacob Biesinger

Feb 25, 2011, 8:49:26 PM2/25/11
to ruffus_...@googlegroups.com, Giovanni Marco Dall'Olio, Ryan dale
On Fri, Feb 25, 2011 at 12:00 PM, Giovanni Marco Dall'Olio <dallo...@gmail.com> wrote:
> In order to avoid conditions, you just have to put the decisions inside a function. For example:
>
> [jobs_that_need_macs, other_jobs] = determine_which_jobs_need_macs()
> make_macs(jobs_that_need_macs)
> make_other_jobs(other_jobs)
> resume_results()

what exactly does make_macs() do?  Is that defining the run_macs function and attaching it to the pipeline with only jobs_that_need_macs as input? Something like...

def mapping_jobs():
    pass

def make_macs(jobs):
    @follows(mapping_jobs)
    def run_macs(jobs):
        # do MACS stuff
        pass
    return run_macs
macs = make_macs(jobs_that_need_macs)

@follows(macs)
def downstream():
    pass

This seems less readable.  Am I missing something?

Giovanni Marco Dall'Olio

unread,
Feb 26, 2011, 4:08:37 AM2/26/11
to Jacob Biesinger, ruffus_...@googlegroups.com, Ryan dale
no... it is just an example, as I am not very familiar with your pipeline. You can substitute it with run_macs:

[jobs_that_need_macs, other_jobs] = determine_which_jobs_need_macs()
run_macs(jobs_that_need_macs)
run_other_jobs(other_jobs)
resume_results()

Look at the complete example in my previous mail, that should be clearer.


Jacob Biesinger

unread,
Feb 26, 2011, 10:24:08 AM2/26/11
to dallo...@gmail.com, ruffus_...@googlegroups.com, Ryan dale
I'm still not sure how it would actually fit into ruffus.  I appreciate the readability but I guess what's confusing me is that in ruffus you don't normally call your functions directly.  So the lines

run_macs(jobs_that_need_macs)
run_other_jobs(other_jobs)
resume_results()

would be preceded by 'def' and ruffus decorators.  Is this what you mean:

# maybe in a separate file with other utility functions
def determine_which_jobs_need_macs():
    return '*.mapped_reads' if cfg.run_macs else None 
    # OR more complicated, checking which mappers are to be used as input
    return [output_from(mapper) for mapper in all_mappers if cfg.get_boolean('mapping', 'use_%s' % mapper)]

# in the pipeline file, where the structure is defined
macs_jobs = determine_which_jobs_need_macs()
@transform(macs_jobs, ...)
def run_macs(in_bed,out_peaks):
    pass

The divorce of pipeline structure and input logic makes sense and would be more readable, but like Leo pointed out, you're just pushing the complexity into a different area of the code.

Leo Goodstadt

unread,
Feb 26, 2011, 10:36:01 AM2/26/11
to ruffus_...@googlegroups.com, Jacob Biesinger, dallo...@gmail.com, Ryan dale
I too don't understand how this would work in terms of Ruffus.
But it is very good that more members of the Ruffus community are explaining 
how they are using or would like to use the library.

Leo

Dale, Ryan (NIH/NIDDK) [C]

unread,
Feb 27, 2011, 12:38:52 PM2/27/11
to Jacob Biesinger, ruffus_...@googlegroups.com, Leo Goodstadt
That's a really good idea to encode the use of filters [or not] within the extensions. Thanks for sharing. Also, I didn't know about the output_from indicator -- that should be useful.


________________________________________
From: Jacob Biesinger [jake.bi...@gmail.com]
Sent: Friday, February 25, 2011 8:30 PM
To: ruffus_...@googlegroups.com
Cc: Dale, Ryan (NIH/NIDDK) [C]; Leo Goodstadt
Subject: Re: Easy way to turn *off* certain jobs?

Leo Goodstadt

unread,
Feb 27, 2011, 12:46:24 PM2/27/11
to ruffus_...@googlegroups.com, Dale, Ryan (NIH/NIDDK) [C], Jacob Biesinger
I had forgotten how to do output_from as well.
(I remember putting in the functionality but not how to call it.)
Duh!
Thanks.
Leo

Jacob Biesinger

unread,
Apr 26, 2011, 3:44:23 PM4/26/11
to Leo Goodstadt, ruffus_...@googlegroups.com
Hi Leo,

Any progress with this particular feature?  I can spend a few clicks on it, having just wrapped up another project.

Like I said before, I can hack my way around for transforms, but there are several operations that I can't make conditional (like @files).

Thanks!
--
Jake Biesinger
Graduate Student
Xie Lab, UC Irvine

Jacob Biesinger

unread,
Apr 27, 2011, 1:17:24 AM4/27/11
to Leo Goodstadt, ruffus_...@googlegroups.com
I spent some time on an active_if decorator which looks like it's working right.  I'm not sure that I'm catching all the edge cases (there are a lot of them!) but in my simple testing, it works fine.

See attached for a diff on task.py. I'm also including the tests with which I did an ad-hoc run through and a utils file that pushes some of the complexity of your template out of the pipeline code.

Could you comment on the appropriateness of the changes?

--
Jake Biesinger
Graduate Student
Xie Lab, UC Irvine



task.py.active_if.diff
ruffus_test.py
ruffus_utils.py

Jacob Biesinger

unread,
Apr 27, 2011, 1:37:22 AM4/27/11
to Leo Goodstadt, ruffus_...@googlegroups.com
This version allows callables as arguments to active_if, a la

@active_if(lambda: False, lambda: True, True)  # won't run: requires all args to be truthy
@merge([myfunc3, myfunc4, myfunc2], 'test2.merged')
def myfunc7(in_files, out_pattern):
    print 'function 7', in_files
    open('test2.merged', 'w').close()  # touch the output file

--
Jake Biesinger
Graduate Student
Xie Lab, UC Irvine



task.py.active_if_with_callables.diff

Bernie Pope

unread,
Sep 30, 2011, 2:40:11 AM9/30/11
to ruffus_...@googlegroups.com
Hi all,

Sorry to dig up an old thread. I find myself in the position of needing conditionals in my pipeline and I can't see an easy workaround.

I wonder if this feature has been implemented/included?

Cheers,
Bernie.

> <task.py.active_if.diff><ruffus_test.py><ruffus_utils.py>

Jacob Biesinger

unread,
Sep 30, 2011, 2:55:41 AM9/30/11
to ruffus_...@googlegroups.com
Yep, but not in the main trunk.  I implemented the @active_if decorator in https://github.com/jakebiesinger/ruffus 

It takes a boolean, a list of booleans, or a function that returns one of those, and doesn't run if any of the booleans are False.

in code, this looks like:

self.is_active_job = all(
                        arg() if isinstance(arg, collections.Callable) else arg
                        for arg in active_checks)

The decorator works great, although inactive jobs essentially just have their input replaced with an empty list (and therefore have no outputs either).  At some point, I'll add an "inactive" status message to pipeline_printout / the more verbose logs.
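The all() check quoted above can be exercised outside of Ruffus. Here is a minimal stand-alone sketch of the same logic (the helper name is_active is mine, not part of the library):

```python
def is_active(*active_checks):
    # Call any callables first, then require every result to be truthy,
    # mirroring the is_active_job assignment quoted above.
    return all(
        check() if callable(check) else check
        for check in active_checks)

print(is_active(lambda: False, lambda: True, True))  # False
print(is_active(True, lambda: True))                 # True
print(is_active())                                   # True (all() of nothing)
```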


Another way of doing the conditionals is shown in Ryan Dale's code here: https://github.com/daler/pipeline-example/blob/master/pipeline-3/pipeline.py

--
Jake Biesinger
Graduate Student
Xie Lab, UC Irvine



Bernie Pope

unread,
Sep 30, 2011, 3:27:56 AM9/30/11
to ruffus_...@googlegroups.com
Thanks Jake,

On 30/09/2011, at 4:55 PM, Jacob Biesinger wrote:

> It takes a boolean, a list of booleans, or a function that returns one of those, and doesn't run if any of the booleans are False.

That looks quite cool.

> Another way of doing the conditionals is shown in Ryan Dale's code here:
> https://github.com/daler/pipeline-example/blob/master/pipeline-3/pipeline.py

Oh great, that looks like it will do what I want.

Cheers,
Bernie.

Leo Goodstadt

unread,
Sep 30, 2011, 8:43:46 PM9/30/11
to ruffus_...@googlegroups.com
Hi Jake,

On 30 September 2011 07:55, Jacob Biesinger <jake.bi...@gmail.com> wrote:
I implemented the @active_if decorator in https://github.com/jakebiesinger/ruffus 
 
Your @active_if code looks great. Would you mind contributing it to the main Ruffus code?
And if so, would you like to be listed as a contributor / co-author? 

I would need to import your changes to the main ruffus repository with your permission and 
run some unit tests.


I must say I am really grateful and impressed by your efforts because you have clearly 
gone through the source of Ruffus and understood how it works. (I really needed to put 
more comments at the design / implementation level so that people can follow what is going on.)
I also now wish I had gone the mercurial route rather than svn route from the beginning as
this sort of collaboration would be much easier. I may yet switch.

My one doubt is over your comment:
1954 # I thought overwriting the param_generator_func et al would lead to
1955 # trouble with the order of decorator nesting, but it seems to work fine

Certainly once set, "self.needs_update_func" is not over-ridden, precisely so that there are no order of decorator issues
with @check_if_uptodate.
This "feature" should really be documented in the code...

I am not so sure about param_generator_func. I suspect that this is not right, but that you are masking it with the code changes in
get_output_files() and signal(). At some point, this hidden fragility will bite us.

I might also have a quick think to see if @active_if can check its boolean parameters only when pipeline_printout() / pipeline_run()
is invoked rather than at the point of the decorator definition. I doubt if that is going to affect many people, but it would be nice
to postpone the check if possible.


Leo

Another way of doing the conditionals is shown in Ryan Dale's code here:
 
P.S. I had never done this because of my unnecessary nervousness over the draconian strictures about using 
top level functions in multiprocessing (pickling)...

 

Leo Goodstadt

unread,
Oct 3, 2011, 9:42:43 AM10/3/11
to ruffus_...@googlegroups.com
On 1 October 2011 01:43, Leo Goodstadt <leo.go...@llew.org.uk> wrote:
On 30 September 2011 07:55, Jacob Biesinger <jake.bi...@gmail.com> wrote:
I implemented the @active_if decorator in https://github.com/jakebiesinger/ruffus 
Your @active_if code looks great. Would you mind contributing it to the main Ruffus code? 
Would you like to be listed as a contributor / co-author? 


Dear Jake,

I have implemented @active_if in the Ruffus svn. Please
test this out if you have the time.

This code is more involved than your version because I allow the active state to be changed
right up to the invocation of pipeline_run (rather than in the decorator).

Thus in my test code I have

pipeline_active_if = None
#
#    task with active state switching
#
@active_if(lambda: pipeline_active_if)
@transform(...)
def test_task(...):
    pass
#    activate task
pipeline_active_if = True
pipeline_run([test_task], ...)
#    inactivate task
pipeline_active_if = False
pipeline_run([test_task], ...)

See test/test_active_if.py

This seems to work with no decorator order dependency.
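The reason wrapping the flag in a lambda defers the decision is ordinary late binding: the lambda looks the global up at call time, not when the decorator is defined. A minimal illustration outside Ruffus:

```python
pipeline_active_if = None

check = lambda: pipeline_active_if  # looks the name up at call time

pipeline_active_if = True
print(check())   # True

pipeline_active_if = False
print(check())   # False
```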

I don't touch the following functions (taking a slightly different
approach from your code) 
        self.param_generator_func
        self.needs_update_func
because this did not allow the deferring of the activation decision 
until pipeline_run as I wanted.

However, this required adding special handling of active_if in
each of the following functions, so the code is more, not less, involved :(
make_job_parameter_generator(...)
printout(...)
signal(...)

Thanks

Leo


Leo Goodstadt

unread,
Oct 3, 2011, 12:28:13 PM10/3/11
to ruffus_...@googlegroups.com
P.S. Sorry it took 6 months to implement this!
But thanks to everyone's input on the list.
Leo

Eleftherios Avramidis

unread,
Dec 16, 2011, 12:17:19 PM12/16/11
to ruffus_...@googlegroups.com
I am trying to see the practicality of this feature.  Let's say it is task3 which I want to deactivate.  This task would @transform some data from task2 and pass it on to task4.  When it is NOT active, task4 will get an empty input list.  Wouldn't it be more convenient for task3 to pass its input through to its output directly when deactivated?

Leo Goodstadt

unread,
Dec 23, 2011, 8:26:03 AM12/23/11
to ruffus_discuss
How about having another task (anti_task3?) which just forwards its inputs, and whose activation / deactivation logic is the OPPOSITE of task3's? You can think of task3 and anti_task3 as switches in a pipeline, so that the water (your data) flows one way or the other. This seems slightly less magical but clearer. I am trying to steer clear of magic in Ruffus now!
Leo
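Stripped of the Ruffus decorators, the switch idea can be sketched in plain Python (the task names and the upper-casing "work" are invented for illustration):

```python
import shutil

def task3(src, dst):
    # Stand-in for the real transformation.
    with open(src) as fin, open(dst, 'w') as fout:
        fout.write(fin.read().upper())

def anti_task3(src, dst):
    # Forwards the input unchanged when task3 is switched off.
    shutil.copy(src, dst)

def run_switch(task3_active, src, dst):
    # Exactly one of the two tasks runs, like water routed one way or the other.
    (task3 if task3_active else anti_task3)(src, dst)
```

In the real pipeline, task3 would presumably carry @active_if(cond) and anti_task3 @active_if(not cond) (or lambdas, if the decision should be deferred until pipeline_run), with identical @transform signatures so that downstream tasks see the same output files either way.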


Eleftherios Avramidis

unread,
Jan 18, 2012, 1:46:38 PM1/18/12
to ruffus_...@googlegroups.com
I see your point.

Though there is nothing magical about the case where one job should be skipped. It is a rather common experimental-pipeline scenario to turn off one job and have the dependent (next) job get its requirements directly from the input of the skipped job.

If this "input passing" is not supported, then when I turn off one job, I have to either change the dependencies of the next job or write an anti-job to copy/link the files. That doesn't gain much from @active_if, since I could skip the decorator entirely and write if disable_check: os.link(input, output) in the job; which, by the way, is what I have ended up doing for all of my jobs that may need turning off.
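The in-job workaround described here can be factored into a small helper. This is a sketch under my own naming, using a hard link as in the message (os.link could be swapped for shutil.copy on filesystems without hard-link support):

```python
import os

def run_or_passthrough(disabled, input_file, output_file, real_work):
    # When the step is turned off, hard-link the input as the output so
    # downstream tasks still find the file they expect.
    if disabled:
        if os.path.lexists(output_file):
            os.remove(output_file)
        os.link(input_file, output_file)
    else:
        real_work(input_file, output_file)
```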

Eleftherios Avramidis

unread,
Jan 27, 2012, 10:34:07 AM1/27/12
to ruffus_...@googlegroups.com
In a command which is merging/collating output from several tasks, how can I include output from only those previous steps which are not inactive?

I.e. the following gives an error if features_lm_target has been deactivated

@active_if(cfg.exists_lm(target_language))
@transform(data_fetch, suffix(".orig.jcml"), ".lm.%s.f.jcml" % target_language, target_language, cfg.get_lm_name(target_language))
def features_lm_target(input_file, output_file, language, lm_name):
    #code goes here
   
@collate([data_fetch, features_lm_target], regex(r"([^.]+)\.(.+)\.f.jcml"),  r"\1.all.f.jcml")
def features_gather(singledataset_annotations, gathered_singledataset_annotations):
    #code goes here

I would need a way to check that the items in the @collate input are active, before importing them. Any suggestion?

Jacob Biesinger

unread,
Jan 27, 2012, 11:38:56 AM1/27/12
to ruffus_...@googlegroups.com
On Fri, Jan 27, 2012 at 7:34 AM, Eleftherios Avramidis <eleftherio...@googlemail.com> wrote:
In a command which is merging/collating output from several tasks, how can I include output from only those previous steps which are not inactive?

I.e. the following gives an error if features_lm_target has been deactivated

@active_if(cfg.exists_lm(target_language))
@transform(data_fetch, suffix(".orig.jcml"), ".lm.%s.f.jcml" % target_language, target_language, cfg.get_lm_name(target_language))
def features_lm_target(input_file, output_file, language, lm_name):
    #code goes here
   

Is something like this what you mean? 

@collate([data_fetch] + ([features_lm_target] if cfg.exists_lm(target_language) else []), regex(r"([^.]+)\.(.+)\.f.jcml"),  r"\1.all.f.jcml")
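One thing to watch with this conditional-expression trick: in Python, + binds tighter than if/else, so a version without parentheses around the conditional discards data_fetch as well whenever the condition is false. A minimal sketch, with strings standing in for the task functions:

```python
data_fetch = 'data_fetch'
features_lm_target = 'features_lm_target'
cond = False

# Parses as ([data_fetch] + [features_lm_target]) if cond else []
wrong = [data_fetch] + [features_lm_target] if cond else []
# Parentheses keep data_fetch unconditionally:
right = [data_fetch] + ([features_lm_target] if cond else [])

print(wrong)  # [] -- data_fetch is lost too
print(right)  # ['data_fetch']
```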

Eleftherios Avramidis

unread,
Feb 9, 2012, 11:15:14 AM2/9/12
to ruffus_...@googlegroups.com
Nice one, I hadn't thought of this lambda-looking possibility.