Multiple tasks depend on same output from a single task?


Keith Hughitt

Apr 11, 2016, 8:43:30 AM
to ruffus_discuss
Hello,

Does anyone know how I can create a pipeline where several different tasks depend on the same output from a 1:1 task?

For example:

    task0:   in -> out

    taska:   out > outa
    taskb:   out > outb
    taskc:   out > outc

Since the required input is the same for tasks a, b, and c, "task0" only needs to be called once.

The @subdivide decorator seems to be geared towards this kind of scenario, but most of the examples I could find (e.g. http://www.ruffus.org.uk/decorators/subdivide.html) pass the output from "task0" to a single function "taskx" instead of multiple functions ("taska", "taskb", "taskc").

When I try to configure the pipeline using subdivide and generate a flowchart, ruffus seems to only follow one of the possible paths ("taska").

Here is a skeleton version of what I have at the moment:

@subdivide(inputs, input_regex,
           ['output.taska',
            'output.taskb',
            'output.taskc'])
def task0(...):

@transform(task0, 'output.taska', ...)
def taska(input_file, output_file, ...):

@transform(task0, 'output.taskb', ...)
def taskb(input_file, output_file, ...):

etc.

Eventually there is a @merge function which recombines all the split outputs and is used as the target for pipeline_run.
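
Roughly something like this (untested; the names are just placeholders):

@merge([taska, taskb, taskc], 'output.combined')
def merge_results(input_files, output_file):
    ...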

Any suggestions?

Thanks!
Keith

Leo Goodstadt 顧維斌

Apr 12, 2016, 2:25:56 AM
to ruffus_...@googlegroups.com
Hi Keith,
If I haven't misunderstood you, you don't need to do anything for this to work.
taska, taskb and taskc can all depend on task0, and this will work by magic.

Here is some untested code:

from ruffus import (originate, transform, suffix,
                    pipeline_run, pipeline_printout_graph)

@originate(["test1.a", "test2.a"])
def task0(output_file):
    # create an empty starting file
    with open(output_file, "w") as o:
        pass

@transform(task0, suffix(".a"), ".b")
def taska(input_file, output_file):
    with open(output_file, "w") as o:
        pass

@transform(task0, suffix(".a"), ".c")
def taskb(input_file, output_file):
    with open(output_file, "w") as o:
        pass
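
To check the fan-out, you can make both branches targets (again untested):

# task0 runs once; taska and taskb both consume its outputs
pipeline_run([taska, taskb], multiprocess=2)

or draw the flowchart, which should show edges from task0 to both tasks:

pipeline_printout_graph("flowchart.svg", "svg", [taska, taskb])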

What would help is if you wrote down the files that each of the tasks generates.




Keith Hughitt

Apr 12, 2016, 12:23:01 PM
to ruffus_discuss, llewgo...@gmail.com

Hi Leo,

Thanks for the suggestion! 

To make things clearer, let me use more realistic function names and describe their corresponding inputs and outputs.

Input files

"args.input_reads" is a list of sample filepaths to be processed, e.g.:

/path/to/sample01/sample01_R1.fastq
/path/to/sample01/sample01_R2.fastq
/path/to/sample02/sample02_R1.fastq
/path/to/sample02/sample02_R2.fastq
etc.

Some other values (e.g. "build_dirs") are global variables created before Ruffus is called.

Tasks:

# pre-req (before task 0 above)
def check_for_bowtie_indices():
    ...

#
# "task 0a"
#
# OUTPUTS:
#
#     common/sample01/ruffus/sample01_R1.filter_nontarget_reads
#     common/sample01/ruffus/sample01_R2.filter_nontarget_reads
#     ...
#
@follows(check_for_bowtie_indices)
@transform(args.input_reads,
           regex(r'^(.*/)?([^_]*)_(R?[1-2])_?(.*)?\.fastq(\.gz)?'),
           r'%s/\2/ruffus/\2_\3.filter_nontarget_reads' % build_dirs['shared'],
           r'\2', r'\3')
def filter_nontarget_reads(input_file, output_file, sample_id, read_num):
    ...

#
# "task 0b"
#
# OUTPUTS:
#
#     common/sample01/ruffus/sample01_R1.filter_genomic_reads
#     common/sample01/ruffus/sample01_R2.filter_genomic_reads
#     ...
#
# I was previously using @transform here, but am testing out @subdivide to run the following steps in parallel...
#
@follows(filter_nontarget_reads)
@subdivide(args.input_reads,
           regex(r'^(.*/)?([^_]*)_(R?[1-2])_?(.*)?\.fastq(\.gz)?'),
           [r'%s/\2/ruffus/\2_\3.filter_genomic_reads.sl' % build_dirs['shared'],
            r'%s/\2/ruffus/\2_\3.filter_genomic_reads.rsl' % build_dirs['shared'],
            r'%s/\2/ruffus/\2_\3.filter_genomic_reads.polya' % build_dirs['shared'],
            r'%s/\2/ruffus/\2_\3.filter_genomic_reads.polyt' % build_dirs['shared']],
           r'%s/\2/ruffus/\2_\3.filter_genomic_reads' % build_dirs['shared'],
           r'\2', r'\3')
def filter_genomic_reads(input_file, output_files, output_base, sample_id, read_num):
    ...

#
# "task a"
#
# OUTPUTS:
#
#    spliced_leader/param-info-here/sample01/ruffus/sample01_R1.find_sl_reads
#    spliced_leader/param-info-here/sample01/ruffus/sample01_R2.find_sl_reads
#     ...
#
@transform(filter_genomic_reads,
           regex(r'^(.*)/(.*)_(R?[12]).filter_genomic_reads.sl'),
           r'%s/\2/ruffus/\2_\3.find_sl_reads' % build_dirs['sl'],
           r'\2', r'\3')
def find_sl_reads(input_file, output_file, sample_id, read_num):
    ...

Tasks "b-c" are similar to "find_sl_reads" above, but each looks for a different feature and creates files using different suffixes.

Notes

I left out a couple steps in the full pipeline, but it should be enough to illustrate the basic problem.

In case it helps, here is the actual code for the pipeline I'm working on: https://github.com/elsayed-lab/utr_analysis/blob/master/utr_analysis.py

Some of the variables used in tasks are encoded in filenames (e.g. sample ID / read num), which is probably not the best way to pass that information along, but at the time it was the best I could come up with. 

I also ended up using dummy placeholder files created once a task is completed, rather than the actual files generated. This again has more to do with me not being able to set up the tasks to manage the file checks directly.

If you have any suggestions for how to handle either of these issues more elegantly, it would probably go a long way toward simplifying the code.

Cheers,
Keith

Keith Hughitt

Apr 13, 2016, 11:02:44 AM
to ruffus_discuss, llewgo...@gmail.com
After giving this some more thought, I think I'm going to try to refactor my pipeline to make better use of the decorators and avoid using dummy files to track progress.

First, however, I still have to figure out how to approach the original issue of the thread.

Here is a more concrete example of the problem:

If I am starting a pipeline by feeding Ruffus paired-end RNA-Seq reads (so R1 and R2 each take separate paths through the pipeline), how can I avoid calling a mapping function twice for a given sample?

It seems like one approach might be to @collate the mate-pairs, do the mapping, and then @split them back up again. Does that sound reasonable?
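
For illustration, something like this is what I have in mind (untested; the regexes, output names, and run_mapper helper are placeholders):

from ruffus import collate, subdivide, regex

# Group the R1/R2 mates for each sample into a single mapping job:
# both reads produce the same output name, so they get collated together.
@collate(args.input_reads,
         regex(r'^(.*/)?(.*)_R[12]\.fastq'),
         r'\2.bam')
def map_reads(input_files, output_file):
    # input_files is the [R1, R2] pair for one sample
    run_mapper(input_files, output_file)  # hypothetical helper

# Split the mapped output back into per-read-number paths
@subdivide(map_reads,
           regex(r'(.*)\.bam'),
           [r'\1_R1.out', r'\1_R2.out'])
def split_mates(input_file, output_files):
    for output_file in output_files:
        ...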


Leo Goodstadt 顧維斌

Apr 13, 2016, 3:20:35 PM
to ruffus_...@googlegroups.com
I think this is probably the most sensible idea. The alternative is to use only the R1 files and add the matching R2 files using add_inputs (or even at the outset of the pipeline), and then always process R1 and R2 as a pair.
I have done both myself. It depends on how much parallelism you want to extract and how often you want to use (other parts of) the same pipeline to deal with single-end files.
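
e.g. (untested; assumes a *_R1.fastq / *_R2.fastq naming scheme, where r1_files is your list of R1 paths):

from ruffus import transform, regex, add_inputs

# Drive the pipeline off the R1 files only, pairing in the
# matching R2 file as an extra input for each job.
@transform(r1_files,
           regex(r'(.*)_R1\.fastq'),
           add_inputs(r'\1_R2.fastq'),
           r'\1.bam')
def map_pair(input_files, output_file):
    # input_files == (xxx_R1.fastq, xxx_R2.fastq)
    ...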
Leo

Keith Hughitt

Apr 13, 2016, 4:04:50 PM
to ruffus_discuss, llewgo...@gmail.com
I just finished refactoring the code to use @collate and @subdivide to combine and break apart the mate-pairs as needed, and so far the pipeline seems to be running smoothly!

It took a bit of time to rework everything, but I think it is much cleaner and easier to work with now.

Thanks for all of the help!

Cheers,
Keith

Leo Goodstadt 顧維斌

Apr 13, 2016, 6:42:34 PM
to ruffus_...@googlegroups.com
Glad to hear it.
Leo
