If I define a list of specific branches and some of them use the same variable or the same data file, how do I stop bpipe from executing the same stage on the same data file multiple times?
For example, I have three samples:
A0028_1 (non-tumor sample)
A0028_2 (tumor sample)
A0028_3 (tumor sample)
And I want to trim and align all three, and then compare A0028_1 with A0028_2 and A0028_1 with A0028_3.
I have these stages:
trim = {
    exec """cutadapt --minimum-length 50 -a AGATCGGAAGAGC -q 30 -o $output1 -p $output2 $input1.fastq.gz $input2.fastq.gz""","trim"
}
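align = {
    // $branch is only expanded inside strings; in plain Groovy code use branch.name.
    // "def" keeps SAMPLE local to this branch instead of shared between branches.
    def SAMPLE = branch.name
    exec """bwa mem -t $threads -R '@RG\\tID:$SAMPLE\\tSM:$SAMPLE\\tLB:$SAMPLE\\tPL:ILLUMINA' $bwaIndex $input1 $input2 | samtools sort -@12 -O BAM -o $output.bam""", "align"
}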
pileUp = {
    exec "samtools mpileup -P $threads -B -d 9001 -q 1 -f $bwaIndex $input2.bam $input1.bam | java -Xmx16g -d64 -jar /nesi/project/uoa00571/bin/VarScan.v2.4.3.jar somatic --mpileup 1 --min-var-freq 0.1 --p-value 1.00 --somatic-p-value 1.00 --strand-filter 0 --tumor-purity 0.5 --output-vcf 1 --min-coverage-normal 10 --min-coverage-tumor 10 --output-snp $output1.snp.vcf --output-indel $output1.indel.vcf","pileUp"
}
and these branches:
def branches = [
    sample1 : ["A0028_1_1.fastq.gz", "A0028_1_2.fastq.gz", "A0028_2_1.fastq.gz", "A0028_2_2.fastq.gz"],
    sample2 : ["A0028_1_1.fastq.gz", "A0028_1_2.fastq.gz", "A0028_3_1.fastq.gz", "A0028_3_2.fastq.gz"]
]
and a run command like:
run {
    branches * [trim] + branches * [align] + branches * [pileUp]
}
The trouble with this, though, is that as far as I can see, the trimming and alignment steps for A0028_1 would be executed twice, probably at the same time. My first thought was that this would be fine, as whichever process finished last would overwrite the earlier output. Apart from being inefficient, though, the bwa | samtools sort step creates multiple temp files, and whilst I'm assuming those temp files would be named the same from run to run, I can't guarantee that. That could lead to problems if two branches were trying to reassemble their BAM files at the same time. I thought of using a listener (bpipe.EventManager.getInstance().addListener(STAGE_STARTED)), but that only sends a message when the current branch/stage starts; it doesn't check whether other branches are running on the same data.
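To make the duplication concrete, the branch map can be flattened in plain Groovy (the language bpipe pipeline files are written in). This is only an illustrative sketch, not a bpipe feature: it counts how often each file is listed and groups the distinct files by sample ID, assuming the <sample>_<read>.fastq.gz naming used in this post. Eight file entries collapse to six distinct files, and the A0028_1 pair is the part listed in both branches.

```groovy
// Branch map from the post: each branch lists a normal pair and a tumor pair.
def branches = [
    sample1 : ["A0028_1_1.fastq.gz", "A0028_1_2.fastq.gz", "A0028_2_1.fastq.gz", "A0028_2_2.fastq.gz"],
    sample2 : ["A0028_1_1.fastq.gz", "A0028_1_2.fastq.gz", "A0028_3_1.fastq.gz", "A0028_3_2.fastq.gz"]
]

// Every file entry across all branches, duplicates included.
def allEntries = branches.collectMany { name, files -> files }

// Distinct files: what trim and align really need to process once each.
def distinctFiles = allEntries.unique(false)

// Group the distinct files into read pairs per sample, assuming the
// "<sample>_<read>.fastq.gz" naming used in the post.
def bySample = distinctFiles.groupBy { it.replaceAll(/_[12]\.fastq\.gz$/, '') }

// Files that more than one branch would trim and align.
def shared = allEntries.countBy { it }.findAll { file, n -> n > 1 }.keySet()

println "entries: ${allEntries.size()}, distinct files: ${distinctFiles.size()}"
bySample.each { sample, files -> println "$sample -> $files" }
println "listed in more than one branch: $shared"
```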
I could also use pattern matches for the input, e.g.:
run {
    "%_*.fastq.gz" * [trim] + "%_*.trim" * [align] + branches * [pileUp]
}
This, however, would require me to know the full names of the files that will be created before defining the branches. I think that will get messy, because other stages will be introduced between the align and pileUp stages, e.g.:
def branches = [
    sample1 : ["A0028_1_1.fastq.gz.trim.align.[otherstages]", "A0028_1_2.fastq.gz.trim.align.[otherstages]", "A0028_2_1.fastq.gz.trim.align.[otherstages]", "A0028_2_2.fastq.gz.trim.align.[otherstages]"],
    sample2 : ["A0028_1_1.fastq.gz.trim.align.[otherstages]", "A0028_1_2.fastq.gz.trim.align.[otherstages]", "A0028_3_1.fastq.gz.trim.align.[otherstages]", "A0028_3_2.fastq.gz.trim.align.[otherstages]"]
]
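Since a bpipe pipeline file is ordinary Groovy, the extended branch map does not necessarily have to be typed out by hand. A minimal sketch, assuming each stage simply appends its extension to the original file name as in the example above; comparisons and branchesFor are hypothetical names, and the extension string would have to match whatever names bpipe actually produces.

```groovy
// Normal/tumor comparisons, written once with base sample IDs only.
def comparisons = [
    sample1 : [normal: 'A0028_1', tumor: 'A0028_2'],
    sample2 : [normal: 'A0028_1', tumor: 'A0028_3']
]

// Hypothetical helper: expand every comparison into the read-pair file names a
// stage expects, given the extension chain added by earlier stages (assumed to
// be appended to the original name, as in the example above).
def branchesFor(Map comparisons, String ext) {
    comparisons.collectEntries { name, pair ->
        def files = [pair.normal, pair.tumor].collectMany { id ->
            [id + '_1.fastq.gz' + ext, id + '_2.fastq.gz' + ext]
        }
        [name, files]   // one branch per comparison
    }
}

def rawBranches = branchesFor(comparisons, '')                 // input to trim
def pileUpBranches = branchesFor(comparisons, '.trim.align')    // hypothetical names after align
println pileUpBranches
```

Only the comparisons map is written by hand: rawBranches reproduces the original branch map, and a different extension string would be passed for each later stage. Whether bpipe's real output names follow this simple append rule would need to be checked against an actual run.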
Is there a way for a stage to check whether another stage is already executing the same command, and to skip execution if so? Or is there a way to define the branches at the beginning (with just the base file names), use a pattern match for the first two stages, and then have the branches pick up the files with the extended file names when they reach the pileUp stage?
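For the first question, the general "run once per input, let everyone else wait for the result" idea can be sketched with a futures-based memoizer, a standard Java concurrency pattern rather than anything bpipe provides. OncePerKey and runOnce are hypothetical names; whether bpipe runs branches as threads in one JVM, and whether such a helper would cooperate with bpipe's own output tracking and restarts, are untested assumptions.

```groovy
import java.util.concurrent.Callable
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.FutureTask
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper (not part of bpipe): runs the work for each key at most once.
// Concurrent callers with the same key block until that single run finishes and
// then receive its result. A failed run is cached too, so a retry would need
// tasks.remove(key) first.
class OncePerKey {
    private final ConcurrentHashMap<String, FutureTask<Object>> tasks = new ConcurrentHashMap<>()

    Object runOnce(String key, Callable<Object> work) {
        def candidate = new FutureTask<Object>(work)
        // putIfAbsent is atomic: exactly one caller per key gets null back.
        def existing = tasks.putIfAbsent(key, candidate)
        if (existing == null) {
            candidate.run()    // this caller won the race and does the work
            existing = candidate
        }
        return existing.get()  // everyone waits for the single run to complete
    }
}

// Demo: four threads (standing in for branches) ask for the same alignment.
def once = new OncePerKey()
def calls = new AtomicInteger()
def results = Collections.synchronizedList([])
def threads = (1..4).collect {
    Thread.start {
        results << once.runOnce('A0028_1') {
            calls.incrementAndGet()   // counts how many times the work really runs
            Thread.sleep(50)          // simulate a slow alignment
            'A0028_1.bam'
        }
    }
}
threads*.join()
println "work ran ${calls.get()} time(s); results: $results"
```

The design choice here is to store the future, not the result: the first branch to claim a key does the work, and later branches wait on the same future instead of skipping, so downstream stages still get the shared output.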
Cheers
Ben.
For reference, this is the listener I tried:
bpipe.EventManager.getInstance().addListener(STAGE_STARTED) { type, desc, details ->
    println "Event occurred! Here are the details:\n " + details.collect { key, value -> "$key : $value\n" }.join('')
}
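And this is the check I tried:
check {
    // Passes, so the stage carries on, only if the output file does not exist or is empty
    exec "[ ! -s $output ]"
} otherwise {
    // The output already exists, so end this branch instead of running the stage again
    succeed "Another branch has already started aligning this file"
}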