#!/usr/bin/env nextflow

output_path = new File('').getAbsolutePath() + '/data'
filenames = Channel.from( "alpha", "beta" )

process parallel_process {
    echo true
    storeDir output_path

    input:
    val(filename) from filenames

    output:
    file("${filename}.txt") into parallel_channel

    """
    #!/usr/bin/env python
    import time
    import random
    wait = random.randint(1, 5)
    time.sleep(wait)
    with open('${filename}.txt', 'w') as f:
        f.write("${filename} waited {} seconds.".format(wait))
    """
}

parallel_channel.collectFile(name: file("dummy.txt")).set { join_channel }

process join_process {
    echo true
    storeDir output_path

    input:
    file dummy from join_channel

    output:
    file("DONE.txt") into out_channel

    """
    touch DONE.txt
    """
}
#!/usr/bin/env nextflow

output_path = new File('').getAbsolutePath() + '/data'
filenames = Channel.from( "alpha", "beta" )

process parallel_process {
    echo true
    storeDir output_path

    input:
    val(filename) from filenames

    output:
    file("${filename}.txt") into stored_process
    file("${filename}.FLAG") into parallel_channel

    shell:
    """
    #!/usr/bin/env python
    import time
    import random
    wait = random.randint(1, 5)
    time.sleep(wait)
    with open('${filename}.txt', 'w') as f:
        f.write("${filename} waited {} seconds.".format(wait))

    with open('${filename}.FLAG', 'w') as f:
        f.write("DONE")
    """
}
Hello,

Case study: I want all parallel_process tasks to finish before join_process is allowed to start. Each parallel process needs to write to disk in a storeDir area, takes a while, and writes a lot of data (bioinformatics: read alignments).

NB: if I call .collectFile() without arguments, then join_process runs twice. Is that behaviour expected?
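To make the requirement concrete, here is a rough plain-Python analogy of "run everything in parallel, then join exactly once". It is not Nextflow and says nothing about how Nextflow schedules tasks; the function names (parallel_task, join_task, run_parallel_then_join) are made up for illustration, and the sleep is shortened from the 1 to 5 seconds used in the script.

```python
import random
import tempfile
import time
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path


def parallel_task(filename, out_dir):
    """Mimic one parallel_process task: wait a bit, then write <filename>.txt."""
    wait = random.uniform(0.01, 0.05)  # assumption: shortened for the example
    time.sleep(wait)
    out_file = out_dir / f"{filename}.txt"
    out_file.write_text(f"{filename} waited {wait:.2f} seconds.")
    return out_file


def join_task(inputs, out_dir):
    """Mimic join_process: runs once, and only if every input already exists."""
    missing = [p for p in inputs if not p.exists()]
    if missing:
        raise RuntimeError(f"join started too early, missing: {missing}")
    done = out_dir / "DONE.txt"
    done.touch()
    return done


def run_parallel_then_join(filenames, out_dir):
    with ThreadPoolExecutor() as pool:
        # list(...) blocks until every task has returned: this is the barrier.
        outputs = list(pool.map(lambda name: parallel_task(name, out_dir), filenames))
    return join_task(outputs, out_dir)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        done = run_parallel_then_join(["alpha", "beta"], Path(tmp))
        print(done.name)
```

The point of the sketch is only that the join step receives the complete list of outputs as a single item, so it can run once rather than once per upstream task.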
process align {
    input:
    val x from (['one','two','three'])

    output:
    file '*.bam' into alignment

    """
    echo ${x} > ${x}.bam
    """
}

process combine {
    input:
    file bams from alignment.toList()

    output:
    file 'result.txt' into result

    """
    cat $bams > result.txt
    """
}

result.println { it.text }
The output of the script below is:
N E X T F L O W ~ version 0.15.6
Launching trio.nf
[warm up] executor > local
[48/bf859b] Submitted process > align (1)
[7c/6cfd49] Submitted process > align (3)
[d6/703f22] Submitted process > align (2)
[7e/45aa5e] Submitted process > combine (1)
vc -P proband.fastq.bam -M mother.fastq.bam -F father.fastq.bam

However, if I add a "storeDir", the last line becomes:

vc -P input.2 -M input.3 -F input.1

That doesn't work. I don't quite understand why.
Here is the script:
#!/usr/bin/env nextflow

output_path = new File('').getAbsolutePath() + '/data'
input = Channel.from(['father', file('father.fastq')], ['mother', file('mother.fastq')], ['proband', file('proband.fastq')])

process align {
    storeDir output_path
    scratch true

    input:
    set val(x), file(fastq) from input

    output:
    file('*.bam') into alignment
    val(x) into alignment_type
    file '*.bai' into bai_alignment

    script:
    """
    echo ${x} > ${fastq}.bam
    echo ${x} > ${fastq}.bai
    """
}

process combine {
    storeDir output_path
    scratch true

    input:
    file(bamfile) from alignment.toList()
    val(sampletype) from alignment_type.toList()
    file(baifile) from bai_alignment.toList()

    output:
    file 'result.txt' into result

    script:
    def map = [:]
    map["${sampletype[0]}"] = [bam : "${bamfile[0]}", bai : "${baifile[0]}"]
    map["${sampletype[1]}"] = [bam : "${bamfile[1]}", bai : "${baifile[1]}"]
    map["${sampletype[2]}"] = [bam : "${bamfile[2]}", bai : "${baifile[2]}"]
    """
    echo "vc -P $map.proband.bam -M $map.mother.bam -F $map.father.bam" > result.txt
    """
}
result.subscribe { println it.text }

process align {
    storeDir "$output_path/align/$x"
    :
}

2) specifying the exact output file names instead of using a star pattern. For example:

output:
    val(x) into alignment_type
    file("${fastq}.bam") into alignment
    file("${fastq}.bai") into bai_alignment
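For reference, the lookup that the combine process performs can be sketched in plain Python. This is not Nextflow, and build_vc_command is a made-up name. It mirrors the map built in the script and assumes, as the script does, that the three lists arrive in the same order, so that position i of each list belongs to the same sample. It does not model how storeDir stages or names files.

```python
def build_vc_command(sample_types, bam_files, bai_files):
    """Pair each sample type with its BAM/BAI by position, then look up by name."""
    if not (len(sample_types) == len(bam_files) == len(bai_files)):
        raise ValueError("the three lists must have the same length")
    # Same idea as map["${sampletype[i]}"] = [bam: ..., bai: ...] in the script.
    by_type = {
        sample: {"bam": bam, "bai": bai}
        for sample, bam, bai in zip(sample_types, bam_files, bai_files)
    }
    return "vc -P {} -M {} -F {}".format(
        by_type["proband"]["bam"],
        by_type["mother"]["bam"],
        by_type["father"]["bam"],
    )


if __name__ == "__main__":
    # Two different arrival orders, as can happen when tasks finish out of order.
    first = build_vc_command(
        ["father", "proband", "mother"],
        ["father.fastq.bam", "proband.fastq.bam", "mother.fastq.bam"],
        ["father.fastq.bai", "proband.fastq.bai", "mother.fastq.bai"],
    )
    second = build_vc_command(
        ["mother", "father", "proband"],
        ["mother.fastq.bam", "father.fastq.bam", "proband.fastq.bam"],
        ["mother.fastq.bai", "father.fastq.bai", "proband.fastq.bai"],
    )
    print(first)
    print(first == second)
```

Because the command is assembled by sample type rather than by position, the order in which the align tasks finish does not matter as long as the three lists stay aligned with each other. The command is only as good as the file names it is given, which is why the output declaration matters.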
👍