How to: many parallel processes to finish before a join process is allowed to start?


Hugues Fontenelle

Oct 14, 2015, 9:49:33 AM
to Nextflow
Hello

Case study: I want all parallel_process to finish before the join_process is allowed to start.
Each parallel process needs to write to disk in a storeDir area; each takes a while and writes a lot of data (bioinformatics: read alignments).
The join_process then takes the results for further processing.

Here is what I came up with:

#!/usr/bin/env nextflow

output_path = new File('').getAbsolutePath() + '/data'

filenames = Channel
    .from( "alpha", "beta" )

process parallel_process {
    echo true
    storeDir output_path

    input:
    val(filename) from filenames

    output:
    file("${filename}.txt") into parallel_channel

"""
#!/usr/bin/env python
import time
import random
wait = random.randint(1, 5)
time.sleep(wait)
with open('${filename}.txt', 'w') as f:
    f.write("${filename} waited {} seconds.".format(wait))
"""
}

parallel_channel
  .collectFile(name: file("dummy.txt"))
  .set {
     join_channel
   }

process join_process {
   echo true
   storeDir output_path

   input:
   file dummy from join_channel

   output:
   file("DONE.txt") into out_channel

   """
   touch DONE.txt
   """
}


It works with the dummy file trick, but obviously I'd rather avoid concatenating multi-gigabyte files (bioinformatics: BAM files).
Any suggestion?

NB: if I leave .collectFile() without argument, then the process join_process runs twice. Is that behaviour expected?

Thank you <3


Hugues Fontenelle

Oct 14, 2015, 9:56:47 AM
to Nextflow
Just after posting the question, I came up with a better solution:
I can also write a semaphore (flag) file and collect that in instead.
What do you think?

#!/usr/bin/env nextflow

output_path = new File('').getAbsolutePath() + '/data'

filenames = Channel
    .from( "alpha", "beta" )

process parallel_process {
    echo true
    storeDir output_path

    input:
    val(filename) from filenames

    output:
    file("${filename}.txt") into stored_process
    file("${filename}.FLAG") into parallel_channel

    script:
    """
    #!/usr/bin/env python
    import time
    import random
    wait = random.randint(1, 5)
    time.sleep(wait)
    with open('${filename}.txt', 'w') as f:
        f.write("${filename} waited {} seconds.".format(wait))
    with open('${filename}.FLAG', 'w') as f:
        f.write("DONE")
    """
}

The join_process then collects the tiny FLAG files instead of the multi-gigabyte BAMs.

Paolo Di Tommaso

Oct 14, 2015, 10:23:38 AM
to nextflow
Hi, 

On Wed, Oct 14, 2015 at 3:49 PM, Hugues Fontenelle <hugues.f...@gmail.com> wrote:
Hello

Case study: I want all parallel_process to finish before the join_process is allowed to start.
Each parallel process needs to write on disk in a storeDir area, takes a while and writes a lot of data (bioinformatics: read alignments).


OK, just as a side note: take into consideration that even if you do not use "storeDir", files are not copied from one task dir to another; only a symlink is created.
I'm wondering if you really need the "join_process" process. As far as I've understood, you want to merge the files (reads) produced by the first process into a single file (?). If so, that work is done by the collectFile operator, so you won't need the join step.
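For instance, something along these lines (just a sketch; the merged file name and the storeDir option are my assumptions) would merge the outputs straight into your storage area without a second process:

parallel_channel
    .collectFile(name: 'merged.txt', storeDir: output_path)
    .subscribe { println "merged into: $it" }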



NB: if I leave .collectFile() without argument, then the process join_process runs twice. Is that behaviour expected?

Yes. Even though it's a bit tricky, collectFile is meant to *group* and merge files having the same name. If you want to create a single file, you will need to specify that with the name parameter. 
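For example, this sketch (made-up names and contents) emits two files, because there are two distinct names; a downstream process consuming it would therefore run twice, exactly as you observed:

Channel
    .from( ['alpha.txt', 'aaa'], ['beta.txt', 'bbb'], ['alpha.txt', 'AAA'] )
    .collectFile { item -> [ item[0], item[1] + '\n' ] }
    .subscribe { println "${it.name} contains: ${it.text}" }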


Does it make sense? 


Cheers,
Paolo
 

Paolo Di Tommaso

Oct 14, 2015, 10:30:19 AM
to nextflow
I missed your question in the message subject. 

If by the join process you mean the collectFile operator, i.e. the one merging the files: it starts as soon as there's a process output.

If you mean the process consuming the merged file: it is executed after the last "parallel_process" has completed.


p

Hugues Fontenelle

Oct 14, 2015, 2:27:49 PM
to Nextflow
Hi Paolo

Thanks for the answers.

> As far as I've understood you want to merge the files (reads) produced by the first process into a single file (?)

No, actually, I don't. It was just a trick to achieve the behaviour that I wanted.
But forgive me, I most likely messed up my explanation in the first place :p
So let me develop on the (biological) rationale:

I want to do joint variant calling for trios (mother, father, and affected child). I'll run three alignments in parallel, each taking two reads as inputs, and each producing one BAM file as output.
After all three "processes" are done, I'll do a single join process (the variant calling), taking all three BAM files at once as input, and producing one VCF file as output.

Now this is exciting, isn't it? :-)

The "semaphore" aka "flag" solution that I wrote in my second post works fine for me, so there's not hurry. But I highly value any input!

Hugues


Paolo Di Tommaso

Oct 14, 2015, 3:13:12 PM
to nextflow
OK, if so the trick is to use the toList operator, which will collect all the items and emit them as a single list object (of BAM files in your case).


For example:


process align {
  input: 
  val x from (['one','two','three'])

  output: 
  file '*.bam' into alignment 
  
  """
  echo ${x} > ${x}.bam 
  """
}


process combine {
  input: 
  file bams from alignment.toList()
  
  output:
  file 'result.txt' into result
  
  """
  cat $bams > result.txt 
  """
}

result.println { it.text }


Does that work for you?


Cheers,
Paolo



Hugues Fontenelle

Oct 16, 2015, 6:13:53 AM
to Nextflow
Thank you Paolo, that script worked for me, yes :-)

I built a little upon it, and I am now running into trouble again.

The output of the script below is:
N E X T F L O W  
~  version 0.15.6
Launching trio.nf
[warm up] executor > local
[48/bf859b] Submitted process > align (1)
[7c/6cfd49] Submitted process > align (3)
[d6/703f22] Submitted process > align (2)
[7e/45aa5e] Submitted process > combine (1)
vc
-P proband.fastq.bam -M mother.fastq.bam -F father.fastq.bam


However, if I add a "storeDir", the last line becomes:

vc -P input.2 -M input.3 -F input.1

That doesn't work, and I don't quite understand why.


Here is the script:

#!/usr/bin/env nextflow

output_path = new File('').getAbsolutePath() + '/data'

input = Channel.from(['father', file('father.fastq')], ['mother', file('mother.fastq')], ['proband', file('proband.fastq')])

process align {
    storeDir output_path
    scratch true

    input:
    set val(x), file(fastq) from input

    output:
    file('*.bam') into alignment
    val(x) into alignment_type
    file '*.bai' into bai_alignment

    script:
    """
    echo ${x} > ${fastq}.bam
    echo ${x} > ${fastq}.bai
    """
}

process combine {
    storeDir output_path
    scratch true

    input:
    file(bamfile) from alignment.toList()
    val(sampletype) from alignment_type.toList()
    file(baifile) from bai_alignment.toList()

    output:
    file 'result.txt' into result

    script:
    def map = [:]
    map["${sampletype[0]}"] = [bam: "${bamfile[0]}", bai: "${baifile[0]}"]
    map["${sampletype[1]}"] = [bam: "${bamfile[1]}", bai: "${baifile[1]}"]
    map["${sampletype[2]}"] = [bam: "${bamfile[2]}", bai: "${baifile[2]}"]

    """
    echo "vc -P $map.proband.bam -M $map.mother.bam -F $map.father.bam" > result.txt
    """
}

result.subscribe { println it.text }


Paolo Di Tommaso

Oct 16, 2015, 7:47:46 AM
to nextflow
Hi, 

The problem is that when you declare a "storeDir", the process will try to collect the expected output files in that folder. Thus it will match all the *.bam and *.bai files that may have been produced by different process instances.

The usage of "storeDir" is a bit tricky, and for this reason I suggest using it only in very limited cases.
 

However, you can fix your script in two ways:

1) specifying a different storeDir path for each alignment process instance, changing it to: 

process align {
    storeDir "$output_path/align/$x"
    : 
}
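This way each align instance stores and looks up its outputs in its own folder, so the *.bam and *.bai patterns can no longer match files produced by the other instances.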

2) specifying the exact output file names instead of using a star pattern. For example:

    output:
    val(x) into alignment_type
    file("${fastq}.bam") into alignment
    file("${fastq}.bai") into bai_alignment



Hope this helps. 

Cheers,
Paolo

 

Hugues Fontenelle

Oct 20, 2015, 6:40:15 AM
to Nextflow
Thanks a lot.
Works for me. I may go with both "fixes" for clarity and robustness.
Cheers
Hugues

Paolo Di Tommaso

Oct 20, 2015, 6:41:43 AM
to nextflow

👍

