Branches with overlapping varibale.

26 views
Skip to first unread message

ben curran

unread,
May 13, 2019, 11:04:54 PM5/13/19
to bpipe-discuss
If I have a list of specific branches I want to define and some of them contain the same variable/use the same data file, how do I stop bpipe from executing the same stage on the same data file multiple times.

i.e. I have three samples:
A0028_1 (non tumor)
A0028_2 (tumor sample)
A0028_3 (tumor sample)

And I want to trim and align all three, and then compare A0028_1 with A0028_2 and A0028_1 with A0028_3.

I have stages

trim = {
    exec """cutadapt --minimum-length 50 -a AGATCGGAAGAGC -q 30  -o $output1 -p $output2 $input1.fastq.gz $input2.fastq.gz""","trim"
}

align = {
    SAMPLE=$branch
    exec """bwa mem -t $threads -R '@RG\\tID:$SAMPLE\\tSM:$SAMPLE\\tLB:$SAMPLE\\tPL:ILLUMINA' $bwaIndex $input1 $input2 | samtools sort -@12 -O BAM -o $output.bam""", "align"
}

pileUp = {
    exec "samtools mpileup -P $threads -B -d 9001 -q 1 -f $bwaIndex  $input2.bam $input1.bam | java -Xmx16g -d64 -jar /nesi/project/uoa00571/bin/VarScan.v2.4.3.jar somatic --mpileup 1 --min-var-freq 0.1 --p-value 1.00 --somatic-p-value 1.00 --strand-filter 0 --tumor-purity 0.5 --output-vcf 1 --min-coverage-normal 10 --min-coverage-tumor 10 --output-snp $output1.snp.vcf  --output-indel $output1.indel.vcf","pileUp"

}

and branches:

def branches = [
    sample1 : ["A0028_1_1.fastq.gz", "A0028_1_2.fastq.gz", "A0028_2_1.fastq.gz", "A0028_2_2.fastq.gz"],
    sample2 : ["A0028_1_1.fastq.gz", "A0028_1_2.fastq.gz", "A0028_3_1.fastq.gz", "A0028_3_2.fastq.gz"]
]

and a run command like:

run {
    branches * [trim]  + branches * [align] + branches * [pileUp]
}


The trouble with this though, is that as far as I can see the trimming and alignment steps for A0028_1 would be executed twice, likely at the same time. My first thought was that this would be fine as which ever process finished last would overwrite the earlier versions. Apart from being inefficient though, bwa creates multiple temp files and whislt I'm assuming those temp files would be the same, run to run, I can't guarantee that. Which could lead to problems if two branches were trying to reassemble their bam files at the same time. I thought of using a listener (bpipe.EventManager.getInstance().addListener(STAGE_STARTED)), but that only sends a message when the current branch/stage runs, it doesn't check for other branches running the same data.


I could also use pattern matches for the input, i.e.
run {
    "%_*.fastq.gz" * [trim]  + "%_*.trim" * [align] + branches * [pileUp]
}

This however, would require me to know the full the filenames that would be created before creating the branches - which will get messy I think, other stages will be introduced between the align and the pileUp stages, i.e.

def branches = [
    sample1 : ["A0028_1_1.fastq.gz.trim.align.[otherstages]", "A0028_1_2.fastq.gz.trim.align.[otherstages]", "A0028_2_1.fastq.gz.trim.align.[otherstages]", "A0028_2_2.fastq.gz.trim.align.[otherstages]"],
    sample2 : ["A0028_1_1.fastq.gz.trim.align.[otherstages]", "A0028_1_2.fastq.gz.trim.align.[otherstages]", "A0028_3_1.fastq.gz.trim.align.[otherstages]", "A0028_3_2.fastq.gz.trim.align.[otherstages]"]
]


Is there a way for a stage to check if other stages are already executing the same command and to skip executing if it is? Or a way to define the branches at the beginning (with just the base file names), run using a pattern match on the first two stages and then have the branches pick up the files with the extended filenames when it gets to the pileup step?



Cheers
Ben.













    bpipe.EventManager.getInstance().addListener(STAGE_STARTED) {
        type, desc, details -> println "Event occurred! Here are the details:\n " + details.collect { key, value -> "$key : $value\n" }.join('')
    }

    check {
       exec "[ -s $output ]"
    } otherwise {
       succeed "Another branch has already started aligning this file"
    }





Simon Sadedin

unread,
May 19, 2019, 7:47:14 PM5/19/19
to bpipe-discuss
Hi Ben,

Sorry for taking a few days to reply. I think that once you have a complex structure of relationships between the inputs / outputs that you are processing in a pipeline, it often makes sense to model that explicitly.

I reworked your script into an example below (replacing all the tools with dummy standins). The key to it is that you have a data structure that defines the different sample types (which I called 'control' and 'test' here). Then you can use that to name the outputs, and downstream refer explicitly to the outputs you want ($input.control.bam, etc), or be generic when you don't care ($input.fastq.gz). This probably doesn't do exactly what you want, but hopefully will give you an idea about a different way to approach it.

Cheers!

Simon

trim = {
    branch.sample=branch.name
    println "Aligning: $sample"
    transform('trim.gz','trim.gz') {
        exec """
            cat > $output1 < $input1.fastq.gz ; cat > $output2 < $input2.fastq.gz
        """
    }
}

align = {
    requires type: 'the kind of sample being processed: test or control'
    produce(sample + '.' + type + '.bam') {
        exec """
            cat $input1 $input2 > $output.bam
        """
    }
}

pileUp = {
    branch.sample = branch.name
    from(sample + '*.bam') {
        exec """
            cat $input.control.bam $input.test.bam | tee $output1.snp.vcf > $output1.indel.vcf
        """
    }
}

controls = [
    control1 : ["A0028_1_1.fastq.gz", "A0028_1_2.fastq.gz"]
]

samples = [
    sample1 : ["A0028_2_1.fastq.gz", "A0028_2_2.fastq.gz"],
    sample2 : ["A0028_3_1.fastq.gz", "A0028_3_2.fastq.gz"]
]

run {
    [
        controls * [ trim + align.using(type:'control') ],  
        samples *  [ trim + align.using(type:'test') ]
    ] + samples * [
        pileUp
    ]
}





--
You received this message because you are subscribed to the Google Groups "bpipe-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to bpipe-discus...@googlegroups.com.
To post to this group, send email to bpipe-...@googlegroups.com.
Visit this group at https://groups.google.com/group/bpipe-discuss.
To view this discussion on the web visit https://groups.google.com/d/msgid/bpipe-discuss/117217cc-fdfe-4828-b337-6c33a6fb290f%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

ben curran

unread,
May 20, 2019, 5:31:58 PM5/20/19
to bpipe-discuss
Interesting. When you're referring to $input.control.bam,in the pileup step, does it just pick it up from the previous step, even though you haven't specified it as an input to the current step? My understanding was that if you were specifying branches, that they were the only input to that step?
I suspect I'll have to think about how to modify this for when there's multiple control samples, though I suspect generating a separate pipeline file for each control sample might be the way to go <hrrmms>.

This also gets around another problem I've been having which is if I'm only using the predefined branches at a step mid-way through the pipeline, I was having to predict the full name of the file (i.e. in the controls and the samples definitions, I'd have to specify A0028_1_1.fastq.gz.trim.align.bam). Getting rid of that will be nice.

Anyway. I'll have a go at something like this this afternoon.

Many thanks.
Ben.
To unsubscribe from this group and stop receiving emails from it, send an email to bpipe-...@googlegroups.com.

ben curran

unread,
May 21, 2019, 6:23:35 PM5/21/19
to bpipe-discuss
If I produce a separate file for each patient, this works beautifully thank you.

Is there a limit to the nesting in the run command though? I thought I'd try and be clever and add in running fastqc before and after trimming as well, so I changed
run { [  control * [trim + align.using(type:'control') + removeDuplicates + removeSuplementary ],   samples *  [trim + align.using(type:'test') + removeDuplicates + removeSuplementary ]] + samples * [  pileUp  ] }

to

run { [  control * [[qc,trim] + [qc,align.using(type:'control')] + removeDuplicates + removeSuplementary ],   samples *  [[qc,trim] + [qc,align.using(type:'test')] + removeDuplicates + removeSuplementary ]] + samples * [  pileUp  ] }

which still runs, but for reasons I am unable to understand, it changes the names of the samples from control1 to 1, sample1 to 1, sample2 to 2 and so on.

Ben.

ben curran

unread,
Jun 3, 2019, 8:10:43 PM6/3/19
to bpipe-discuss
So I've figured out why this:

run { [  control * [[qc,trim] + [qc,align.using(type:'control')] + removeDuplicates + removeSuplementary ],   samples *  [[qc,trim] + [qc,align.using(type:'test')] + removeDuplicates + removeSuplementary ]] + samples * [  pileUp  ] }

doesn't work. It processes the control and then all the test samples, processes the removeDuplicates and removeSuplementary stages, but then when I combine them, I'm referring to the samples branch definitions again, which means it goes back to look for inputs that conform to the branch definitions. This means that the outputs from the previous steps, the removeDuplicates and removeSuplementary stages, is ignored. Basically, as soon as I add in those two stages I ruin the ability of bpipe to join things back up properly in the pileUp stage. I could change the pattern in the pileUp stage from:

from(sample + '*.bam') {
       
exec """samtools
 mpileup -P $threads -B -d 9001 -q 1 -f $bwaIndex  $input.control.bam
$input.test.bam | java -Xmx16g -d64 -jar
/nesi/project/uoa00571/bin/VarScan.v2.4.3.jar somatic --mpileup 1
--min-var-freq 0.1 --p-value 1.00 --somatic-p-value 1.00 --strand-filter
 0 --tumor-purity 0.5 --output-vcf 1 --min-coverage-normal 10
--min-coverage-tumor 10 --output-snp $output.snp.vcf  --output-indel
$output.indel.vcf"""
,"pileUp"

   
}


to:
from(sample + '*.bam') {
        exec """samtools mpileup -P $threads -B -d 9001 -q 1 -f $bwaIndex  $sample.control.removeDuplicates.removeSuplemantary.bam $input.test.removeDuplicates.removeSuplemantary.bam | java -Xmx16g -d64 -jar /nesi/project/uoa00571/bin/VarScan.v2.4.3.jar somatic --mpileup 1 --min-var-freq 0.1 --p-value 1.00 --somatic-p-value 1.00 --strand-filter 0 --tumor-purity 0.5 --output-vcf 1 --min-coverage-normal 10 --min-coverage-tumor 10 --output-snp $output.snp.vcf  --output-indel $output.indel.vcf""","pileUp"

   
}



but that defeats the purpose of having a pipeline that I can easily change stages with. It also doesn't work, giving me java "no such property" errors. I can remove the "samples *" from the pileUp stage in the run command, but that then gives me output with filenames relating to the control sample, which will lead to all sorts of downstream problems.

Is what I'm trying to do, possible? i.e. 
run { [  control * [[qc,trim] + [qc,align.using(type:'control')] + removeDuplicates + removeSuplementary ],   samples *  [[qc,trim] + [qc,align.using(type:'test')] + removeDuplicates + removeSuplementary ]] + ???? * [  pileUp  ] }


Cheers
Ben. 
Reply all
Reply to author
Forward
0 new messages