wait for process to finish and then create a channel with its output dir


daniel...@gladstone.ucsf.edu

Mar 1, 2018, 3:27:20 PM
to Nextflow
Hi,

I'm writing to ask if the following syntax is okay for waiting for a process to finish and then creating a channel with its output dir:

```
midas_species_dir_channel = Channel.create()

midas_species_output_4_merge.subscribe onComplete: {
    midas_species_dir_channel = Channel
            .fromPath(midas_species_dir, type: 'dir')
            .ifEmpty { exit 1, "MIDAS species output was not found in: ${midas_species_dir}" }
}
```

The prior process' output is the `midas_species_output_4_merge` channel, and the next process' input is `midas_species_dir_channel`.

Thanks for your help!
Dan.

Paolo Di Tommaso

Mar 2, 2018, 4:34:06 AM
to nextflow
No, you cannot do that. What you are trying to do is the default behaviour of a process output channel, so I don't understand the reason for this piece of code.



--
You received this message because you are subscribed to the Google Groups "Nextflow" group.
To unsubscribe from this group and stop receiving emails from it, send an email to nextflow+unsubscribe@googlegroups.com.
Visit this group at https://groups.google.com/group/nextflow.
For more options, visit https://groups.google.com/d/optout.

daniel...@gladstone.ucsf.edu

Mar 2, 2018, 9:29:03 AM
to Nextflow
There are several processes prior to this piece of code that work on several/many pairs of files (paired-end fastq's) or individual sample files (e.g. BAMs). However, at this point in the pipeline I need to wait for all the individual sample files to have run through the prior process before the next process can work on the whole output directory of the prior process.

Restated, I'm trying to wait for the prior process to finish running all the samples before the next process performs an operation on all the samples together. And, the tool used by next process takes in the directory that holds all the samples' output. So, I thought it would work to create a channel that emits the directory of results/output (instead of emitting the individual samples' output) only after all the samples have been processed by the prior process.

This is probably a common use case, but I didn't find documentation that would show me how to implement it in NF.

Thanks for your help!!  (NF is awesome, BTW)



daniel...@gladstone.ucsf.edu

Mar 2, 2018, 6:19:58 PM
to Nextflow
So, is this the correct thing to do in the downstream process then?

```
input:
    file all_midas_species_results from midas_species_output_4_merge.collect()
```

Paolo Di Tommaso

Mar 5, 2018, 1:47:42 PM
to nextflow
So, basically you are describing a scatter-gather pattern, in which one step processes the outputs of a parallel execution of the upstream process. Yes, the collect operator does that.

You can see an example here.  


Hope it helps. 

p


daniel...@gladstone.ucsf.edu

Mar 5, 2018, 2:13:36 PM
to Nextflow
Thanks a ton for your reply, Paolo!

Yes, that's essentially correct.

However, my use case differs from the example in that the tool used in the downstream process expects the directory holding all the output files (from the upstream process) as its input and not a list of files.

What is the NF idiomatic way to wait for parallel execution to finish and then emit the output directory?  That is, the directory holding all the samples' output, and not the publishDir of 1 single sample.

Should I use a closure to transform the list of files into a single parent directory emission?

On a related note, what's the difference between .collect() and .toList()?  They seem the same to me.

Paolo Di Tommaso

Mar 5, 2018, 2:16:40 PM
to nextflow
NF does not care whether it is a file or a directory. It only captures what you declared in the output, whether that is a file or a directory.

p


daniel...@gladstone.ucsf.edu

Mar 5, 2018, 2:43:40 PM
to Nextflow
Yes, I know that. However, I'm emitting files because the upstream process works on a per-sample basis, and the downstream process takes in a directory.

I don't want to emit the output directory (holding all the results from the per-sample parallel runs) in the upstream process because I don't want it to be emitted multiple times, nor do I want it to be emitted before all the per-sample runs are completed.

That's why I had initially used a `.subscribe onComplete: { }` construction to wait for all the per-sample runs to finish before creating a channel that emits the output directory holding all the results from the per-sample parallel runs.

I'm still unsure about A) valid syntax for doing so, and B) an elegant or NF-idiomatic way of doing so.

Paolo Di Tommaso

Mar 5, 2018, 3:10:39 PM
to nextflow
Sorry, still not understanding. 

Provide a minimal example for your use case. 


p


daniel...@gladstone.ucsf.edu

Mar 6, 2018, 12:27:40 AM
to Nextflow
What about using the `when` block in the downstream process to wait for all the files to be processed by the upstream process?

Paolo Di Tommaso

Mar 6, 2018, 2:03:56 AM
to nextflow
Process synchronisation is defined by channel input/output declarations and operator composition.

Otherwise the problem is malformed, or you are trying to use the tool in a way for which it is not designed.


p


Steve

Mar 6, 2018, 10:15:46 AM
to Nextflow
How about something like this?

params.samples_list = ['Sample1', 'Sample2', 'Sample3', 'Sample4']
Channel.from( params.samples_list ).into { samples_list; samples_list2 }

samples_list2.subscribe {
    println "[samples_list2] ${it}"
}

process make_txt {
    tag { "${sample_ID}" }
    executor "local"
    echo true

    input:
    val(sample_ID) from samples_list

    output:
    file "${sample_ID}.txt" into samples_files, samples_files2

    script:
    """
    echo "[make_txt] ${sample_ID}"
    echo "[make_txt] ${sample_ID}" > "${sample_ID}.txt"
    """
}

process make_dir {
    echo true
    stageInMode "copy"

    input:
    file("*") from samples_files.collect()

    output:
    file("samples_dir") into samples_dir

    script:
    """
    echo "[make_dir]"
    pwd
    for item in *; do
        mkdir -p samples_dir
        mv "\${item}" samples_dir/
    done
    tree
    """
}

process use_dir {
    echo true

    input:
    file(dir) from samples_dir

    script:
    """
    echo "[use_dir]"
    pwd
    tree "${dir}/"
    """
}


output:

N E X T F L O W  ~  version 0.27.2
Launching `dir-process.nf` [fervent_shockley] - revision: c665ca0bff
[samples_list2] Sample1
[samples_list2] Sample2
[samples_list2] Sample3
[samples_list2] Sample4
[warm up] executor > local
[fa/c76e67] Submitted process > make_txt (Sample1)
[1c/c18c3c] Submitted process > make_txt (Sample4)
[85/5b4639] Submitted process > make_txt (Sample3)
[d7/01f809] Submitted process > make_txt (Sample2)
[make_txt] Sample4
[make_txt] Sample3
[make_txt] Sample2
[make_txt] Sample1
[24/2e0e2c] Submitted process > make_dir
[make_dir]
/Users/kellys04/projects/nextflow-samplesheet-demo/work/24/2e0e2ca4a9357fe480375588bd89bd
.
`-- samples_dir
    |-- Sample1.txt
    |-- Sample2.txt
    |-- Sample3.txt
    `-- Sample4.txt

1 directory, 4 files
[5e/0d5c8f] Submitted process > use_dir
[use_dir]
/Users/kellys04/projects/nextflow-samplesheet-demo/work/5e/0d5c8f63b126ac452bd3973a1574f0
samples_dir/
|-- Sample1.txt
|-- Sample2.txt
|-- Sample3.txt
`-- Sample4.txt

0 directories, 4 files



You might not actually need the `stageInMode "copy"` in there; I wasn't sure how symlink handling would work when moving things into a single dir.
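
On the symlink question, here is a minimal Bash sketch. It assumes inputs are staged as symbolic links with absolute targets, which is how the Nextflow docs describe the default `symlink` stage-in mode. It only imitates the staging with `ln -s` in temporary directories, and the `upstream` and `task_dir` names are hypothetical stand-ins for work directories. It suggests that moving the links into `samples_dir` with a single `mv` keeps them readable, so `stageInMode "copy"` may not be required.

```shell
#!/usr/bin/env bash
set -euo pipefail

# Hypothetical stand-ins for the upstream and gathering work directories.
upstream=$(cd "$(mktemp -d)" && pwd)
task_dir=$(cd "$(mktemp -d)" && pwd)

# Upstream results, one file per sample.
for s in Sample1 Sample2 Sample3 Sample4; do
    echo "[make_txt] $s" > "$upstream/$s.txt"
done

# Imitate staging: symlinks with absolute targets (assumed default behaviour).
cd "$task_dir"
for f in "$upstream"/*.txt; do
    ln -s "$f" .
done

# Gather the links into one directory with a single mv.
mkdir -p samples_dir
mv ./*.txt samples_dir/

# mv moved the links themselves; they still resolve because the targets are absolute.
cat samples_dir/Sample1.txt
ls samples_dir
```

A link with a relative target would stop resolving after the move, so this only holds as long as the staging really uses absolute paths.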

daniel...@gladstone.ucsf.edu

Mar 8, 2018, 7:57:21 PM
to Nextflow
Thanks a ton for your suggestion Steve!  It seems like that should do it.

I'll report back either way.

daniel...@gladstone.ucsf.edu

Mar 8, 2018, 8:55:08 PM
to Nextflow
Actually, couldn't I just use `stageInMode 'copy'`, and then use `$PWD` for the directory with the input files?  ...and skip the middle process that sets up the directory.

I'll try that...

daniel...@gladstone.ucsf.edu

Mar 8, 2018, 9:45:41 PM
to Nextflow
Nope, that won't work because $PWD is not the process work directory, but rather the directory from which the pipeline is run.
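
This looks like an interpolation-timing issue rather than a staging one. As far as I understand it, a double-quoted Nextflow script block substitutes unescaped `$VAR` references before Bash ever sees the script, while `\$VAR` is left for Bash to expand inside the task. The sketch below is a plain Bash analogy of that timing difference using an unquoted heredoc, not Nextflow itself; the `launch_dir`, `work_dir`, and `task.sh` names are made up for illustration.

```shell
#!/usr/bin/env bash
set -euo pipefail

# Hypothetical stand-ins: where the pipeline is launched vs. where the task runs.
launch_dir=$(cd "$(mktemp -d)" && pwd)
work_dir=$(cd "$(mktemp -d)" && pwd)

cd "$launch_dir"
# Unquoted heredoc: $PWD is expanded now, while \$PWD is written out literally
# and only expanded when task.sh runs.
cat > "$work_dir/task.sh" <<EOF
echo "expanded early: $PWD"
echo "expanded late:  \$PWD"
EOF

# Run the generated script from the "work directory".
cd "$work_dir"
output=$(bash task.sh)
echo "$output"
```

If the same holds in the pipeline, writing `\$PWD` (or calling `pwd`) inside the script block should give the task's work directory instead of the launch directory.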

daniel...@gladstone.ucsf.edu

Mar 9, 2018, 1:37:02 AM
to Nextflow

The middle process isn’t needed, and preparing the samples_dir with mv can be done without the loop. See below:

params.samples_list = ['Sample1', 'Sample2', 'Sample3', 'Sample4']
Channel.from( params.samples_list ).into { samples_list; samples_list2 }

samples_list2.subscribe {
    println "[samples_list2] ${it}"
}

process make_txt {
    tag { "${sample_ID}" }
    executor "local"
    echo true

    input:
    val(sample_ID) from samples_list

    output:
    file "${sample_ID}.txt" into samples_files

    script:

    """
    echo "[make_txt] ${sample_ID}"
    echo "[make_txt] ${sample_ID}" > "${sample_ID}.txt"
    """
}

process make_use_dir {
    echo true

    input:
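    // Unquoted name: binds the staged files to a variable and keeps their original
    // file names (a quoted name would stage them as results_list1, results_list2, ...).
    file(results_list) from samples_files.collect()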

    output:
    file("samples_dir") into samples_dir

    script:
    """
    echo "[make_dir]"
    pwd
    mkdir -p samples_dir
    mv ${results_list} samples_dir/
    tree
    echo "[use_dir]"
    ls samples_dir
    """
}